Spaces:
Runtime error
Runtime error
File size: 7,170 Bytes
a5fb347 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
from keras.layers import Input, Dense, Flatten
from keras.models import Model
from Database import Database
import numpy as np, json
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from dotenv import dotenv_values
import pandas as pd
# from tensorflow.python.ops.confusion_matrix import confusion_matrix
from sklearn.metrics import precision_recall_fscore_support
class Autoencoder:
def __get_autoencoder(self, input_dim) -> Model:
input_shape = (input_dim,)
input_layer = Input(shape=input_shape)
# Encoder layers
encoder = Flatten()(input_layer)
encoder = Dense(128, activation='relu')(encoder)
encoder = Dense(64, activation='relu')(encoder)
# encoder = Dense(32, activation='relu')(encoder)
# Decoder layers
# decoder = Dense(64, activation='relu')(encoder)
decoder = Dense(128, activation='relu')(encoder) #decoder
decoder = Dense(input_dim, activation='sigmoid')(decoder)
# Autoencoder model
autoencoder = Model(inputs=input_layer, outputs=decoder)
# autoencoder.compile(optimizer='adam', loss='binary_crossentropy')
autoencoder.compile(optimizer='adam', loss='mse')
return autoencoder
def __print_summary(self, model: Model):
print(model.summary())
return
def __fit_autoencoder(self,epochs,batch_size,model: Model, train_var,valid_var=None):
history = model.fit(train_var,train_var,
# validation_data=(valid_var,valid_var),
epochs=epochs,batch_size=batch_size)
return history, model
def __split_train_test_val(self, data):
train_array, test_array = train_test_split(data,test_size=0.2,random_state=42)
train_array, valid_array = train_test_split(train_array,test_size=0.1,random_state=42)
return train_array, valid_array, test_array
@staticmethod
def __compute_metrics(conf_matrix):
precision = conf_matrix[1][1] / (conf_matrix[1][1] + conf_matrix[0][1])
if precision==1:
print(conf_matrix)
recall = conf_matrix[1][1] / (conf_matrix[1][1] + conf_matrix[1][0])
f1 = (2 * precision * recall) / (precision + recall)
# print("precision: " + str(precision) + ", recall: " + str(recall) + ", f1: " + str(f1))
return precision, recall, f1
def __find_optimal_modified(self,error_df: pd.DataFrame, steps=50):
min_error, max_error = error_df["Reconstruction_error"].min(), error_df["Reconstruction_error"].max()
optimal_threshold = (min_error+max_error)/2
y_pred = [0 if e > optimal_threshold else 1 for e in error_df.Reconstruction_error.values]
precision, recall, f1,_=precision_recall_fscore_support(error_df.True_class, y_pred, average='macro')
return optimal_threshold, precision, recall, f1
def __find_optimal(self,error_df: pd.DataFrame, steps=50):
min_error, max_error = error_df["Reconstruction_error"].min(), error_df["Reconstruction_error"].max()
optimal_threshold = min_error
max_f1 = 0
max_pr = 0
max_re = 0
# step_value = (max_error-min_error)/(steps - 1)
for threshold in np.arange(min_error, max_error, 0.005):
# print("Threshold: " + str(threshold))
# y_pred = [1 if e > threshold else 0 for e in error_df.Reconstruction_error.values]
y_pred = [0 if e > threshold else 1 for e in error_df.Reconstruction_error.values]
# conf_matrix = confusion_matrix(error_df.True_class, y_pred)
# precision, recall, f1 = self.__compute_metrics(conf_matrix)
# precision, recall, f1,_=precision_recall_fscore_support(error_df.True_class, y_pred, average='macro')
# precision, recall, f1,_=precision_recall_fscore_support(error_df.True_class, y_pred, average='micro')
# precision, recall, f1,_=precision_recall_fscore_support(error_df.True_class, y_pred, average='weighted')
precision, recall, f1,_=precision_recall_fscore_support(error_df.True_class, y_pred, average='binary')
if f1 > max_f1:
max_f1 = f1
optimal_threshold = threshold
max_pr = precision
max_re = recall
print(f"Result optimal_threshold={optimal_threshold}, max_precision={max_pr}, max_recall={max_re}, max_f1={max_f1}")
# return optimal_threshold, max_pr.numpy(), max_re.numpy(), max_f1.numpy()
return optimal_threshold, max_pr, max_re, max_f1
@staticmethod
def __split_by_percent(data,percent):
return train_test_split(data,test_size=0.3,random_state=42)
def train_autoencoder(self):
#GraphCodeBERT
autoencoder = self.__get_autoencoder(768)
self.__print_summary(autoencoder)
#Create Dataset df
df = pd.DataFrame(columns=['Embedding','True_class'])
#DB
db = Database(dotenv_values(".env")['COLLECTION_NAME'])
# embeddings_list = [emb["embedding"] for emb in list(db.find_docs({"refactoring_type":"Extract Method"}))]
pos_emb_list, neg_emb_list = [],[]
for doc in list(db.find_docs({"refactoring_type":"Extract Method"})):
pos_emb_list.append(doc['embedding_pos'])
neg_emb_list.append(doc['embedding_neg'])
pos_emb_list_train, pos_emb_list_test = self.__split_by_percent(pos_emb_list,0.3)
_, neg_emb_list_test = self.__split_by_percent(neg_emb_list,0.3)
x_train = np.array(pos_emb_list_train)
x_test = np.array(pos_emb_list_test+neg_emb_list_test)
y_test = np.array([1 for i in range(0,len(pos_emb_list_test))]+[0 for i in range(0,len(neg_emb_list_test))])
# print(np.array(pos_emb_list_train).shape)
epoch = 25
history, trained_model = self.__fit_autoencoder(epoch,32,autoencoder,x_train)
trained_model.save('./results/autoencoder_'+str(epoch)+'.hdf5')
#Test
test_predict = trained_model.predict(x_test)
mse = np.mean(np.power(x_test - test_predict, 2), axis=1)
error_df = pd.DataFrame({'Reconstruction_error': mse,
'True_class': y_test})
print("Max: ", error_df["Reconstruction_error"].max())
print("Min: ", error_df["Reconstruction_error"].min())
# optimal_threshold, precision, recall, f1 = self.__find_optimal(error_df,100)
optimal_threshold, precision, recall, f1 = self.__find_optimal_modified(error_df,100)
print(f"Result optimal_threshold={optimal_threshold}, max_precision={precision}, max_recall={recall}, max_f1={f1}")
metrics = {
"Threshold":optimal_threshold,
"Precision": precision,
"Recall":recall,
"F1":f1
}
with open('./results/metrics.json','w') as fp:
json.dump(metrics,fp)
plt.plot(history.history['loss'])
plt.savefig("./results/training_graph.png")
if __name__=="__main__":
Autoencoder().train_autoencoder() |