import random

import codecs as cs
import numpy as np
import torch
from os.path import join as pjoin
from torch.utils import data
from tqdm.auto import tqdm

from utils.motion_process import recover_from_ric
from utils.word_vectorizer import WordVectorizer, POS_enumerator


class Text2MotionDataset(data.Dataset):
    """Dataset for the text-to-motion generation task.

    Supported modes:
        "train"   -- Z-normalized motions using the raw dataset statistics.
        "eval"    -- Z-normalized motions using the statistics saved at train
                     time, plus word features for the evaluator.
        "gt_eval" -- ground-truth motions normalized with the evaluator's own
                     statistics, plus word features.
        "xyz_gt"  -- ground-truth xyz joint positions.
        "hml_gt"  -- raw ground-truth HML feature vectors.
    """

    # Defaults; the subclasses below override these.
    data_root = ""
    min_motion_len = 40
    joints_num = None
    dim_pose = None
    max_motion_length = 196

    def __init__(self, opt, split, mode="train", accelerator=None):
        self.max_text_len = getattr(opt, "max_text_len", 20)
        self.unit_length = getattr(opt, "unit_length", 4)
        self.mode = mode
        motion_dir = pjoin(self.data_root, "new_joint_vecs")
        text_dir = pjoin(self.data_root, "texts")

        if mode not in ["train", "eval", "gt_eval", "xyz_gt", "hml_gt"]:
            raise ValueError(
                f"Mode '{mode}' is not supported. Please use one of: "
                "'train', 'eval', 'gt_eval', 'xyz_gt', 'hml_gt'."
            )

        # Pick the normalization statistics that match the requested mode.
        if mode == "gt_eval":
            # Statistics used by the pretrained text-to-motion evaluators.
            mean = np.load(pjoin(opt.eval_meta_dir, f"{opt.dataset_name}_mean.npy"))
            std = np.load(pjoin(opt.eval_meta_dir, f"{opt.dataset_name}_std.npy"))
        elif mode == "eval":
            # Statistics saved to opt.meta_dir during training.
            mean = np.load(pjoin(opt.meta_dir, "mean.npy"))
            std = np.load(pjoin(opt.meta_dir, "std.npy"))
        else:
            # Raw dataset statistics.
            mean = np.load(pjoin(self.data_root, "Mean.npy"))
            std = np.load(pjoin(self.data_root, "Std.npy"))

        if mode == "eval":
            # Evaluator statistics, kept separately so motions can be
            # re-normalized before being fed to the evaluation models.
            self.mean_for_eval = np.load(
                pjoin(opt.eval_meta_dir, f"{opt.dataset_name}_mean.npy")
            )
            self.std_for_eval = np.load(
                pjoin(opt.eval_meta_dir, f"{opt.dataset_name}_std.npy")
            )
        if mode in ["gt_eval", "eval"]:
            self.w_vectorizer = WordVectorizer(opt.glove_dir, "our_vab")

        data_dict = {}
        id_list = []
        split_file = pjoin(self.data_root, f"{split}.txt")
        with cs.open(split_file, "r") as f:
            for line in f:
                id_list.append(line.strip())

        if opt.debug:
            # Debug runs index only a small subset of the split.
            id_list = id_list[:1000]

        new_name_list = []
        length_list = []
        # Show the progress bar only on the local main process.
        for name in tqdm(
            id_list,
            disable=(
                not accelerator.is_local_main_process
                if accelerator is not None
                else False
            ),
        ):
            motion = np.load(pjoin(motion_dir, name + ".npy"))
            # Skip clips that are too short or too long (hard 200-frame cap).
            if len(motion) < self.min_motion_len or len(motion) >= 200:
                continue
            text_data = []
            flag = False
            with cs.open(pjoin(text_dir, name + ".txt")) as f:
                for line in f:
                    text_dict = {}
                    # Annotation format: caption#tokens#start_sec#end_sec
                    line_split = line.strip().split("#")
                    caption = line_split[0]
                    try:
                        tokens = line_split[1].split(" ")
                        f_tag = float(line_split[2])
                        to_tag = float(line_split[3])
                        f_tag = 0.0 if np.isnan(f_tag) else f_tag
                        to_tag = 0.0 if np.isnan(to_tag) else to_tag
                    except (IndexError, ValueError):
                        # Malformed line: fall back to placeholder tokens and
                        # a default 0-8 s segment.
                        tokens = ["a/NUM", "a/NUM"]
                        f_tag = 0.0
                        to_tag = 8.0
                    text_dict["caption"] = caption
                    text_dict["tokens"] = tokens
                    if f_tag == 0.0 and to_tag == 0.0:
                        # The caption describes the whole clip.
                        flag = True
                        text_data.append(text_dict)
                    else:
                        # The caption describes a sub-segment; crop it out
                        # (tags are in seconds, assuming 20 fps motion data).
                        n_motion = motion[int(f_tag * 20) : int(to_tag * 20)]
                        if (
                            len(n_motion) < self.min_motion_len
                            or len(n_motion) >= 200
                        ):
                            continue
                        # Register the segment under a fresh prefixed name.
                        new_name = (
                            random.choice("ABCDEFGHIJKLMNOPQRSTUVW") + "_" + name
                        )
                        while new_name in data_dict:
                            new_name = (
                                random.choice("ABCDEFGHIJKLMNOPQRSTUVW")
                                + "_"
                                + name
                            )
                        data_dict[new_name] = {
                            "motion": n_motion,
                            "length": len(n_motion),
                            "text": [text_dict],
                        }
                        new_name_list.append(new_name)
                        length_list.append(len(n_motion))
            if flag:
                data_dict[name] = {
                    "motion": motion,
                    "length": len(motion),
                    "text": text_data,
                }
                new_name_list.append(name)
                length_list.append(len(motion))

        # Sort the samples by motion length.
        name_list, length_list = zip(
            *sorted(zip(new_name_list, length_list), key=lambda x: x[1])
        )

        if mode == "train":
            if opt.dataset_name != "amass":
                joints_num = self.joints_num
                # Rescale the per-group std so that the low-dimensional root
                # and foot-contact features get more weight after
                # normalization. HML feature layout:
                #   root rot velocity (1)
                std[0:1] = std[0:1] / opt.feat_bias
                #   root linear velocity (2)
                std[1:3] = std[1:3] / opt.feat_bias
                #   root height (1)
                std[3:4] = std[3:4] / opt.feat_bias
                # The ric data ((joints_num - 1) * 3), rot data
                # ((joints_num - 1) * 6) and local velocity (joints_num * 3)
                # groups keep their original std (the original code divided
                # them by 1.0, a no-op).
                #   foot contact (4)
                std[4 + (joints_num - 1) * 9 + joints_num * 3 :] = (
                    std[4 + (joints_num - 1) * 9 + joints_num * 3 :] / opt.feat_bias
                )

                assert 4 + (joints_num - 1) * 9 + joints_num * 3 + 4 == mean.shape[-1]
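                # Worked check of the layout above: HumanML3D has
                # joints_num = 22, giving 4 + 21 * 9 + 22 * 3 + 4 = 263
                # (= dim_pose below); KIT-ML has joints_num = 21, giving
                # 4 + 20 * 9 + 21 * 3 + 4 = 251.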

            if accelerator is not None and accelerator.is_main_process:
                # Persist the training statistics for later "eval" runs.
                np.save(pjoin(opt.meta_dir, "mean.npy"), mean)
                np.save(pjoin(opt.meta_dir, "std.npy"), std)

        self.mean = mean
        self.std = std
        self.data_dict = data_dict
        self.name_list = name_list

    def inv_transform(self, data):
        """Undo the Z-normalization applied in __getitem__."""
        return data * self.std + self.mean

    def __len__(self):
        return len(self.data_dict)

    def __getitem__(self, idx):
        """Return one sample; the tuple layout depends on self.mode:

        "train" / "hml_gt": (caption, motion, m_length)
        "eval" / "gt_eval": (word_embeddings, pos_one_hots, caption, sent_len,
                             motion, m_length, "_"-joined tokens)
        "xyz_gt": xyz joint positions recovered from the HML representation
        """
        data = self.data_dict[self.name_list[idx]]
        motion, m_length, text_list = data["motion"], data["length"], data["text"]

        # Sample one of the captions attached to this motion.
        text_data = random.choice(text_list)
        caption = text_data["caption"]

        # Z-normalization.
        if self.mode not in ["xyz_gt", "hml_gt"]:
            motion = (motion - self.mean) / self.std

        # Crop the motion.
        if self.mode in ["eval", "gt_eval"]:
            # Trim to a multiple of unit_length; with probability 1/3, drop
            # one extra unit.
            if self.unit_length < 10:
                coin2 = np.random.choice(["single", "single", "double"])
            else:
                coin2 = "single"
            if coin2 == "double":
                m_length = (m_length // self.unit_length - 1) * self.unit_length
            elif coin2 == "single":
                m_length = (m_length // self.unit_length) * self.unit_length
            start = random.randint(0, len(motion) - m_length)
            motion = motion[start : start + m_length]
        elif m_length >= self.max_motion_length:
            # Randomly crop a window of max_motion_length frames.
            start = random.randint(0, len(motion) - self.max_motion_length)
            motion = motion[start : start + self.max_motion_length]
            m_length = self.max_motion_length

        # Zero-pad the motion up to max_motion_length.
        if m_length < self.max_motion_length:
            motion = np.concatenate(
                [
                    motion,
                    np.zeros((self.max_motion_length - m_length, motion.shape[1])),
                ],
                axis=0,
            )
        assert len(motion) == self.max_motion_length

        if self.mode in ["gt_eval", "eval"]:
            # Build word embeddings and POS one-hots for the text-to-motion
            # evaluator.
            tokens = text_data["tokens"]
            if len(tokens) < self.max_text_len:
                # Add sos/eos, then pad with "unk" up to max_text_len + 2.
                tokens = ["sos/OTHER"] + tokens + ["eos/OTHER"]
                sent_len = len(tokens)
                tokens = tokens + ["unk/OTHER"] * (self.max_text_len + 2 - sent_len)
            else:
                # Truncate, then add sos/eos.
                tokens = tokens[: self.max_text_len]
                tokens = ["sos/OTHER"] + tokens + ["eos/OTHER"]
                sent_len = len(tokens)
            pos_one_hots = []
            word_embeddings = []
            for token in tokens:
                word_emb, pos_oh = self.w_vectorizer[token]
                pos_one_hots.append(pos_oh[None, :])
                word_embeddings.append(word_emb[None, :])
            pos_one_hots = np.concatenate(pos_one_hots, axis=0)
            word_embeddings = np.concatenate(word_embeddings, axis=0)
            return (
                word_embeddings,
                pos_one_hots,
                caption,
                sent_len,
                motion,
                m_length,
                "_".join(tokens),
            )
        elif self.mode == "xyz_gt":
            # Convert the HML motion representation to xyz skeleton joints.
            motion = torch.from_numpy(motion).float()
            pred_joints = recover_from_ric(motion, self.joints_num)

            # Drop the skeleton onto the floor (y is up): subtract the
            # minimum y over all frames and joints.
            floor_height = pred_joints.min(dim=0)[0].min(dim=0)[0][1]
            pred_joints[:, :, 1] -= floor_height
            return pred_joints

        return caption, motion, m_length

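
# Dataset-specific subclasses: each pins the constants the base class reads
# (data_root, min_motion_len, joints_num, dim_pose, max_motion_length) before
# the shared __init__ loads and indexes the split.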
class HumanML3D(Text2MotionDataset):
    def __init__(self, opt, split="train", mode="train", accelerator=None):
        self.data_root = "./data/HumanML3D"
        self.min_motion_len = 40
        self.joints_num = 22
        self.dim_pose = 263
        self.max_motion_length = 196
        if accelerator:
            accelerator.print(f"\n Loading {mode} mode HumanML3D {split} dataset ...")
        else:
            print(f"\n Loading {mode} mode HumanML3D {split} dataset ...")
        super().__init__(opt, split, mode, accelerator)


class KIT(Text2MotionDataset):
    def __init__(self, opt, split="train", mode="train", accelerator=None):
        self.data_root = "./data/KIT-ML"
        self.min_motion_len = 24
        self.joints_num = 21
        self.dim_pose = 251
        self.max_motion_length = 196
        if accelerator:
            accelerator.print(f"\n Loading {mode} mode KIT {split} dataset ...")
        else:
            print(f"\n Loading {mode} mode KIT {split} dataset ...")
        super().__init__(opt, split, mode, accelerator)
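

# Usage sketch (illustrative only; it assumes the HumanML3D data layout above
# exists on disk). The `opt` attribute values below are hypothetical -- only
# the attribute names match what the constructor actually reads.
if __name__ == "__main__":
    from argparse import Namespace

    opt = Namespace(
        debug=True,              # index only the first 1000 clips
        dataset_name="t2m",
        feat_bias=5.0,           # std rescaling factor used in "train" mode
        meta_dir="./checkpoints/t2m/meta",
    )
    dataset = HumanML3D(opt, split="train", mode="train")
    loader = data.DataLoader(dataset, batch_size=32, shuffle=True)
    caption, motion, m_length = dataset[0]
    print(caption, motion.shape, m_length)  # motion: (196, 263)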