# flake8: noqa: F401, F403
import abc
import argparse
import copy as cp
import csv
import datetime
import multiprocessing as mp
import os
import os.path as osp
import random as rd
import shutil
import subprocess
import warnings
from collections import OrderedDict, defaultdict
from multiprocessing import Pool, current_process

import matplotlib.pyplot as plt
import pandas as pd
import requests
import seaborn as sns
from huggingface_hub import scan_cache_dir
from sty import bg, ef, fg, rs
from tabulate import tabulate, tabulate_formats
from tqdm import tqdm


def process_punctuation(inText):
    """Strip or blank out punctuation in ``inText`` and return the result.

    For every character in the punctuation list: if the input contains that
    character next to a space, or the input contains a digit-comma-digit
    sequence anywhere, the character is deleted; otherwise it is replaced by
    a single space. Finally every period that is not followed by a digit is
    removed.

    Args:
        inText: The string to normalize.

    Returns:
        The normalized string.
    """
    import re

    punct = [
        ';', r'/', '[', ']', '"', '{', '}', '(', ')', '=', '+', '\\', '_',
        '-', '>', '<', '@', '`', ',', '?', '!'
    ]
    # Raw strings: `\d` / `\.` are not valid *string* escapes and trigger a
    # warning on recent Python versions when written in a plain literal.
    commaStrip = re.compile(r'(\d)(,)(\d)')
    # NOTE(review): `(?!<=\d)` is a negative lookahead for the literal text
    # "<=" followed by a digit, which never matches at a '.', so it is
    # effectively a no-op. It looks like a typo for the lookbehind `(?<!\d)`;
    # kept unchanged so existing normalization results are not altered.
    periodStrip = re.compile(r'(?!<=\d)(\.)(?!\d)')

    # The comma test only depends on the original input, so evaluate it once.
    has_digit_comma = commaStrip.search(inText) is not None

    outText = inText
    for p in punct:
        # The adjacency test deliberately looks at the original input, not at
        # the partially rewritten text.
        if has_digit_comma or p + ' ' in inText or ' ' + p in inText:
            outText = outText.replace(p, '')
        else:
            outText = outText.replace(p, ' ')

    # The third positional argument of Pattern.sub() is `count`, not `flags`.
    # Passing re.UNICODE (== 32) there silently capped the number of stripped
    # periods at 32, so no extra argument is passed here.
    return periodStrip.sub('', outText)


def h2r(value):
    """Convert a hex color string to an ``(r, g, b)`` tuple of ints.

    Args:
        value: Six hex digits, optionally prefixed with ``'#'``.

    Returns:
        A 3-tuple of integers parsed from the three two-digit hex pairs.

    Raises:
        AssertionError: If the string (without ``'#'``) is not 6 characters.
        ValueError: If a pair is not valid hexadecimal.
    """
    # Slice instead of index so an empty string reaches the length assertion
    # rather than raising IndexError.
    if value[:1] == '#':
        value = value[1:]
    assert len(value) == 6
    return tuple(int(value[i:i + 2], 16) for i in range(0, 6, 2))


def r2h(rgb):
    """Format three integer color components as a ``'#rrggbb'`` string.

    Args:
        rgb: A sequence of three integers (tuple, list, ...).

    Returns:
        The lowercase hex color string.
    """
    # %-formatting only unpacks tuples; convert so lists work as well.
    return '#%02x%02x%02x' % tuple(rgb)


def colored(s, color):
    """Wrap ``s`` in foreground-color escape codes from ``sty``.

    Args:
        s: The text to colorize.
        color: Either the name of an attribute of ``sty.fg``, a hex color
            string understood by :func:`h2r`, or an iterable of components
            that is unpacked into ``fg(...)``.

    Returns:
        ``s`` prefixed with the color code and suffixed with ``fg.rs``.
    """
    if isinstance(color, str):
        if hasattr(fg, color):
            return getattr(fg, color) + s + fg.rs
        # Not a named color: interpret the string as a hex code.
        color = h2r(color)
    return fg(*color) + s + fg.rs


def istype(s, type):
    """Return True if ``s`` is, or evaluates to, an instance of ``type``.

    Args:
        s: An object, or a string holding a Python expression.
        type: The type (or tuple of types) to check against.

    Returns:
        True when ``s`` itself is an instance of ``type`` or when ``eval(s)``
        yields one; False otherwise, including when evaluation fails.
    """
    if isinstance(s, type):
        return True
    try:
        # SECURITY NOTE(review): eval() executes arbitrary code. This must
        # never be called with untrusted input; consider ast.literal_eval if
        # only literals need to be recognized.
        return isinstance(eval(s), type)
    except Exception:
        return False


def bincount(lst):
    """Count occurrences of each item in ``lst``.

    Args:
        lst: An iterable of hashable items.

    Returns:
        A ``defaultdict`` mapping item -> count, with a default of 0.
    """
    # defaultdict(int) behaves like the former `lambda: 0` factory but, unlike
    # a lambda, can be pickled.
    bins = defaultdict(int)
    for item in lst:
        bins[item] += 1
    return bins


