praeclarumjj3's picture
:zap: add code
9fa3d89
raw
history blame
4.62 kB
import csv
import hashlib
import json
import os
import os.path as osp
import pickle
import time
import numpy as np
import pandas as pd
class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to built-in types.

    Pass it as ``cls=NumpyEncoder`` to ``json.dump`` / ``json.dumps``.
    """

    # The aliases ``np.float_`` and ``np.complex_`` were removed in NumPy 2.0.
    # They were only other names for ``np.float64`` / ``np.complex128``, which
    # are listed explicitly below, so leaving them out loses no coverage.
    _INT_TYPES = (np.int_, np.intc, np.intp, np.int8,
                  np.int16, np.int32, np.int64, np.uint8,
                  np.uint16, np.uint32, np.uint64)
    _FLOAT_TYPES = (np.float16, np.float32, np.float64)
    _COMPLEX_TYPES = (np.complex64, np.complex128)

    def default(self, obj):
        """Return a JSON-serializable stand-in for *obj*.

        Args:
            obj: The object the standard encoder could not serialize.

        Returns:
            ``int`` / ``float`` / ``bool`` for NumPy scalars, a
            ``{'real': ..., 'imag': ...}`` dict for complex scalars, a nested
            list for arrays, and ``None`` for ``np.void`` values.

        Raises:
            TypeError: Raised by the base encoder for any other type.
        """
        if isinstance(obj, self._INT_TYPES):
            return int(obj)
        if isinstance(obj, self._FLOAT_TYPES):
            return float(obj)
        if isinstance(obj, self._COMPLEX_TYPES):
            # The parts may themselves be NumPy floats; the encoder calls
            # ``default`` again for those.
            return {'real': obj.real, 'imag': obj.imag}
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.void):
            return None
        return super().default(obj)
# LOAD & DUMP
def dump(data, f, **kwargs):
    """Write *data* to the path *f* in the format named by its suffix.

    Supported suffixes: ``pkl``, ``json``, ``jsonl``, ``xlsx``, ``csv``,
    ``tsv``. The suffix is whatever follows the last ``.`` in *f*.

    Args:
        data: Object to serialize. The ``xlsx`` / ``csv`` / ``tsv`` writers
            call ``data.to_excel`` / ``data.to_csv`` on it; ``jsonl`` iterates
            over it and writes one JSON document per item.
        f: Destination path as a string.
        **kwargs: Forwarded to the selected writer. Only ``quoting`` (for
            ``csv`` / ``tsv``) has an effect; the pkl, json, jsonl and xlsx
            writers accept and ignore extra keywords.

    Returns:
        Whatever the selected writer returns (``None`` for the pkl, json and
        jsonl writers).

    Raises:
        KeyError: If the suffix of *f* is not one of the supported formats.
    """

    def dump_pkl(data, pth, **kwargs):
        # Context manager so the handle is flushed and closed deterministically.
        with open(pth, 'wb') as fout:
            pickle.dump(data, fout)

    def dump_json(data, pth, **kwargs):
        # ensure_ascii=False emits raw non-ASCII characters, so the encoding is
        # pinned to UTF-8 instead of relying on the locale default.
        with open(pth, 'w', encoding='utf-8') as fout:
            json.dump(data, fout, indent=4, ensure_ascii=False, cls=NumpyEncoder)

    def dump_jsonl(data, f, **kwargs):
        lines = [json.dumps(x, ensure_ascii=False, cls=NumpyEncoder) for x in data]
        with open(f, 'w', encoding='utf8') as fout:
            fout.write('\n'.join(lines))

    def dump_xlsx(data, f, **kwargs):
        data.to_excel(f, index=False, engine='xlsxwriter')

    def dump_csv(data, f, quoting=csv.QUOTE_ALL):
        data.to_csv(f, index=False, encoding='utf-8', quoting=quoting)

    def dump_tsv(data, f, quoting=csv.QUOTE_ALL):
        data.to_csv(f, sep='\t', index=False, encoding='utf-8', quoting=quoting)

    handlers = dict(pkl=dump_pkl, json=dump_json, jsonl=dump_jsonl,
                    xlsx=dump_xlsx, csv=dump_csv, tsv=dump_tsv)
    suffix = f.split('.')[-1]
    return handlers[suffix](data, f, **kwargs)
def load(f):
    """Read the file at path *f* using the loader named by its suffix.

    Supported suffixes: ``pkl``, ``json``, ``jsonl``, ``xlsx``, ``csv``,
    ``tsv``. The suffix is whatever follows the last ``.`` in *f*.

    Args:
        f: Source path as a string.

    Returns:
        The unpickled object for ``pkl``, the parsed document for ``json``,
        a list with one parsed document per non-blank line for ``jsonl``, and
        the result of ``pd.read_excel`` / ``pd.read_csv`` for the tabular
        formats.

    Raises:
        KeyError: If the suffix of *f* is not one of the supported formats.
    """

    def load_pkl(pth):
        # NOTE: unpickling can execute arbitrary code; only use on trusted files.
        with open(pth, 'rb') as fin:
            return pickle.load(fin)

    def load_json(pth):
        with open(pth, 'r', encoding='utf-8') as fin:
            return json.load(fin)

    def load_jsonl(f):
        # Blank lines are skipped, so an empty file yields [] instead of
        # failing on an index into an empty list.
        with open(f, encoding='utf-8') as fin:
            return [json.loads(line) for line in fin if line.strip()]

    def load_xlsx(f):
        return pd.read_excel(f)

    def load_csv(f):
        return pd.read_csv(f)

    def load_tsv(f):
        return pd.read_csv(f, sep='\t')

    handlers = dict(pkl=load_pkl, json=load_json, jsonl=load_jsonl,
                    xlsx=load_xlsx, csv=load_csv, tsv=load_tsv)
    suffix = f.split('.')[-1]
    return handlers[suffix](f)
def download_file(url, filename=None):
    """Fetch *url* to a local file while showing a tqdm progress bar.

    Args:
        url: Address to download.
        filename: Destination path. When ``None``, the text after the last
            ``/`` of *url* is used.

    Returns:
        The path the download was written to.
    """
    import urllib.request
    from tqdm import tqdm

    class _RetrieveBar(tqdm):
        """tqdm bar whose ``update_to`` matches urlretrieve's reporthook."""

        def update_to(self, b=1, bsize=1, tsize=None):
            # urlretrieve reports (block count, block size, total size);
            # tqdm.update wants an increment, hence the subtraction of self.n.
            if tsize is not None:
                self.total = tsize
            self.update(b * bsize - self.n)

    basename = url.split('/')[-1]
    if filename is None:
        filename = basename

    bar = _RetrieveBar(unit='B', unit_scale=True, miniters=1, desc=basename)
    with bar as progress:
        urllib.request.urlretrieve(url, filename=filename,
                                   reporthook=progress.update_to)
    return filename
