In [None]:
%%capture
!pip install sentence_transformers lime

# Обучение модели

In [1]:
import pandas as pd
import numpy as np
from IPython.display import HTML
from functools import partial
import lime
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score
from lime.lime_text import LimeTextExplainer
from sklearn.model_selection import train_test_split
from transformers import AutoTokenizer, AutoModel


class TextDataset(Dataset):
    """Map-style dataset that tokenizes one text per lookup and pairs it with its label.

    Args:
        texts: Indexable collection of input strings.
        labels: Indexable collection of labels, aligned with ``texts``.
        tokenizer: Object exposing ``encode_plus`` (e.g. a Hugging Face tokenizer).
        max_length: Length every sample is padded or truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_length=512):
        self.texts, self.labels = texts, labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        # The dataset size is driven by the texts; labels are assumed to be aligned.
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` tensors for sample ``idx``."""
        encoding_options = dict(
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            return_token_type_ids=True,
            return_attention_mask=True,
            truncation=True,
        )
        # The second positional argument is the (absent) text pair.
        encoded = self.tokenizer.encode_plus(self.texts[idx], None, **encoding_options)

        # Token type ids are requested from the tokenizer but not forwarded to the model.
        sample = {
            key: torch.tensor(encoded[key], dtype=torch.long)
            for key in ('input_ids', 'attention_mask')
        }
        # Labels are stored as floats.
        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.float)
        return sample

    
# Load the dataset and drop the 'Unnamed: 0' column
# (presumably an index column written out with the CSV — confirm).
data = pd.read_csv('/kaggle/input/zvon-v-ushah/zvon_v_ushah.csv').drop(columns='Unnamed: 0')

# Inputs are the raw 'code' strings; the target is the 'by_human' column.
# NOTE(review): by_human looks like a 0/1 flag — confirm against the dataset.
X, y = data['code'].values, data['by_human'].values
# Two stratified splits with the same seed: 30% is held out as the test set, then 30%
# of the remaining 70% becomes validation (about 49% train / 21% val / 30% test).
X_temp, X_test, y_temp, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=data['by_human'])
X_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=0.3, random_state=42, stratify=y_temp)

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
class LogRegClassifier(nn.Module):
    """Logistic-regression head: one linear unit followed by a sigmoid.

    Args:
        transformer_output_dim: Width of the feature vector fed to the head.
    """

    def __init__(self, transformer_output_dim):
        super().__init__()
        # Attribute name is part of the checkpoint's state-dict keys.
        self.linear = nn.Linear(transformer_output_dim, 1)

    def forward(self, x):
        """Map features ``x`` to values in (0, 1); the last dimension has size 1."""
        logits = self.linear(x)
        return logits.sigmoid()
    

class CombinedModel(nn.Module):
    """Encoder plus classification head, applied to the first token's hidden state.

    Args:
        transformer: Encoder called with ``input_ids`` and ``attention_mask`` whose
            result exposes ``last_hidden_state``.
        classifier: Module applied to the pooled representation.
    """

    def __init__(self, transformer, classifier):
        super().__init__()
        # Attribute names are part of the checkpoint's state-dict keys.
        self.transformer = transformer
        self.classifier = classifier

        # Freezing the encoder was sketched here but is left disabled:
        #         for param in self.transformer.parameters():
        #             param.requires_grad = False

    def forward(self, input_ids, attention_mask):
        """Encode the batch and classify the hidden state at sequence position 0."""
        encoded = self.transformer(input_ids=input_ids, attention_mask=attention_mask)
        # Position 0 is presumably the <s>/[CLS] token for BERT-style encoders.
        first_token_state = encoded.last_hidden_state[:, 0]
        return self.classifier(first_token_state)

    
