Spaces:
Runtime error
Runtime error
from typing import List, Union | |
from copy import deepcopy | |
from collections import namedtuple | |
from pathlib import Path | |
import argparse | |
from argparse import RawDescriptionHelpFormatter | |
import yaml | |
from pydantic import BaseModel as _Base | |
class BaseConf(_Base): | |
class Config: | |
validate_all = True | |
allow_mutation = True | |
extra = "ignore" | |
def SingleOrList(inner_type): | |
return Union[inner_type, List[inner_type]] | |
def optional_load_config(fname="config.yml"): | |
cfg = {} | |
conf_fname = Path.cwd() / fname | |
if conf_fname.is_file(): | |
with conf_fname.open("r") as f: | |
raw = f.read() | |
print("loaded config\n ") | |
print(raw) # yaml raw itself is well formatted | |
cfg = yaml.safe_load(raw) | |
return cfg | |
def write_full_config(cfg_obj, fname="full_config.yml"): | |
cfg = cfg_obj.dict() | |
cfg = _dict_to_yaml(cfg) | |
print(f"\n--- full config ---\n\n{cfg}\n") | |
with (Path.cwd() / fname).open("w") as f: | |
f.write(cfg) | |
def argparse_cfg_template(curr_cfgs): | |
parser = argparse.ArgumentParser( | |
description='Manual spec of configs', | |
epilog=f'curr cfgs:\n\n{_dict_to_yaml(curr_cfgs)}', | |
formatter_class=RawDescriptionHelpFormatter | |
) | |
_, args = parser.parse_known_args() | |
clauses = [] | |
for i in range(0, len(args), 2): | |
assert args[i][:2] == "--", "please start args with --" | |
clauses.append({args[i][2:]: args[i+1]}) | |
print(f"cmdline clauses: {clauses}") | |
maker = ConfigMaker(curr_cfgs) | |
for clu in clauses: | |
maker.execute_clause(clu) | |
final = maker.state.copy() | |
return final | |
def _dict_to_yaml(arg): | |
return yaml.safe_dump(arg, sort_keys=False, allow_unicode=True) | |
def dispatch(module): | |
cfg = optional_load_config() | |
cfg = module(**cfg).dict() | |
cfg = argparse_cfg_template(cfg) # cmdline takes priority | |
mod = module(**cfg) | |
write_full_config(mod) | |
mod.run() | |
# below are some support tools | |
class ConfigMaker(): | |
CMD = namedtuple('cmd', field_names=['sub', 'verb', 'objs']) | |
VERBS = ('add', 'replace', 'del') | |
def __init__(self, base_node): | |
self.state = base_node | |
self.clauses = [] | |
def clone(self): | |
return deepcopy(self) | |
def execute_clause(self, raw_clause): | |
cls = self.__class__ | |
assert isinstance(raw_clause, (str, dict)) | |
if isinstance(raw_clause, dict): | |
assert len(raw_clause) == 1, \ | |
"a clause can only have 1 statement: {} clauses in {}".format( | |
len(raw_clause), raw_clause | |
) | |
cmd = list(raw_clause.keys())[0] | |
arg = raw_clause[cmd] | |
else: | |
cmd = raw_clause | |
arg = None | |
cmd = self.parse_clause_cmd(cmd) | |
tracer = NodeTracer(self.state) | |
tracer.advance_pointer(path=cmd.sub) | |
if cmd.verb == cls.VERBS[0]: | |
tracer.add(cmd.objs, arg) | |
elif cmd.verb == cls.VERBS[1]: | |
tracer.replace(cmd.objs, arg) | |
elif cmd.verb == cls.VERBS[2]: | |
assert isinstance(raw_clause, str) | |
tracer.delete(cmd.objs) | |
self.state = tracer.state | |
def parse_clause_cmd(cls, input): | |
""" | |
Args: | |
input: a string to be parsed | |
1. First test whether a verb is present | |
2. If not present, then str is a single subject, and verb is replace | |
This is a syntactical sugar that makes writing config easy | |
3. If a verb is found, whatever comes before is a subject, and after the | |
objects. | |
4. Handle the edge cases properly. Below are expected parse outputs | |
input sub verb obj | |
--- No verb | |
'' '' replace [] | |
'a.b' 'a.b' replace [] | |
'add' '' add [] | |
'P Q' err: 2 subjects | |
--- Verb present | |
'T add' 'T' add [] | |
'T del a b' 'T' del [a, b] | |
'P Q add a' err: 2 subjects | |
'P add del b' err: 2 verbs | |
""" | |
assert isinstance(input, str) | |
input = input.split() | |
objs = [] | |
sub = '' | |
verb, verb_inx = cls.scan_for_verb(input) | |
if verb is None: | |
assert len(input) <= 1, "no verb present; more than 1 subject: {}"\ | |
.format(input) | |
sub = input[0] if len(input) == 1 else '' | |
verb = cls.VERBS[1] | |
else: | |
assert not verb_inx > 1, 'verb {} at inx {}; more than 1 subject in: {}'\ | |
.format(verb, verb_inx, input) | |
sub = input[0] if verb_inx == 1 else '' | |
objs = input[verb_inx + 1:] | |
cmd = cls.CMD(sub=sub, verb=verb, objs=objs) | |
return cmd | |
def scan_for_verb(cls, input_list): | |
assert isinstance(input_list, list) | |
counts = [ input_list.count(v) for v in cls.VERBS ] | |
presence = [ cnt > 0 for cnt in counts ] | |
if sum(presence) == 0: | |
return None, -1 | |
elif sum(presence) > 1: | |
raise ValueError("multiple verbs discovered in {}".format(input_list)) | |
if max(counts) > 1: | |
raise ValueError("verbs repeated in cmd: {}".format(input_list)) | |
# by now, there is 1 verb that has occured exactly 1 time | |
verb = cls.VERBS[presence.index(1)] | |
inx = input_list.index(verb) | |
return verb, inx | |
class NodeTracer(): | |
def __init__(self, src_node): | |
""" | |
A src node can be either a list or dict | |
""" | |
assert isinstance(src_node, (list, dict)) | |
# these are movable pointers | |
self.child_token = "_" # init token can be anything | |
self.parent = {self.child_token: src_node} | |
# these are permanent pointers at the root | |
self.root_child_token = self.child_token | |
self.root = self.parent | |
def state(self): | |
return self.root[self.root_child_token] | |
def pointed(self): | |
return self.parent[self.child_token] | |
def advance_pointer(self, path): | |
if len(path) == 0: | |
return | |
path_list = list( | |
map(lambda x: int(x) if str.isdigit(x) else x, path.split('.')) | |
) | |
for i, token in enumerate(path_list): | |
self.parent = self.pointed | |
self.child_token = token | |
try: | |
self.pointed | |
except (IndexError, KeyError): | |
raise ValueError( | |
"During the tracing of {}, {}-th token '{}'" | |
" is not present in node {}".format( | |
path, i, self.child_token, self.state | |
) | |
) | |
def replace(self, objs, arg): | |
assert len(objs) == 0 | |
val_type = type(self.parent[self.child_token]) | |
# this is such an unfortunate hack | |
# turn everything to string, so that eval could work | |
# some of the clauses come from cmdline, some from yaml files for sow. | |
arg = str(arg) | |
if val_type == str: | |
pass | |
else: | |
arg = eval(arg) | |
assert type(arg) == val_type, \ | |
f"require {val_type.__name__}, given {type(arg).__name__}" | |
self.parent[self.child_token] = arg | |