def ls(dirname='.', match='', mode='all', level=1):
    """List entries found exactly *level* directories below *dirname*.

    Args:
        dirname: Directory to list. For ``'.'`` the entries are returned as
            bare names; otherwise each is joined onto *dirname*.
        match: Substring that a returned path must contain.
        mode: ``'all'``, ``'dir'`` (directories only) or ``'file'``
            (everything that is not a directory).
        level: Depth to list at; ``1`` means the direct children.

    Returns:
        A list of path strings.
    """
    entries = os.listdir(dirname)
    if dirname != '.':
        entries = [osp.join(dirname, name) for name in entries]
    assert mode in ['all', 'dir', 'file']
    assert level >= 1 and isinstance(level, int)

    if level != 1:
        # Descend into every sub-directory and gather results one level deeper.
        subdirs = [path for path in entries if osp.isdir(path)]
        gathered = []
        for sub in subdirs:
            gathered.extend(ls(sub, match=match, mode=mode, level=level - 1))
        return gathered

    matched = [path for path in entries if match in path]
    if mode == 'dir':
        return [path for path in matched if osp.isdir(path)]
    if mode == 'file':
        return [path for path in matched if not osp.isdir(path)]
    return matched
def mrlines(fname, sp='\n'):
    """Read *fname* and split its text on *sp*, dropping trailing empty items.

    Args:
        fname: Path of the text file to read (opened with the default
            encoding).
        sp: Separator string to split on.

    Returns:
        A list of strings; empty when the file holds nothing but separators.
    """
    # Context manager so the handle is closed instead of left to the GC.
    with open(fname) as fin:
        parts = fin.read().split(sp)
    while parts and parts[-1] == '':
        parts.pop()
    return parts
def mwlines(lines, fname):
    """Write *lines* to *fname*, separated by newlines.

    No newline is added after the last item.

    Args:
        lines: Iterable of strings.
        fname: Path of the file to create or overwrite.
    """
    with open(fname, 'w') as handle:
        text = '\n'.join(lines)
        handle.write(text)
def md5(file_pth):
    """Compute the MD5 hex digest of the file at *file_pth*.

    The file is read in 1 MiB chunks so it never has to fit in memory.

    Args:
        file_pth: Path of the file to hash.

    Returns:
        The digest as a lowercase hexadecimal string.
    """
    chunk_size = 2 ** 20
    with open(file_pth, 'rb') as stream:
        digest = hashlib.new('md5')
        while True:
            block = stream.read(chunk_size)
            if block == b'':
                break
            digest.update(block)
    return str(digest.hexdigest())
def last_modified(pth):
    """Return the modification time of *pth* as a ``YYMMDDHHMMSS`` string.

    The timestamp is rendered in local time.

    Args:
        pth: Path whose modification time is queried.

    Returns:
        A 12-character digit string (two-digit year first).
    """
    stamp = osp.getmtime(pth)
    # Format the local struct_time directly. Going through time.ctime() and
    # time.strptime() parses English day/month names with locale-dependent
    # directives, which fails when LC_TIME is not an English locale.
    t_obj = time.localtime(stamp)
    # [2:] drops the century so the year is two digits.
    return time.strftime('%Y%m%d%H%M%S', t_obj)[2:]