komodel / preprocessors /vctkfewsinger.py
RMSnow's picture
add backend inference and inferface output
0883aa1
# Copyright (c) 2023 Amphion.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import os
import json
import pickle
import glob
from collections import defaultdict
from tqdm import tqdm
# Train: male 20 hours, female 10 hours
TRAIN_MALE_MAX_SECONDS = 20 * 3600
TRAIN_FEMALE_MAX_SECONDS = 10 * 3600
TEST_MAX_NUM_EVERY_PERSON = 5
def select_sample_idxs():
chosen_speakers = get_chosen_speakers()
with open(os.path.join(vctk_dir, "train.json"), "r") as f:
raw_train = json.load(f)
with open(os.path.join(vctk_dir, "test.json"), "r") as f:
raw_test = json.load(f)
train_idxs, test_idxs = [], []
# =========== Test ===========
test_nums = defaultdict(int)
for utt in tqdm(raw_train):
idx = utt["index"]
singer = utt["Singer"]
if singer in chosen_speakers and test_nums[singer] < TEST_MAX_NUM_EVERY_PERSON:
test_nums[singer] += 1
test_idxs.append("train_{}".format(idx))
for utt in tqdm(raw_test):
idx = utt["index"]
singer = utt["Singer"]
if singer in chosen_speakers and test_nums[singer] < TEST_MAX_NUM_EVERY_PERSON:
test_nums[singer] += 1
test_idxs.append("test_{}".format(idx))
# =========== Train ===========
for utt in tqdm(raw_train):
idx = utt["index"]
singer = utt["Singer"]
if singer in chosen_speakers and "train_{}".format(idx) not in test_idxs:
train_idxs.append("train_{}".format(idx))
for utt in tqdm(raw_test):
idx = utt["index"]
singer = utt["Singer"]
if singer in chosen_speakers and "test_{}".format(idx) not in test_idxs:
train_idxs.append("test_{}".format(idx))
train_idxs.sort()
test_idxs.sort()
return train_idxs, test_idxs, raw_train, raw_test
def statistics_of_speakers():
speaker2time = defaultdict(float)
sex2time = defaultdict(float)
with open(os.path.join(vctk_dir, "train.json"), "r") as f:
train = json.load(f)
with open(os.path.join(vctk_dir, "test.json"), "r") as f:
test = json.load(f)
for utt in train + test:
# minutes
speaker2time[utt["Singer"]] += utt["Duration"]
# hours
sex2time[utt["Singer"].split("_")[0]] += utt["Duration"]
print(
"Female: {:.2f} hours, Male: {:.2f} hours.\n".format(
sex2time["female"] / 3600, sex2time["male"] / 3600
)
)
speaker2time = sorted(speaker2time.items(), key=lambda x: x[-1], reverse=True)
for singer, seconds in speaker2time:
print("{}\t{:.2f} mins".format(singer, seconds / 60))
return speaker2time
def get_chosen_speakers():
speaker2time = statistics_of_speakers()
chosen_time = defaultdict(float)
chosen_speaker = defaultdict(list)
train_constrait = {
"male": TRAIN_MALE_MAX_SECONDS,
"female": TRAIN_FEMALE_MAX_SECONDS,
}
for speaker, seconds in speaker2time:
sex = speaker.split("_")[0]
if chosen_time[sex] < train_constrait[sex]:
chosen_time[sex] += seconds
chosen_speaker[sex].append(speaker)
speaker2time = dict(speaker2time)
chosen_speaker = chosen_speaker["male"] + chosen_speaker["female"]
print("\n#Chosen speakers = {}".format(len(chosen_speaker)))
for spk in chosen_speaker:
print("{}\t{:.2f} mins".format(spk, speaker2time[spk] / 60))
return chosen_speaker
if __name__ == "__main__":
root_path = ""
vctk_dir = os.path.join(root_path, "vctk")
fewspeaker_dir = os.path.join(root_path, "vctkfewspeaker")
os.makedirs(fewspeaker_dir, exist_ok=True)
train_idxs, test_idxs, raw_train, raw_test = select_sample_idxs()
print("#Train = {}, #Test = {}".format(len(train_idxs), len(test_idxs)))
# There are no data leakage
assert len(set(train_idxs).intersection(set(test_idxs))) == 0
for idx in train_idxs + test_idxs:
# No test data of raw vctk
assert "test_" not in idx
for split, chosen_idxs in zip(["train", "test"], [train_idxs, test_idxs]):
print("{}: #chosen idx = {}\n".format(split, len(chosen_idxs)))
# Select features
feat_files = glob.glob("**/train.pkl", root_dir=vctk_dir, recursive=True)
for file in tqdm(feat_files):
raw_file = os.path.join(vctk_dir, file)
new_file = os.path.join(
fewspeaker_dir, file.replace("train.pkl", "{}.pkl".format(split))
)
new_dir = "/".join(new_file.split("/")[:-1])
os.makedirs(new_dir, exist_ok=True)
if "mel_min" in file or "mel_max" in file:
os.system("cp {} {}".format(raw_file, new_file))
continue
with open(raw_file, "rb") as f:
raw_feats = pickle.load(f)
print("file: {}, #raw_feats = {}".format(file, len(raw_feats)))
new_feats = []
for idx in chosen_idxs:
chosen_split_is_train, raw_idx = idx.split("_")
assert chosen_split_is_train == "train"
new_feats.append(raw_feats[int(raw_idx)])
with open(new_file, "wb") as f:
pickle.dump(new_feats, f)
print("New file: {}, #new_feats = {}".format(new_file, len(new_feats)))
# Utterance re-index
news_utts = [raw_train[int(idx.split("_")[-1])] for idx in chosen_idxs]
for i, utt in enumerate(news_utts):
utt["Dataset"] = "vctkfewsinger"
utt["index"] = i
with open(os.path.join(fewspeaker_dir, "{}.json".format(split)), "w") as f:
json.dump(news_utts, f, indent=4)