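"""Autoencoder-based detection of "Extract Method" refactoring candidates.

Trains a dense autoencoder on GraphCodeBERT embeddings of positive samples
and selects a reconstruction-error threshold to separate positives from
negatives on a held-out test set."""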
from keras.layers import Input, Dense, Flatten
from keras.models import Model
from Database import Database
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from dotenv import dotenv_values
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support

class Autoencoder:
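    """Wraps model construction, training, and threshold selection.

    The autoencoder sees only positive samples during training, so negatives
    should reconstruct poorly; reconstruction error is the decision score.
    """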

    def __get_autoencoder(self, input_dim) -> Model:
        input_shape = (input_dim,)
        input_layer = Input(shape=input_shape)

        # Encoder: compress the embedding input_dim -> 128 -> 64.
        encoder = Flatten()(input_layer)
        encoder = Dense(128, activation='relu')(encoder)
        encoder = Dense(64, activation='relu')(encoder)

        # Decoder: reconstruct the embedding from the 64-dim bottleneck.
        decoder = Dense(128, activation='relu')(encoder)
        decoder = Dense(input_dim, activation='sigmoid')(decoder)

        # Autoencoder model, trained to reproduce its own input.
        autoencoder = Model(inputs=input_layer, outputs=decoder)
        autoencoder.compile(optimizer='adam', loss='mse')

        return autoencoder
    
    def __print_summary(self, model: Model):
        # model.summary() prints to stdout itself and returns None,
        # so wrapping it in print() would also print "None".
        model.summary()
    
    def __fit_autoencoder(self, epochs, batch_size, model: Model, train_var, valid_var=None):
        # Inputs double as targets: the autoencoder learns to reconstruct its input.
        history = model.fit(train_var, train_var,
                            validation_data=(valid_var, valid_var) if valid_var is not None else None,
                            epochs=epochs, batch_size=batch_size)
        return history, model

    def __split_train_test_val(self, data):
        # 80/20 train/test split, then 10% of train held out for validation.
        train_array, test_array = train_test_split(data, test_size=0.2, random_state=42)
        train_array, valid_array = train_test_split(train_array, test_size=0.1, random_state=42)
        return train_array, valid_array, test_array
    
    @staticmethod
    def __compute_metrics(conf_matrix):
        # Manual alternative to precision_recall_fscore_support, computed from
        # a 2x2 confusion matrix (rows: true class, columns: predicted class).
        precision = conf_matrix[1][1] / (conf_matrix[1][1] + conf_matrix[0][1])
        recall = conf_matrix[1][1] / (conf_matrix[1][1] + conf_matrix[1][0])
        f1 = (2 * precision * recall) / (precision + recall)
        return precision, recall, f1

    def __find_optimal_modified(self, error_df: pd.DataFrame):
        # Midpoint heuristic: place the threshold halfway between the smallest
        # and largest reconstruction error. Errors at or below the threshold
        # are predicted positive (class 1), above it negative (class 0).
        min_error, max_error = error_df["Reconstruction_error"].min(), error_df["Reconstruction_error"].max()
        optimal_threshold = (min_error + max_error) / 2
        y_pred = [0 if e > optimal_threshold else 1 for e in error_df.Reconstruction_error.values]
        precision, recall, f1, _ = precision_recall_fscore_support(error_df.True_class, y_pred, average='macro')

        return optimal_threshold, precision, recall, f1

    def __find_optimal(self, error_df: pd.DataFrame, steps=50):
        # Grid search: sweep `steps` candidate thresholds across the observed
        # error range and keep the one that maximises the binary F1 score.
        min_error, max_error = error_df["Reconstruction_error"].min(), error_df["Reconstruction_error"].max()
        optimal_threshold = min_error
        max_f1, max_pr, max_re = 0, 0, 0
        for threshold in np.linspace(min_error, max_error, steps):
            y_pred = [0 if e > threshold else 1 for e in error_df.Reconstruction_error.values]
            precision, recall, f1, _ = precision_recall_fscore_support(error_df.True_class, y_pred, average='binary')

            if f1 > max_f1:
                max_f1 = f1
                optimal_threshold = threshold
                max_pr = precision
                max_re = recall
        print(f"Result optimal_threshold={optimal_threshold}, max_precision={max_pr}, max_recall={max_re}, max_f1={max_f1}")
        return optimal_threshold, max_pr, max_re, max_f1

    @staticmethod
    def __split_by_percent(data, percent):
        # Hold out `percent` of the data as a test split.
        return train_test_split(data, test_size=percent, random_state=42)

    def train_autoencoder(self):
        # GraphCodeBERT embeddings are 768-dimensional.
        autoencoder = self.__get_autoencoder(768)
        self.__print_summary(autoencoder)

        # Load positive and negative "Extract Method" embeddings from the database.
        db = Database(dotenv_values(".env")['COLLECTION_NAME'])
        pos_emb_list, neg_emb_list = [], []
        for doc in list(db.find_docs({"refactoring_type": "Extract Method"})):
            pos_emb_list.append(doc['embedding_pos'])
            neg_emb_list.append(doc['embedding_neg'])

        # Train only on positives; test on a held-out mix of positives and negatives.
        pos_emb_list_train, pos_emb_list_test = self.__split_by_percent(pos_emb_list, 0.3)
        _, neg_emb_list_test = self.__split_by_percent(neg_emb_list, 0.3)

        x_train = np.array(pos_emb_list_train)
        x_test = np.array(pos_emb_list_test + neg_emb_list_test)
        y_test = np.array([1] * len(pos_emb_list_test) + [0] * len(neg_emb_list_test))

        epochs = 25
        history, trained_model = self.__fit_autoencoder(epochs, 32, autoencoder, x_train)
        # Persist the trained model (legacy HDF5 format).
        trained_model.save(f'./results/autoencoder_{epochs}.hdf5')

        # Test: per-sample reconstruction error (MSE) serves as the anomaly score.
        test_predict = trained_model.predict(x_test)
        mse = np.mean(np.power(x_test - test_predict, 2), axis=1)

        error_df = pd.DataFrame({'Reconstruction_error': mse,
                                 'True_class': y_test})

        print("Max: ", error_df["Reconstruction_error"].max())
        print("Min: ", error_df["Reconstruction_error"].min())

        # Alternative: sweep thresholds for the best F1 instead of the midpoint heuristic.
        # optimal_threshold, precision, recall, f1 = self.__find_optimal(error_df, steps=100)
        optimal_threshold, precision, recall, f1 = self.__find_optimal_modified(error_df)
        print(f"Result optimal_threshold={optimal_threshold}, precision={precision}, recall={recall}, f1={f1}")
        metrics = {
            "Threshold": float(optimal_threshold),
            "Precision": float(precision),
            "Recall": float(recall),
            "F1": float(f1)
        }
        with open('./results/metrics.json', 'w') as fp:
            json.dump(metrics, fp)

        plt.plot(history.history['loss'])
        plt.xlabel('Epoch')
        plt.ylabel('Training loss (MSE)')
        plt.savefig("./results/training_graph.png")
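
    # Illustrative sketch (an assumption, not part of the original training flow):
    # how the chosen threshold could be applied to a single new embedding.
    # Reconstruction error at or below the threshold is treated as a positive
    # "Extract Method" sample (class 1), above it as negative (class 0).
    @staticmethod
    def classify_embedding(model: Model, embedding, threshold):
        x = np.array([embedding])
        reconstruction = model.predict(x)
        mse = float(np.mean(np.power(x - reconstruction, 2)))
        return 1 if mse <= threshold else 0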

if __name__ == "__main__":
    Autoencoder().train_autoencoder()
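
    # Hypothetical usage (assumption): reloading the persisted model and threshold
    # for later inference; `new_embedding` stands for any 768-dim embedding vector.
    # from keras.models import load_model
    # model = load_model('./results/autoencoder_25.hdf5')
    # with open('./results/metrics.json') as fp:
    #     threshold = json.load(fp)['Threshold']
    # label = Autoencoder.classify_embedding(model, new_embedding, threshold)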