In [3]:
import pandas as pd
import numpy as np
import re
import random
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import RobertaTokenizer, T5ForConditionalGeneration, DataCollatorForLanguageModeling
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import os
import matplotlib.pyplot as plt


# Dataset class for pre-training
class PythonCodeDataset(Dataset):
    """Map-style dataset that tokenizes the ``source`` column of a DataFrame.

    Args:
        tokenizer: Object exposing ``encode_plus``; it is asked to pad and
            truncate every example to ``max_len``.
        dataframe: Frame with a ``source`` column holding the code text.
        max_len: Value forwarded to the tokenizer as ``max_length``.
    """

    def __init__(self, tokenizer, dataframe, max_len=512):
        self.tokenizer = tokenizer
        self.data = dataframe
        self.max_len = max_len

    def __len__(self):
        """Number of rows in the wrapped DataFrame."""
        return len(self.data)

    def __getitem__(self, index):
        """Return ``input_ids`` and ``attention_mask`` long tensors for row ``index``."""
        row = self.data.iloc[index]
        encoded = self.tokenizer.encode_plus(
            row['source'],
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
        )
        item = {}
        for field in ('input_ids', 'attention_mask'):
            item[field] = torch.tensor(encoded[field], dtype=torch.long)
        return item

# Function to mask if conditions
def mask_if_condition(code_snippet):
    """Remove the first ``if ...:`` header from a snippet and return it separately.

    Args:
        code_snippet: Source text to search.

    Returns:
        A ``(masked_snippet, condition)`` tuple. ``condition`` is the removed
        text, running from the ``if`` keyword through the first ``:`` on the
        same line; ``masked_snippet`` is the input with exactly that occurrence
        deleted. When no ``if`` header is found, the snippet is returned
        unchanged and ``condition`` is ``None``.

    Note:
        The match is purely textual and stops at the first colon, so a
        condition that itself contains ``:`` (a slice, lambda or dict literal)
        is cut short, and an ``if`` inside a comment or string can also match.
    """
    # ``\b`` keeps the keyword from matching the tail of ``elif`` or of an
    # identifier such as ``motif`` (``for motif in motifs:`` used to yield
    # ``if in motifs:`` as the "condition").
    match = re.search(r'\bif\s+.*?:', code_snippet)
    if match is None:
        return code_snippet, None

    # Delete by position rather than with str.replace: the same text may occur
    # earlier inside a longer token (``elif a:`` ahead of ``if a:``), and
    # replace() would remove that earlier occurrence instead.
    masked_snippet = code_snippet[:match.start()] + code_snippet[match.end():]
    return masked_snippet, match.group(0)