def get_cache_path(repo_id):
    """Return the snapshot path of the newest cached revision of a repo.

    Scans the Hugging Face cache via ``scan_cache_dir()``.

    Args:
        repo_id: The repository id to look up in the cache.

    Returns:
        The snapshot path (as ``str``) of the revision with the largest
        ``last_modified`` value, or None if the repo is not cached or no
        revision has a ``last_modified`` greater than 0.
    """
    hf_cache_info = scan_cache_dir()
    repo = next((r for r in hf_cache_info.repos if r.repo_id == repo_id), None)
    if repo is None:
        return None

    rev2keep, last_modified = None, 0
    for rev in repo.revisions:
        if rev.last_modified > last_modified:
            rev2keep, last_modified = rev, rev.last_modified
    if rev2keep is None:
        return None
    return str(rev2keep.snapshot_path)


def proxy_set(s):
    """Set all four HTTP(S) proxy environment variables to ``s``."""
    for key in ['http_proxy', 'HTTP_PROXY', 'https_proxy', 'HTTPS_PROXY']:
        os.environ[key] = s


def get_rank_and_world_size():
    """Return ``(RANK, WORLD_SIZE)`` from the environment as ints.

    Missing variables default to rank 0 and world size 1.
    """
    rank = int(os.environ.get("RANK", 0))
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    return rank, world_size


def get_local_rank_and_world_size():
    """Return ``(LOCAL_RANK, WORLD_SIZE)`` from the environment as ints.

    Missing variables default to local rank 0 and world size 1.
    """
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    return local_rank, world_size


def splitlen(s, sym='/'):
    """Return the number of pieces obtained by splitting ``s`` on ``sym``."""
    return len(s.split(sym))


def listinstr(lst, s):
    """Return True if any element of the list ``lst`` is contained in ``s``.

    Raises:
        AssertionError: If ``lst`` is not a ``list``.
    """
    assert isinstance(lst, list)
    return any(item in s for item in lst)


def d2df(D):
    """Build a single-row ``pandas.DataFrame`` with one column per key."""
    return pd.DataFrame({x: [D[x]] for x in D})


def cn_string(s):
    """Return True if ``s`` has a character in the range U+4E00..U+9FFF."""
    import re
    return re.search('[\u4e00-\u9fff]', s) is not None


# decord is optional; the module stays importable without it.
try:
    import decord
except ImportError:
    pass


def timestr(second=True, minute=False):
    """Return the current local time as a compact digit string.

    Args:
        second: If True, return ``YYMMDDHHMMSS``.
        minute: Only consulted when ``second`` is False; if True, return
            ``YYMMDDHHMM``.

    Returns:
        The timestamp string; ``YYMMDDHH`` when both flags are False.
    """
    # Drop the first two characters (the century) of the 4-digit year.
    s = datetime.datetime.now().strftime('%Y%m%d%H%M%S')[2:]
    if second:
        return s
    elif minute:
        return s[:-2]
    else:
        return s[:-4]


def dict_merge(dct, merge_dct):
    """Recursively merge ``merge_dct`` into ``dct`` in place.

    When both sides hold a dict under the same key the merge recurses;
    otherwise the value from ``merge_dct`` overwrites the one in ``dct``.

    Args:
        dct: The dict that is updated in place.
        merge_dct: The dict whose entries are merged in.

    Returns:
        None.
    """
    for k, v in merge_dct.items():
        if k in dct and isinstance(dct[k], dict) and isinstance(v, dict):
            dict_merge(dct[k], v)
        else:
            dct[k] = v


def youtube_dl(idx):
    """Run ``youtube-dl`` on ``idx`` and write the result to ``{idx}.mp4``.

    Args:
        idx: The video identifier or URL handed to ``youtube-dl``.

    Returns:
        None. The exit status of ``youtube-dl`` is ignored.
    """
    # An argument list (no shell) keeps identifiers containing spaces, '&',
    # '$' or quotes intact. The former shell string left the output path
    # unquoted, so such identifiers broke or altered the command.
    args = [
        'youtube-dl', '-f', 'best', '-f', 'mp4', str(idx), '-o', f'{idx}.mp4'
    ]
    try:
        subprocess.call(args)
    except OSError as err:
        # os.system() never raised when the program could not be started;
        # keep the call best-effort but make the failure visible.
        warnings.warn(f'Failed to launch youtube-dl for {idx!r}: {err}')


def run_command(cmd):
    """Run a command and return its captured standard output.

    Args:
        cmd: An argument list, or a string that is split on whitespace.
            The split is not quote-aware, so pass a list when an argument
            contains spaces.

    Returns:
        The command's stdout as ``bytes``.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero.
    """
    if isinstance(cmd, str):
        cmd = cmd.split()
    return subprocess.check_output(cmd)