def train_model(combined_model, train_dataloader, val_dataloader, scheduler, epochs=5):
    """Train ``combined_model`` and keep the weights of the best validation epoch.

    Uses the notebook-level ``criterion``, ``optimizer`` and ``device``. After the
    last epoch the best-scoring weights (by validation F1) are loaded back into the
    model and written to ``best_model.pth``.

    Args:
        combined_model: Model called with ``input_ids`` and ``attention_mask``;
            its output is squeezed along dimension 1 before the loss.
        train_dataloader: Yields dicts with ``input_ids``, ``attention_mask``, ``labels``.
        val_dataloader: Loader passed to ``validate_model`` after every epoch.
        scheduler: Learning-rate scheduler stepped once per epoch.
        epochs: Number of passes over ``train_dataloader``.
    """
    best_f1 = 0
    best_model_state = None

    for epoch in range(epochs):
        # validate_model() switches the model to eval mode, so training mode has to
        # be re-enabled at the start of every epoch, not just once before the loop.
        combined_model.train()
        loss = None

        for batch in train_dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            outputs = combined_model(input_ids=input_ids, attention_mask=attention_mask).squeeze(1)
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        scheduler.step()

        val_f1 = validate_model(combined_model, val_dataloader)
        if val_f1 > best_f1:
            best_f1 = val_f1
            # state_dict() hands back references to the live tensors; clone them so
            # later optimizer steps cannot overwrite the snapshot.
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in combined_model.state_dict().items()
            }

        # The reported loss is that of the last batch of the epoch (NaN if the loader was empty).
        last_loss = loss.item() if loss is not None else float('nan')
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {last_loss:.4f}, Val F1: {val_f1:.4f}')

    # Nothing is restored or saved when no epoch improved on the initial F1 of 0.
    if best_model_state is not None:
        combined_model.load_state_dict(best_model_state)
        torch.save(combined_model.state_dict(), 'best_model.pth')
    print(f'Best Validation F1: {best_f1:.4f}')

    
def validate_model(combined_model, val_dataloader):
    """Return the F1 score of ``combined_model`` on ``val_dataloader``.

    The model is switched to eval mode and left in that mode. Outputs above 0.5
    are counted as class 1. Inputs are moved to the notebook-level ``device``.

    Args:
        combined_model: Model called with ``input_ids`` and ``attention_mask``.
        val_dataloader: Yields dicts with ``input_ids``, ``attention_mask``, ``labels``.

    Returns:
        The value of ``f1_score`` over all batches.
    """
    combined_model.eval()
    all_labels = []
    all_predictions = []

    with torch.no_grad():
        for batch in val_dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)

            # Squeeze only dimension 1: a bare squeeze() would collapse a final batch
            # of size 1 to a 0-d tensor, which extend() below cannot iterate over.
            outputs = combined_model(input_ids=input_ids, attention_mask=attention_mask).squeeze(1)
            predictions = (outputs > 0.5).int()
            # Labels are only needed on the host, so they are not moved to the device.
            all_labels.extend(batch['labels'].cpu().numpy())
            all_predictions.extend(predictions.cpu().numpy())

    return f1_score(all_labels, all_predictions)


def test_model(combined_model, test_dataloader):
    """Print the mean per-batch loss and the F1 score on ``test_dataloader``.

    Uses the notebook-level ``criterion`` and ``device``. The model is put into
    eval mode; nothing is returned.
    """
    combined_model.eval()
    batch_losses = []
    all_labels, all_predictions = [], []

    with torch.no_grad():
        for batch in test_dataloader:
            labels = batch['labels'].to(device)
            outputs = combined_model(
                input_ids=batch['input_ids'].to(device),
                attention_mask=batch['attention_mask'].to(device),
            ).squeeze(1)

            batch_losses.append(criterion(outputs, labels).item())

            # Outputs above 0.5 are counted as class 1.
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend((outputs > 0.5).int().cpu().numpy())

    average_loss = sum(batch_losses) / len(test_dataloader)
    # Despite its name, this variable holds the F1 score.
    accuracy = f1_score(all_labels, all_predictions)
    print(f'Test Loss: {average_loss:.4f}, Test F1: {accuracy:.4f}')

    
