import os
import subprocess

import yaml

YAML_PATH = "./config.yaml"
PIPE_PATH = "./tmp/pipe"


class Dumper(yaml.Dumper):
    def increase_indent(self, flow=False, *args, **kwargs):
        return super().increase_indent(flow=flow, indentless=False)


# read scanners from yaml file
# return a list of scanners
def read_scanners(path):
    scanners = []
    with open(path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
        scanners = config.get("detectors", [])
    return scanners


# convert a list of scanners to yaml file
def write_scanners(scanners):
    print(scanners)
    with open(YAML_PATH, "r+") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
        if config:
            config["detectors"] = scanners
        # rewind and truncate so the dump overwrites the file instead of
        # being appended after the content we just read
        f.seek(0)
        f.truncate()
        # save scanners to detectors in yaml
        yaml.dump(config, f, Dumper=Dumper)


# read model_type from yaml file
def read_inference_type(path):
    inference_type = ""
    with open(path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
        inference_type = config.get("inference_type", "")
    return inference_type


# write model_type to yaml file
def write_inference_type(use_inference):
    with open(YAML_PATH, "r+") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
        if use_inference:
            config["inference_type"] = "hf_inference_api"
        else:
            config["inference_type"] = "hf_pipeline"
        # rewind and truncate so the dump overwrites the file instead of
        # being appended after the content we just read
        f.seek(0)
        f.truncate()
        # save inference_type to inference_type in yaml
        yaml.dump(config, f, Dumper=Dumper)


# read column mapping from yaml file
def read_column_mapping(path):
    column_mapping = {}
    with open(path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
        if config:
            column_mapping = config.get("column_mapping", dict())
    return column_mapping


# write column mapping to yaml file
def write_column_mapping(mapping):
    with open(YAML_PATH, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    if config is None:
        return
    if mapping is None and "column_mapping" in config.keys():
        del config["column_mapping"]
    else:
        config["column_mapping"] = mapping
    with open(YAML_PATH, "w") as f:
        # save column_mapping to column_mapping in yaml
        yaml.dump(config, f, Dumper=Dumper)


# convert column mapping dataframe to json
def convert_column_mapping_to_json(df, label=""):
    column_mapping = {}
    column_mapping[label] = []
    for _, row in df.iterrows():
        column_mapping[label].append(row.tolist())
    return column_mapping


def get_logs_file(uid):
    try:
        # use a context manager so the log file handle is closed after reading
        with open(f"./tmp/{uid}_log", "r") as f:
            return f.read()
    except Exception:
        return "Log file does not exist"


def write_log_to_user_file(id, log):
    with open(f"./tmp/{id}_log", "a") as f:
        f.write(log)


def save_job_to_pipe(id, job, lock):
    if not os.path.exists("./tmp"):
        os.makedirs("./tmp")
    # serialize the job as a single comma-joined line; elements must not
    # themselves contain commas
    job = [str(i) for i in job]
    job = ",".join(job)
    print(job)
    with lock:
        with open(PIPE_PATH, "a") as f:
            f.write(f"{id}@{job}\n")


def pop_job_from_pipe():
    if not os.path.exists(PIPE_PATH):
        return
    # take the first job off the pipe and write the rest back
    with open(PIPE_PATH, "r") as f:
        job = f.readline().strip()
        remaining = f.readlines()
    with open(PIPE_PATH, "w") as f:
        # readlines() keeps trailing newlines, so write the lines back as-is
        # rather than joining them with "\n", which would insert blank lines
        f.writelines(remaining)
    if len(job) == 0:
        return

    job_info = job.split("@")
    if len(job_info) != 2:
        raise ValueError("Invalid job info: ", job_info)

    write_log_to_user_file(job_info[0], f"Running job id {job_info[0]}\n")
    command = job_info[1].split(",")

    # mask the Hugging Face token before echoing the command to the user log
    masked_command = command.copy()
    if "--hf_token" in masked_command:
        hf_token_index = masked_command.index("--hf_token")
        masked_command[hf_token_index + 1] = "hf_********"
    write_log_to_user_file(job_info[0], f"Running command {masked_command}\n")

    log_file = open(f"./tmp/{job_info[0]}_log", "a")
    subprocess.Popen(
        command,
        cwd=os.path.join(os.path.dirname(os.path.realpath(__file__)), "cicd"),
        stdout=log_file,
        stderr=log_file,
    )
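

# --- Usage sketch (illustrative only; the job id, script name, and token
# below are placeholders, and the producer/worker split is an assumption,
# not part of this module) ---
#
# A producer enqueues a job whose elements form the argv of the eventual
# subprocess, including the "--hf_token <token>" pair that
# pop_job_from_pipe masks in the user log; a worker then drains one job
# per call. Because save_job_to_pipe comma-joins the argv, no element may
# contain a comma.
#
#   import threading
#
#   lock = threading.Lock()
#   save_job_to_pipe(
#       "job-1",
#       ["python", "run_checks.py", "--hf_token", "hf_example_token"],
#       lock,
#   )
#   pop_job_from_pipe()  # spawns the subprocess, logging to ./tmp/job-1_log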