jRefactoring / autoencoder.py
gautam-shetty's picture
Initial commit
a5fb347
raw
history blame
7.17 kB
from keras.layers import Input, Dense, Flatten
from keras.models import Model
from Database import Database
import numpy as np, json
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from dotenv import dotenv_values
import pandas as pd
# from tensorflow.python.ops.confusion_matrix import confusion_matrix
from sklearn.metrics import precision_recall_fscore_support
class Autoencoder:
def __get_autoencoder(self, input_dim) -> Model:
input_shape = (input_dim,)
input_layer = Input(shape=input_shape)
# Encoder layers
encoder = Flatten()(input_layer)
encoder = Dense(128, activation='relu')(encoder)
encoder = Dense(64, activation='relu')(encoder)
# encoder = Dense(32, activation='relu')(encoder)
# Decoder layers
# decoder = Dense(64, activation='relu')(encoder)
decoder = Dense(128, activation='relu')(encoder) #decoder
decoder = Dense(input_dim, activation='sigmoid')(decoder)
# Autoencoder model
autoencoder = Model(inputs=input_layer, outputs=decoder)
# autoencoder.compile(optimizer='adam', loss='binary_crossentropy')
autoencoder.compile(optimizer='adam', loss='mse')
return autoencoder
def __print_summary(self, model: Model):
print(model.summary())
return
def __fit_autoencoder(self,epochs,batch_size,model: Model, train_var,valid_var=None):
history = model.fit(train_var,train_var,
# validation_data=(valid_var,valid_var),
epochs=epochs,batch_size=batch_size)
return history, model
def __split_train_test_val(self, data):
train_array, test_array = train_test_split(data,test_size=0.2,random_state=42)
train_array, valid_array = train_test_split(train_array,test_size=0.1,random_state=42)
return train_array, valid_array, test_array
@staticmethod
def __compute_metrics(conf_matrix):
precision = conf_matrix[1][1] / (conf_matrix[1][1] + conf_matrix[0][1])
if precision==1:
print(conf_matrix)
recall = conf_matrix[1][1] / (conf_matrix[1][1] + conf_matrix[1][0])
f1 = (2 * precision * recall) / (precision + recall)
# print("precision: " + str(precision) + ", recall: " + str(recall) + ", f1: " + str(f1))
return precision, recall, f1
def __find_optimal_modified(self,error_df: pd.DataFrame, steps=50):
min_error, max_error = error_df["Reconstruction_error"].min(), error_df["Reconstruction_error"].max()
optimal_threshold = (min_error+max_error)/2
y_pred = [0 if e > optimal_threshold else 1 for e in error_df.Reconstruction_error.values]
precision, recall, f1,_=precision_recall_fscore_support(error_df.True_class, y_pred, average='macro')
return optimal_threshold, precision, recall, f1
def __find_optimal(self,error_df: pd.DataFrame, steps=50):
min_error, max_error = error_df["Reconstruction_error"].min(), error_df["Reconstruction_error"].max()
optimal_threshold = min_error
max_f1 = 0
max_pr = 0
max_re = 0
# step_value = (max_error-min_error)/(steps - 1)
for threshold in np.arange(min_error, max_error, 0.005):
# print("Threshold: " + str(threshold))
# y_pred = [1 if e > threshold else 0 for e in error_df.Reconstruction_error.values]
y_pred = [0 if e > threshold else 1 for e in error_df.Reconstruction_error.values]
# conf_matrix = confusion_matrix(error_df.True_class, y_pred)
# precision, recall, f1 = self.__compute_metrics(conf_matrix)
# precision, recall, f1,_=precision_recall_fscore_support(error_df.True_class, y_pred, average='macro')
# precision, recall, f1,_=precision_recall_fscore_support(error_df.True_class, y_pred, average='micro')
# precision, recall, f1,_=precision_recall_fscore_support(error_df.True_class, y_pred, average='weighted')
precision, recall, f1,_=precision_recall_fscore_support(error_df.True_class, y_pred, average='binary')
if f1 > max_f1:
max_f1 = f1
optimal_threshold = threshold
max_pr = precision
max_re = recall
print(f"Result optimal_threshold={optimal_threshold}, max_precision={max_pr}, max_recall={max_re}, max_f1={max_f1}")
# return optimal_threshold, max_pr.numpy(), max_re.numpy(), max_f1.numpy()
return optimal_threshold, max_pr, max_re, max_f1
@staticmethod
def __split_by_percent(data,percent):
return train_test_split(data,test_size=0.3,random_state=42)
def train_autoencoder(self):
#GraphCodeBERT
autoencoder = self.__get_autoencoder(768)
self.__print_summary(autoencoder)
#Create Dataset df
df = pd.DataFrame(columns=['Embedding','True_class'])
#DB
db = Database(dotenv_values(".env")['COLLECTION_NAME'])
# embeddings_list = [emb["embedding"] for emb in list(db.find_docs({"refactoring_type":"Extract Method"}))]
pos_emb_list, neg_emb_list = [],[]
for doc in list(db.find_docs({"refactoring_type":"Extract Method"})):
pos_emb_list.append(doc['embedding_pos'])
neg_emb_list.append(doc['embedding_neg'])
pos_emb_list_train, pos_emb_list_test = self.__split_by_percent(pos_emb_list,0.3)
_, neg_emb_list_test = self.__split_by_percent(neg_emb_list,0.3)
x_train = np.array(pos_emb_list_train)
x_test = np.array(pos_emb_list_test+neg_emb_list_test)
y_test = np.array([1 for i in range(0,len(pos_emb_list_test))]+[0 for i in range(0,len(neg_emb_list_test))])
# print(np.array(pos_emb_list_train).shape)
epoch = 25
history, trained_model = self.__fit_autoencoder(epoch,32,autoencoder,x_train)
trained_model.save('./results/autoencoder_'+str(epoch)+'.hdf5')
#Test
test_predict = trained_model.predict(x_test)
mse = np.mean(np.power(x_test - test_predict, 2), axis=1)
error_df = pd.DataFrame({'Reconstruction_error': mse,
'True_class': y_test})
print("Max: ", error_df["Reconstruction_error"].max())
print("Min: ", error_df["Reconstruction_error"].min())
# optimal_threshold, precision, recall, f1 = self.__find_optimal(error_df,100)
optimal_threshold, precision, recall, f1 = self.__find_optimal_modified(error_df,100)
print(f"Result optimal_threshold={optimal_threshold}, max_precision={precision}, max_recall={recall}, max_f1={f1}")
metrics = {
"Threshold":optimal_threshold,
"Precision": precision,
"Recall":recall,
"F1":f1
}
with open('./results/metrics.json','w') as fp:
json.dump(metrics,fp)
plt.plot(history.history['loss'])
plt.savefig("./results/training_graph.png")
if __name__=="__main__":
Autoencoder().train_autoencoder()