def predict(texts, tokenizer, combined_model):
    """Return hard 0/1 predictions for a batch of texts.

    Args:
        texts: A list of strings (a single string is treated as a one-element list).
        tokenizer: Callable tokenizer returning ``input_ids`` and ``attention_mask``
            tensors when called with ``return_tensors="pt"``.
        combined_model: Model whose output has one value per text in dimension 0
            and size 1 in dimension 1.

    Returns:
        A 1-D integer NumPy array with one entry per text; 1 where the model
        output exceeds 0.5, otherwise 0.
    """
    combined_model.eval()
    if isinstance(texts, str):
        texts = [texts]

    # Send the inputs to wherever the model's weights live instead of relying on
    # a notebook-level global that may not match the model that was passed in.
    first_param = next(combined_model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device('cpu')

    inputs = tokenizer(texts, padding=True, truncation=True, return_tensors="pt")
    input_ids = inputs['input_ids'].to(model_device)
    attention_mask = inputs['attention_mask'].to(model_device)

    with torch.no_grad():
        # Squeeze only dimension 1 so that a single text still yields a 1-element
        # array rather than a 0-d scalar.
        outputs = combined_model(input_ids=input_ids, attention_mask=attention_mask).squeeze(1)
        predictions = (outputs > 0.5).int().cpu().numpy()
    return predictions

In [9]:
# tokenizer = AutoTokenizer.from_pretrained('microsoft/graphcodebert-base')
# transformer = AutoModel.from_pretrained('microsoft/graphcodebert-base')

# train_dataset = TextDataset(X_train, y_train, tokenizer)
# train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)
# val_dataset = TextDataset(X_val, y_val, tokenizer)
# val_dataloader = DataLoader(val_dataset, batch_size=16, shuffle=False)
# test_dataset = TextDataset(X_test, y_test, tokenizer)
# test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# transformer_output_dim = transformer.config.hidden_size
# classifier = LogRegClassifier(transformer_output_dim)
# combined_model = CombinedModel(transformer, classifier).to(device)

# criterion = nn.BCELoss()
# optimizer = torch.optim.Adam(combined_model.classifier.parameters(), lr=0.01)
# scheduler = StepLR(optimizer, step_size=6, gamma=0.8)

# train_model(combined_model, train_dataloader, val_dataloader, scheduler, epochs=15)
# test_model(combined_model, test_dataloader)

Test Loss: 0.1154, Test F1: 0.9614


In [3]:
# Load the pretrained GraphCodeBERT tokenizer and encoder by their hub name.
tokenizer = AutoTokenizer.from_pretrained('microsoft/graphcodebert-base')
transformer = AutoModel.from_pretrained('microsoft/graphcodebert-base')

# Wrap each split in a TextDataset; only the training loader is shuffled.
train_dataset = TextDataset(X_train, y_train, tokenizer)
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_dataset = TextDataset(X_val, y_val, tokenizer)
val_dataloader = DataLoader(val_dataset, batch_size=16, shuffle=False)
test_dataset = TextDataset(X_test, y_test, tokenizer)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# The classification head takes the encoder's hidden size as its input width.
transformer_output_dim = transformer.config.hidden_size
classifier = LogRegClassifier(transformer_output_dim)
# The model is not moved to `device` in this cell.
combined_model = CombinedModel(transformer, classifier)

tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/539 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/772 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/499M [00:00<?, ?B/s]

  return self.fget.__get__(instance, owner)()
Some weights of RobertaModel were not initialized from the model checkpoint at microsoft/graphcodebert-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [4]:
# Restore the saved weights. map_location remaps tensors that were saved on another
# device (e.g. a GPU checkpoint opened in a CPU-only session) onto `device`.
combined_model.load_state_dict(
    torch.load('/kaggle/input/transformer_classifier/pytorch/default/1/best_model (1).pth',
               map_location=device)
)
combined_model = combined_model.to(device)
# Inference only from here on: switch to eval mode (the cell displays the model repr).
combined_model.eval()

CombinedModel(
  (transformer): RobertaModel(
    (embeddings): RobertaEmbeddings(
      (word_embeddings): Embedding(50265, 768, padding_idx=1)
      (position_embeddings): Embedding(514, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): RobertaEncoder(
      (layer): ModuleList(
        (0-11): 12 x RobertaLayer(
          (attention): RobertaAttention(
            (self): RobertaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): RobertaSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): L

In [7]:
sample_texts = [
'''
class DebugInfoFinder {
public:
    void processInstruction(const Module &M, const Instruction &I) {
        // Collect debug info anchors for the instruction
        std::vector<std::string> anchors;
        for (const auto &anchor : M.getDebugInfoAnchors()) {
            if (anchor.getInstruction() == &I) {
                anchors.push_back(anchor.getDebugInfo());
            }
        }

        // Print the collected debug info anchors
        for (const auto &anchor : anchors) {
            std::cout << anchor << std::endl;
        }
    }
};
''',
'''
void llvm::install_out_of_memory_new_handler() {
  std::new_handler old = std::set_new_handler(out_of_memory_new_handler);
  (void)old;
  assert((old == nullptr || old == out_of_memory_new_handler) &&
         "new-handler already installed");
}
''',
'''
unsigned newRegUnit(CodeGenRegister *R0, CodeGenRegister *R1 = nullptr) {
    // Create a new register unit
    CodeGenRegisterUnit *unit = new CodeGenRegisterUnit();

    // Associate the root registers with the unit
    unit->rootRegisters.push_back(R0);
    if (R1!= nullptr) {
        unit->rootRegisters.push_back(R1);
    }

    // Return the new register unit
    return unit;
}
'''
]
# Expected sources of the three samples above, in order: LLM, human, LLM.
predictions = predict(sample_texts, tokenizer, combined_model)
print("Predictions:", predictions)

Predictions: [0 1 0]


# Interpretability

In [79]:
def predict_for_lime(texts, tokenizer, combined_model, batch_size=32):
    """Return per-class probabilities in the layout LIME's text explainer expects.

    Args:
        texts: A list of strings (a single string is treated as a one-element list).
        tokenizer: Callable tokenizer returning ``input_ids`` and ``attention_mask``
            tensors when called with ``return_tensors="pt"``.
        combined_model: Model whose output is the class-1 probability, with one
            value per text in dimension 0 and size 1 in dimension 1.
        batch_size: Number of texts sent through the model at once. LIME passes
            all perturbed samples in one call, so chunking bounds memory use.

    Returns:
        NumPy array of shape ``(len(texts), 2)``; column 0 is ``1 - p`` and
        column 1 is ``p``, where ``p`` is the model output.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    if isinstance(texts, str):
        texts = [texts]
    texts = list(texts)
    if not texts:
        return np.empty((0, 2), dtype=np.float32)

    # Explanations need deterministic outputs, so make sure dropout is disabled.
    combined_model.eval()

    # Send the inputs to wherever the model's weights live instead of relying on
    # a notebook-level global that may not match the model that was passed in.
    first_param = next(combined_model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device('cpu')

    chunks = []
    with torch.no_grad():
        for start in range(0, len(texts), batch_size):
            inputs = tokenizer(texts[start:start + batch_size],
                               padding=True, truncation=True, return_tensors="pt")
            outputs = combined_model(
                input_ids=inputs['input_ids'].to(model_device),
                attention_mask=inputs['attention_mask'].to(model_device),
            )
            # Squeeze only dimension 1 so a chunk holding a single text stays 1-D.
            chunks.append(outputs.squeeze(1).cpu().numpy())

    p_class1 = np.concatenate(chunks)
    return np.column_stack((1 - p_class1, p_class1))


text = '''
std::vector<std::pair<float, int>> predict1(const std::vector<std::string>& texts, 
                                            torch::jit::script::Module& model, 
                                            /* Your tokenizer type */ tokenizer) {
    std::vector<std::pair<float, int>> proba_labels;

    // Tokenize the input texts
    // Assume tokenize function returns tensors
    auto inputs = tokenize(texts, tokenizer);
    auto input_ids = std::get<0>(inputs);
    auto attention_mask = std::get<1>(inputs);

    // Move tensors to the same device as the model
    input_ids = input_ids.to(torch::kCUDA);
    attention_mask = attention_mask.to(torch::kCUDA);

    // Perform inference
    torch::NoGradGuard no_grad;
    auto outputs = model.forward({input_ids, attention_mask}).toTensor().squeeze();

    // Convert logits to predictions and probabilities
    auto predictions = (outputs > 0.5).to(torch::kInt32);
    auto outputs_cpu = outputs.cpu();
    auto predictions_cpu = predictions.cpu();

    // Prepare the result
    for (int i = 0; i < predictions_cpu.size(0); ++i) {
        proba_labels.push_back({outputs_cpu[i].item<float>(), predictions_cpu[i].item<int>()});
    }

    return proba_labels;
}

'''

# LIME explains a prediction by scoring randomly perturbed copies of the input;
# pinning random_state makes the explanation reproducible across runs.
explainer = LimeTextExplainer(class_names=['0', '1'], random_state=42)
exp = explainer.explain_instance(
    text,
    # LIME calls the classifier with a list of texts only, so bind the rest here.
    partial(predict_for_lime, tokenizer=tokenizer, combined_model=combined_model),
    num_features=20,
    num_samples=200,
)

display(HTML(exp.as_html()))