Spaces:
Sleeping
Sleeping
import glob | |
import json | |
import os | |
import os | |
import torch | |
from tqdm import tqdm | |
import random | |
def open_content(path): | |
paths = glob.glob(os.path.join(path, "*.json")) | |
train, dev, test, labels = None, None, None, None | |
for p in paths: | |
if "train" in p: | |
with open(p, "r") as f: | |
train = json.load(f) | |
elif "dev" in p: | |
with open(p, "r") as f: | |
dev = json.load(f) | |
elif "test" in p: | |
with open(p, "r") as f: | |
test = json.load(f) | |
elif "labels" in p: | |
with open(p, "r") as f: | |
labels = json.load(f) | |
return train, dev, test, labels | |
def process(data): | |
words = data['sentence'].split() | |
entities = [] # List of entities (start, end, type) | |
for entity in data['entities']: | |
start_char, end_char = entity['pos'] | |
# Initialize variables to keep track of word positions | |
start_word = None | |
end_word = None | |
# Iterate through words and find the word positions | |
char_count = 0 | |
for i, word in enumerate(words): | |
word_length = len(word) | |
if char_count == start_char: | |
start_word = i | |
if char_count + word_length == end_char: | |
end_word = i | |
break | |
char_count += word_length + 1 # Add 1 for the space | |
# Append the word positions to the list | |
entities.append((start_word, end_word, entity['type'])) | |
# Create a list of word positions for each entity | |
sample = { | |
"tokenized_text": words, | |
"ner": entities | |
} | |
return sample | |
# create dataset | |
def create_dataset(path): | |
train, dev, test, labels = open_content(path) | |
train_dataset = [] | |
dev_dataset = [] | |
test_dataset = [] | |
for data in train: | |
train_dataset.append(process(data)) | |
for data in dev: | |
dev_dataset.append(process(data)) | |
for data in test: | |
test_dataset.append(process(data)) | |
return train_dataset, dev_dataset, test_dataset, labels | |
def get_for_one_path(path, model): | |
# load the dataset | |
_, _, test_dataset, entity_types = create_dataset(path) | |
data_name = path.split("/")[-1] # get the name of the dataset | |
# check if the dataset is flat_ner | |
flat_ner = True | |
if any([i in data_name for i in ["ACE", "GENIA", "Corpus"]]): | |
flat_ner = False | |
# evaluate the model | |
results, f1 = model.evaluate(test_dataset, flat_ner=flat_ner, threshold=0.5, batch_size=12, | |
entity_types=entity_types) | |
return data_name, results, f1 | |
def get_for_all_path(model, steps, log_dir, data_paths): | |
all_paths = glob.glob(f"{data_paths}/*") | |
all_paths = sorted(all_paths) | |
# move the model to the device | |
device = next(model.parameters()).device | |
model.to(device) | |
# set the model to eval mode | |
model.eval() | |
# log the results | |
save_path = os.path.join(log_dir, "results.txt") | |
with open(save_path, "a") as f: | |
f.write("##############################################\n") | |
# write step | |
f.write("step: " + str(steps) + "\n") | |
zero_shot_benc = ["mit-movie", "mit-restaurant", "CrossNER_AI", "CrossNER_literature", "CrossNER_music", | |
"CrossNER_politics", "CrossNER_science"] | |
zero_shot_benc_results = {} | |
all_results = {} # without crossNER | |
for p in tqdm(all_paths): | |
if "sample_" not in p: | |
data_name, results, f1 = get_for_one_path(p, model) | |
# write to file | |
with open(save_path, "a") as f: | |
f.write(data_name + "\n") | |
f.write(str(results) + "\n") | |
if data_name in zero_shot_benc: | |
zero_shot_benc_results[data_name] = f1 | |
else: | |
all_results[data_name] = f1 | |
avg_all = sum(all_results.values()) / len(all_results) | |
avg_zs = sum(zero_shot_benc_results.values()) / len(zero_shot_benc_results) | |
save_path_table = os.path.join(log_dir, "tables.txt") | |
# results for all datasets except crossNER | |
table_bench_all = "" | |
for k, v in all_results.items(): | |
table_bench_all += f"{k:20}: {v:.1%}\n" | |
# (20 size aswell for average i.e. :20) | |
table_bench_all += f"{'Average':20}: {avg_all:.1%}" | |
# results for zero-shot benchmark | |
table_bench_zeroshot = "" | |
for k, v in zero_shot_benc_results.items(): | |
table_bench_zeroshot += f"{k:20}: {v:.1%}\n" | |
table_bench_zeroshot += f"{'Average':20}: {avg_zs:.1%}" | |
# write to file | |
with open(save_path_table, "a") as f: | |
f.write("##############################################\n") | |
f.write("step: " + str(steps) + "\n") | |
f.write("Table for all datasets except crossNER\n") | |
f.write(table_bench_all + "\n\n") | |
f.write("Table for zero-shot benchmark\n") | |
f.write(table_bench_zeroshot + "\n") | |
f.write("##############################################\n\n") | |
def sample_train_data(data_paths, sample_size=10000): | |
all_paths = glob.glob(f"{data_paths}/*") | |
all_paths = sorted(all_paths) | |
# to exclude the zero-shot benchmark datasets | |
zero_shot_benc = ["CrossNER_AI", "CrossNER_literature", "CrossNER_music", | |
"CrossNER_politics", "CrossNER_science", "ACE 2004"] | |
new_train = [] | |
# take 10k samples from each dataset | |
for p in tqdm(all_paths): | |
if any([i in p for i in zero_shot_benc]): | |
continue | |
train, dev, test, labels = create_dataset(p) | |
# add label key to the train data | |
for i in range(len(train)): | |
train[i]["label"] = labels | |
random.shuffle(train) | |
train = train[:sample_size] | |
new_train.extend(train) | |
return new_train | |