# Fine-tuning and evaluation dataset classes
class MaskedIfDataset(PythonCodeDataset):
    """Dataset of (masked code, removed ``if`` header) pairs for seq2seq training.

    Reads the ``masked_code`` and ``ground_truth`` columns of the wrapped
    DataFrame. Both are tokenized with padding/truncation to ``max_len``, and
    pad positions in the target are set to ``-100``.
    """

    def __getitem__(self, index):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for row ``index``."""
        row = self.data.iloc[index]
        source_text = row['masked_code']
        target_text = row['ground_truth']

        encode_options = dict(
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_tensors="pt",
        )
        encoded_source = self.tokenizer(source_text, **encode_options)
        target_ids = self.tokenizer(target_text, **encode_options).input_ids

        # Overwrite padding in the target with -100.
        pad_positions = target_ids == self.tokenizer.pad_token_id
        target_ids[pad_positions] = -100

        return {
            'input_ids': encoded_source.input_ids.squeeze(),
            'attention_mask': encoded_source.attention_mask.squeeze(),
            'labels': target_ids.squeeze(),
        }

# Define the pre-training loop
def pretrain(model, dataloader, epochs, print_every=10, device=None):
    """Run a simple pre-training loop with AdamW (lr=5e-5).

    Args:
        model: Module whose forward accepts ``input_ids``, ``attention_mask``
            and ``labels`` and returns an object with a ``.loss`` attribute.
        dataloader: Iterable of dict batches of tensors. Each batch must hold
            ``input_ids`` and ``attention_mask``; ``labels`` is optional.
        epochs: Number of passes over ``dataloader``.
        print_every: Log the loss every this many optimisation steps.
        device: Device the batches are moved to. Defaults to the device of the
            model's first parameter, so the function no longer depends on a
            notebook-level ``device`` variable.
    """
    if device is None:
        device = next(model.parameters()).device

    model.train()
    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
    global_step = 0  # optimisation steps taken across all epochs

    for epoch in range(epochs):
        for batch in dataloader:
            batch = {k: v.to(device) for k, v in batch.items()}

            # A masking collator corrupts `input_ids` and puts the original
            # tokens in `labels`. Training against `input_ids` would teach the
            # model to reproduce the corrupted sequence, so prefer the
            # collator's labels and only fall back to `input_ids` when the
            # batch carries none.
            labels = batch['labels'] if 'labels' in batch else batch['input_ids']

            optimizer.zero_grad()
            outputs = model(
                input_ids=batch['input_ids'],
                attention_mask=batch['attention_mask'],
                labels=labels,
            )
            loss = outputs.loss
            loss.backward()
            optimizer.step()

            if global_step % print_every == 0:
                print(f"Step {global_step}, Loss: {loss.item()}")

            global_step += 1

        print(f"Epoch {epoch+1}/{epochs} completed.")
 

def fine_tune_with_eval(model, train_loader, eval_loader, epochs, save_path, print_every=10,
                        early_stopping_patience=3, tokenizer=None, device=None):
    """Fine-tune ``model`` with per-epoch evaluation, checkpointing and early stopping.

    Args:
        model: Model whose forward accepts ``input_ids``, ``attention_mask`` and
            ``labels`` and returns an object with ``.loss``; it must also
            provide ``save_pretrained``.
        train_loader: Iterable of dict batches holding ``input_ids``,
            ``attention_mask`` and ``labels`` tensors.
        eval_loader: Loader handed to ``evaluate_accuracy`` after every epoch.
        epochs: Maximum number of epochs.
        save_path: Directory receiving one checkpoint sub-directory per epoch
            (named by the 0-based epoch index) and ``training_metrics.png``.
        print_every: Accepted for interface compatibility; not used.
        early_stopping_patience: Stop after this many consecutive epochs
            without an improvement in evaluation accuracy.
        tokenizer: Tokenizer forwarded to ``evaluate_accuracy``. When omitted,
            the module-level ``tokenizer`` is used so existing call sites keep
            working.
        device: Device the batches are moved to. Defaults to the device of the
            model's first parameter.

    Returns:
        The 0-based index of the epoch with the best evaluation accuracy
        (``0`` if accuracy never rose above zero).

    Raises:
        NameError: If no tokenizer is passed and no module-level ``tokenizer``
            exists.
    """
    if device is None:
        device = next(model.parameters()).device
    if tokenizer is None:
        # Backward compatibility: the function used to read the notebook-level
        # `tokenizer` directly.
        tokenizer = globals().get('tokenizer')
        if tokenizer is None:
            raise NameError(
                "fine_tune_with_eval needs a tokenizer: pass tokenizer=... "
                "or define a module-level `tokenizer`"
            )

    # The metrics figure is written here even if no checkpoint was saved.
    os.makedirs(save_path, exist_ok=True)

    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)
    best_epoch = 0
    best_eval_accuracy = 0
    patience_counter = 0
    train_losses, eval_accuracies = [], []

    for epoch in range(epochs):
        model.train()  # evaluate_accuracy switches the model to eval mode
        total_loss = 0
        batches_seen = 0

        # Training loop with tqdm for progress tracking
        for batch in tqdm(train_loader, desc=f"Training Epoch {epoch+1}/{epochs}"):
            batch = {k: v.to(device) for k, v in batch.items()}
            optimizer.zero_grad()
            outputs = model(
                input_ids=batch['input_ids'],
                attention_mask=batch['attention_mask'],
                labels=batch['labels'],
            )
            loss = outputs.loss
            total_loss += loss.item()
            batches_seen += 1
            loss.backward()
            optimizer.step()

        # Average over the batches actually processed; an empty loader yields
        # 0.0 instead of a ZeroDivisionError.
        average_loss = total_loss / batches_seen if batches_seen else 0.0
        train_losses.append(average_loss)

        # Evaluate on the evaluation set
        eval_accuracy = evaluate_accuracy(model, eval_loader, tokenizer, device)
        eval_accuracies.append(eval_accuracy)
        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {average_loss:.4f}, Eval Accuracy: {eval_accuracy:.4f}")

        # Early stopping bookkeeping
        if eval_accuracy > best_eval_accuracy:
            best_eval_accuracy = eval_accuracy
            best_epoch = epoch
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= early_stopping_patience:
                print("Early stopping triggered.")
                break

        # Checkpoint every epoch that did not trigger early stopping, so the
        # directory for `best_epoch` always exists afterwards.
        save_directory = f"{save_path}/{epoch}"
        model.save_pretrained(save_directory)

    # Plot the training loss and evaluation accuracy. The figure is left open
    # on purpose so an inline notebook backend still renders it.
    fig, (loss_ax, accuracy_ax) = plt.subplots(1, 2, figsize=(12, 5))

    loss_ax.plot(train_losses, label='Training Loss')
    loss_ax.set_title('Training Loss')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.legend()

    accuracy_ax.plot(eval_accuracies, label='Evaluation Accuracy')
    accuracy_ax.set_title('Evaluation Accuracy')
    accuracy_ax.set_xlabel('Epoch')
    accuracy_ax.set_ylabel('Accuracy')
    accuracy_ax.legend()

    fig.savefig(f"{save_path}/training_metrics.png")

    return best_epoch


def evaluate_accuracy(model, dataloader, tokenizer, device):
    """Exact-match accuracy of generated text against decoded labels.

    For every batch the model generates from ``input_ids``/``attention_mask``
    (``max_length=512``). Label ids equal to the tokenizer's pad id or ``-100``
    are dropped before decoding. A prediction counts as correct when it equals
    the decoded label after stripping surrounding whitespace. Examples whose
    label is empty after filtering are left out of the denominator.

    Returns:
        ``correct / scored``, or ``0`` when nothing was scored.
    """
    model.eval()
    n_correct = 0
    n_scored = 0

    for raw_batch in tqdm(dataloader, desc="Evaluating"):
        batch = {name: tensor.to(device) for name, tensor in raw_batch.items()}

        with torch.no_grad():
            generated = model.generate(
                batch['input_ids'],
                attention_mask=batch['attention_mask'],
                max_length=512,
            )
        predictions = [tokenizer.decode(sequence, skip_special_tokens=True) for sequence in generated]

        # Decode the references; None marks a label with no usable tokens.
        references = []
        for label_row in batch['labels']:
            kept_ids = [
                token_id
                for token_id in label_row.tolist()
                if token_id != tokenizer.pad_token_id and token_id != -100
            ]
            references.append(
                tokenizer.decode(kept_ids, skip_special_tokens=True) if kept_ids else None
            )

        # Score only the examples that have a reference.
        for predicted, reference in zip(predictions, references):
            if reference is None:
                continue
            n_scored += 1
            if predicted.strip() == reference.strip():
                n_correct += 1

    return n_correct / n_scored if n_scored > 0 else 0
 
 
# Load the extracted functions
df = pd.read_parquet('../if-statements/dataset/extracted/functions.pq')
#df = df.head(50)

# Half of the data goes to pre-training; the other half is carved into
# evaluation, test and fine-tuning subsets.
pretrain_df, fine_tune_df = train_test_split(df, test_size=0.5, random_state=42)

# 10% of the fine-tuning half becomes the evaluation set ...
eval_df = fine_tune_df.sample(frac=0.1, random_state=42)
remaining_df = fine_tune_df.drop(eval_df.index)

# ... 11.11% of what is left becomes the test set, and the rest is kept for
# fine-tuning.
test_df = remaining_df.sample(frac=0.1111, random_state=42)
fine_tune_df = remaining_df.drop(test_df.index)

# Evaluation and test rows must not overlap.
assert set(eval_df.index).isdisjoint(test_df.index)


# Tokenizer and model come from the same checkpoint
MODEL_NAME = 'Salesforce/codet5-small'
tokenizer = RobertaTokenizer.from_pretrained(MODEL_NAME)
model = T5ForConditionalGeneration.from_pretrained(MODEL_NAME)

# Use the GPU when one is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
model.to(device)

# Pre-training data: raw source code from the pre-training split
pretrain_dataset = PythonCodeDataset(tokenizer=tokenizer, dataframe=pretrain_df)

# Collator configured for masked language modelling with a 15% masking rate
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer,
    mlm=True,
    mlm_probability=0.15,
)

# Shuffled batches of 8, assembled by the collator
pretrain_loader = DataLoader(
    dataset=pretrain_dataset,
    batch_size=8,
    shuffle=True,
    collate_fn=data_collator,
)

# Pre-training is currently disabled
#pretrain(model, pretrain_loader, epochs=1)


# Prepare data for fine-tuning and evaluation
def _with_masked_if(frame, source_column='source'):
    """Return a copy of ``frame`` with ``masked_code`` and ``ground_truth`` added.

    ``mask_if_condition`` is applied to every value of ``source_column``. Rows
    for which it reports no ``if`` header (``ground_truth`` is ``None``) are
    dropped. The input frame is not modified, so re-running the cell is safe,
    and an empty frame is handled without raising.
    """
    pairs = [mask_if_condition(source) for source in frame[source_column]]
    result = frame.copy()
    result['masked_code'] = [masked for masked, _ in pairs]
    result['ground_truth'] = [condition for _, condition in pairs]
    return result.dropna(subset=['ground_truth'])


fine_tune_df = _with_masked_if(fine_tune_df)
eval_df = _with_masked_if(eval_df)

# Instantiate the datasets for fine-tuning and evaluation (once)
fine_tune_dataset = MaskedIfDataset(tokenizer, fine_tune_df)
eval_dataset = MaskedIfDataset(tokenizer, eval_df)

# Dataloaders for fine-tuning and evaluation; only the training data is shuffled
fine_tune_loader = DataLoader(fine_tune_dataset, batch_size=8, shuffle=True)
eval_loader = DataLoader(eval_dataset, batch_size=8, shuffle=False)


# Fine-tuning is disabled below, so the best epoch is pinned by hand.
best_epoch = 4

# Root directory that holds one checkpoint sub-directory per epoch
save_path = '../if-statements/dataset/extracted/final'
#best_epoch = fine_tune_with_eval(model, fine_tune_loader, eval_loader, epochs=5, save_path=save_path)

# Checkpoint directory of the selected epoch
best_model_directory = os.path.join(save_path, str(best_epoch))

# Restore the selected checkpoint
best_model = T5ForConditionalGeneration.from_pretrained(best_model_directory)

# Config object attached to the restored model
model_config = best_model.config

best_model.to(device)

# Build the masked inputs and targets for the test split, then drop rows
# without a ground-truth `if`
test_masked_code, test_ground_truth = zip(*test_df['source'].apply(mask_if_condition))
test_df['masked_code'] = test_masked_code
test_df['ground_truth'] = test_ground_truth
test_df.dropna(subset=['ground_truth'], inplace=True)

test_dataset = MaskedIfDataset(tokenizer=tokenizer, dataframe=test_df)
test_loader = DataLoader(dataset=test_dataset, batch_size=8, shuffle=False)

# Score the restored model on the test split
test_accuracy = evaluate_accuracy(
    model=best_model,
    dataloader=test_loader,
    tokenizer=tokenizer,
    device=device,
)
print(f"Test Accuracy: {test_accuracy:.4f}")

Using device: cuda


Evaluating: 100%|███████████████████████████| 1092/1092 [04:02<00:00, 4.50it/s]

Test Accuracy: 0.3642





In [15]:
# Load the external test set and discard the two columns that are not used here
new_df = pd.read_csv('../if-statements/dataset/extracted/test_set_usi.csv').drop(
    columns=["input_method", "tokens_in_method"]
)

print(new_df.head())


 Unnamed: 0 original_method \
0 5126 def stream_edit(request, stream_id, response_f... 
1 10859 def _read_and_parse_includes(self):\n # Map... 
2 10615 def _get_list_key(self, spaces, lines):\n k... 
3 17853 def search_host(self, search_string):\n res... 
4 3922 def pop(self, key: Union[str, Enum], default: ... 

 target_block 
0 if "cancel" not in request . POST : 
1 if isinstance ( node , ast . Include ) : 
2 if len ( line . strip ( ) ) == 0 : 
3 if isinstance ( value , int ) : 
4 if self . _get_flag ( "struct" ) : 


In [16]:
# Function to preprocess the new dataframe
def preprocess_new_df(df):
    """Add ``masked_code`` and ``ground_truth`` columns to ``df`` in place.

    ``mask_if_condition`` is applied to every value of ``original_method``.
    Rows for which it returns no condition (``ground_truth`` is ``None``) are
    then dropped from ``df``.

    Args:
        df: DataFrame with an ``original_method`` column. It is modified in
            place; nothing is returned.

    NOTE(review): the target is whatever ``mask_if_condition`` extracts from
    ``original_method``. Any ``target_block`` column on the frame is not
    consulted, so the two can disagree -- confirm which one the reported
    accuracy is meant to use.
    """
    # Build both columns from one pass. Unlike ``zip(*...)`` unpacking, this
    # also works for an empty frame.
    pairs = [mask_if_condition(method) for method in df['original_method']]
    df['masked_code'] = [masked for masked, _ in pairs]
    df['ground_truth'] = [condition for _, condition in pairs]

    # Drop rows where ground truth (if statement) is None
    df.dropna(subset=['ground_truth'], inplace=True)

# Preprocess the new dataframe: new_df is modified in place, gaining the
# masked_code and ground_truth columns.
preprocess_new_df(new_df)

# Check the first few rows.
# NOTE(review): compare ground_truth with target_block in the preview -- they
# are produced independently and may not refer to the same `if`.
print(new_df.head())


 Unnamed: 0 original_method \
0 5126 def stream_edit(request, stream_id, response_f... 
1 10859 def _read_and_parse_includes(self):\n # Map... 
2 10615 def _get_list_key(self, spaces, lines):\n k... 
3 17853 def search_host(self, search_string):\n res... 
4 3922 def pop(self, key: Union[str, Enum], default: ... 

 target_block \
0 if "cancel" not in request . POST : 
1 if isinstance ( node , ast . Include ) : 
2 if len ( line . strip ( ) ) == 0 : 
3 if isinstance ( value , int ) : 
4 if self . _get_flag ( "struct" ) : 

 masked_code \
0 def stream_edit(request, stream_id, response_f... 
1 def _read_and_parse_includes(self):\n # Map... 
2 def _get_list_key(self, spaces, lines):\n k... 
3 def search_host(self, search_string):\n res... 
4 def pop(self, key: Union[str, Enum], default: ... 

 ground_truth 
0 if not request.user.profile.has_permission(str... 
1 if isinstance(node, ast.Include): 
2 if len(line.strip()) == 0: 
3 if host_entry.get("type") != "entry": 
4 if self._get_flag("reado

In [18]:
# Wrap the preprocessed external test set in the masked-if dataset
new_dataset = MaskedIfDataset(tokenizer=tokenizer, dataframe=new_df)

# Fixed-order batches of 8 for evaluation
new_loader = DataLoader(dataset=new_dataset, batch_size=8, shuffle=False)

# Score the restored best model on the external test set
new_accuracy = evaluate_accuracy(
    model=best_model,
    dataloader=new_loader,
    tokenizer=tokenizer,
    device=device,
)
print(f"New Dataset Accuracy: {new_accuracy:.4f}")


Evaluating: 100%|█████████████████████████████| 624/624 [02:29<00:00, 4.17it/s]

New Dataset Accuracy: 0.2841



