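"""campain_launcher.py -- create and submit SLURM scripts for experiment campaigns.

Reads experiment configurations from a text file (one experiment per line),
writes one SLURM script per experiment under campain_logs/scripts/, and, unless
'nolaunch' is given, submits it with sbatch; for "chained" configurations it
also submits dependent continuation jobs until the frame budget is reached.
"""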
import sys
import time
from pathlib import Path
from datetime import date
import subprocess
import shutil
import os
import stat
import getpass

def get_sec(time_str):
    """Get seconds from a 'H:MM:SS' time string."""
    h, m, s = time_str.split(':')
    return int(h) * 3600 + int(m) * 60 + int(s)
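# e.g. get_sec("19:59:00") == 19 * 3600 + 59 * 60 == 71940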

def write_script(script_fullname, exp_name, PYTHON_INTERP, n_cpu_cores, slurm_conf_name, run_args, script_frames,
                 is_continue=False, dependecy_jobid=None):
    """Write a SLURM script that runs n_seeds_per_one_launch seeds in one job,
    pinning each seed to its own block of n_cpu_cores CPUs with taskset.
    Note: n_seeds_per_one_launch and slurm_confs are read from module globals."""
    print('creating slurm script with: --model {} {} --frames {} {}'.format(
        exp_name, run_args, script_frames, "--continue-train auto" if is_continue else ""))
    logfile_name = "{}{}_jid_%A".format(exp_name, "_cont_" + dependecy_jobid if is_continue else "")
    with open(script_fullname, 'w') as f:
        f.write('#!/bin/sh\n')
        if is_continue:
            f.write('#SBATCH --dependency=afterok:{}\n'.format(dependecy_jobid))
            f.write('#SBATCH --kill-on-invalid-dep=yes\n')
        f.write('#SBATCH --ntasks=1\n')
        f.write('#SBATCH --cpus-per-task={}\n'.format((n_cpu_cores * n_seeds_per_one_launch) // 2))  # cpus asked = num cores // 2
        if "jz" in slurm_conf_name:
            f.write('#SBATCH --hint=nomultithread\n')
        f.write(slurm_confs[slurm_conf_name])
        f.write('#SBATCH --open-mode=append\n')  # append to log files instead of truncating them
        f.write('#SBATCH -o campain_logs/jobouts/{}.sh.out\n'
                '#SBATCH -e campain_logs/jobouts/{}.sh.err\n'.format(logfile_name, logfile_name))
        f.write("export EXP_INTERP='{}' ;\n".format(PYTHON_INTERP))
        f.write('# Launch !\n')
        # expand the job's CPU affinity mask into an explicit list of CPU ids
        f.write('cpu_list=$(taskset -pc $$ | sed -E "s/(.*): (.*)/\\2/g" | tr "," "\\n" | sed -E "s/^[0-9]*$/&-&/g" | sed -E "s/-/ /g" | xargs -l seq | tr "\\n" " ")\n')
        f.write('echo "cpu list: $cpu_list"\n')
        f.write('COUNT=${1:-0}\n')
        f.write('i=0\n')
        f.write('cpus=""\n')
        f.write('for cpu in $cpu_list; do\n')
        f.write('cpus="$cpus$cpu"\n')
        f.write('i=$(($i+1))\n')
        f.write('if [ "$i" = "{}" ]; then\n'.format(n_cpu_cores))
        # round-robin the seeds over the job's GPUs; the assignment prefixes the
        # command (no ';') so the training process is guaranteed to see it, even
        # if CUDA_VISIBLE_DEVICES is not already exported in the job environment
        if "2gpus" in slurm_conf_name:
            cuda_prefix = 'CUDA_VISIBLE_DEVICES=$(( $COUNT % 2 )) '
        elif "4gpus" in slurm_conf_name:
            cuda_prefix = 'CUDA_VISIBLE_DEVICES=$(( $COUNT % 4 )) '
        else:
            cuda_prefix = ''
        f.write(cuda_prefix +
                'taskset -c $cpus $EXP_INTERP -m scripts.train --model {}/$COUNT --seed $COUNT'.format(exp_name) +
                run_args + " --frames {}".format(script_frames) +
                (" --continue-train auto" if is_continue else "") + ' &\n')
        f.write('echo "Using cpus $cpus for seed $COUNT"\n')
        f.write('COUNT=$(( $COUNT + 1 ))\n')
        f.write('cpus=""\n')
        f.write('i=0\n')
        f.write('else\n')
        f.write('cpus="$cpus,"\n')
        f.write('fi\n')
        f.write('done\n')
        f.write('wait\n')
    # make the script executable
    st = os.stat(script_fullname)
    os.chmod(script_fullname, st.st_mode | stat.S_IEXEC)
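
# The multi-seed script is submitted as: sbatch <script>.sh <starting_seed>.
# Inside the job, each seed takes the next n_cpu_cores CPUs from the job's
# affinity mask and starts its own background training process.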

def write_script_one_seed(script_fullname, exp_name, PYTHON_INTERP, n_cpu_cores, slurm_conf_name, run_args, script_frames,
                          is_continue=False, dependecy_jobid=None):
    """Write a SLURM script that runs a single seed; the seed is passed to the
    script as its first argument ($1) at submission time."""
    n_cpus = n_cpu_cores // 2  # cpus asked = num cores // 2
    assert n_seeds_per_one_launch == 1, "Use write_script for multi-seed launches"
    print('creating slurm script with: --model {} {} --frames {} {}'.format(
        exp_name, run_args, script_frames, "--continue-train auto" if is_continue else ""))
    logfile_name = "{}{}_jid_%A".format(exp_name, "_cont_" + dependecy_jobid if is_continue else "")
    with open(script_fullname, 'w') as f:
        f.write('#!/bin/sh\n')
        if is_continue:
            f.write('#SBATCH --dependency=afterok:{}\n'.format(dependecy_jobid))
            f.write('#SBATCH --kill-on-invalid-dep=yes\n')
        f.write('#SBATCH --ntasks=1\n')
        f.write('#SBATCH --cpus-per-task={}\n'.format(n_cpus))
        if "jz" in slurm_conf_name:
            f.write('#SBATCH --hint=nomultithread\n')
        f.write(slurm_confs[slurm_conf_name])
        f.write('#SBATCH --open-mode=append\n')  # append to log files instead of truncating them
        f.write('#SBATCH -o campain_logs/jobouts/{}.sh.out\n'
                '#SBATCH -e campain_logs/jobouts/{}.sh.err\n'.format(logfile_name, logfile_name))
        f.write("export EXP_INTERP='{}' ;\n".format(PYTHON_INTERP))
        f.write('SEED=${1:-0}\n')
        f.write('# Launch !\n')
        f.write('$EXP_INTERP -m scripts.train --model {}/$SEED --seed $SEED'.format(exp_name) +
                run_args + " --frames {}".format(script_frames) +
                (" --continue-train auto" if is_continue else ""))
    # make the script executable
    st = os.stat(script_fullname)
    os.chmod(script_fullname, st.st_mode | stat.S_IEXEC)
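
# For illustration, a generated single-seed script looks roughly like:
#   #!/bin/sh
#   #SBATCH --ntasks=1
#   #SBATCH --cpus-per-task=<n_cpus>
#   <headers from slurm_confs[slurm_conf_name]>
#   export EXP_INTERP='<python path>' ;
#   SEED=${1:-0}
#   $EXP_INTERP -m scripts.train --model <exp_name>/$SEED --seed $SEED <run_args> --frames <frames>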

def process_arg_string(expe_args):
    """Extract arguments flagged with a '*' as details for the experiment name.

    Returns (details_string, processed_arg_string), where processed_arg_string
    is the argument string cleaned of the '*' markers.
    Example: "--env MyEnv --*lr 1e-4" -> ("_lr_1e-4", "--env MyEnv --lr 1e-4")
    """
    details_string = ''
    processed_arg_string = expe_args.replace('*', '')  # version of args cleaned of exp-name-related flags
    arg_chunks = [arg_chunk for arg_chunk in expe_args.split(' --')]
    args_list = []
    for arg in arg_chunks:
        # also split off short '-x' flags, but only when they really are flags
        if ' -' in arg and arg.split(' -')[1].isalpha():
            args_list.extend(arg.split(' -'))
        else:
            args_list.append(arg)
    for arg in args_list:
        if arg == '':
            continue
        if arg[0] == '*':
            arg = arg.rstrip(' ')
            details_string += '_' + arg[1:].replace(' ', '_').replace('/', '-')
    return details_string, processed_arg_string
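
# SLURM header snippets keyed by cluster, time limit, and GPU count.
# Confs with "chained" in the name are short jobs that get resubmitted with
# --dependency=afterok until the full frame budget is reached.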
slurm_confs = {
    'curta_extra_long': "#SBATCH -p inria\n"
                        "#SBATCH -t 119:00:00\n",
    'curta_long': "#SBATCH -p inria\n"
                  "#SBATCH -t 72:00:00\n",
    'curta_medium': "#SBATCH -p inria\n"
                    "#SBATCH -t 48:00:00\n",
    'curta_short': "#SBATCH -p inria\n"
                   "#SBATCH -t 24:00:00\n",
    'jz_super_short_gpu': '#SBATCH -A imi@v100\n'
                          '#SBATCH --gres=gpu:1\n'
                          "#SBATCH -t 3:59:00\n"
                          "#SBATCH --qos=qos_gpu-t3\n",
    'jz_short_gpu': '#SBATCH -A imi@v100\n'
                    '#SBATCH --gres=gpu:1\n'
                    "#SBATCH -t 19:59:00\n"
                    "#SBATCH --qos=qos_gpu-t3\n",
    'jz_super_short_gpu_chained': '#SBATCH -A imi@v100\n'
                                  '#SBATCH --gres=gpu:1\n'
                                  "#SBATCH -t 3:59:00\n"
                                  "#SBATCH -C v100\n"
                                  "#SBATCH --qos=qos_gpu-t3\n",
    'jz_short_gpu_chained': '#SBATCH -A imi@v100\n'
                            '#SBATCH --gres=gpu:1\n'
                            "#SBATCH -t 19:59:00\n"
                            "#SBATCH -C v100\n"
                            "#SBATCH --qos=qos_gpu-t3\n",
    'jz_short_gpu_chained_a100_4h': '#SBATCH -A imi@a100\n'
                                    '#SBATCH --gres=gpu:1\n'
                                    "#SBATCH -t 3:59:00\n"
                                    "#SBATCH -C a100\n"
                                    "#SBATCH --qos=qos_gpu-t3\n",
    'jz_short_gpu_chained_a100': '#SBATCH -A imi@a100\n'
                                 '#SBATCH --gres=gpu:1\n'
                                 "#SBATCH -t 19:59:00\n"
                                 "#SBATCH -C a100\n"
                                 "#SBATCH --qos=qos_gpu-t3\n",
    'jz_short_2gpus_chained': '#SBATCH -A imi@v100\n'
                              '#SBATCH --gres=gpu:2\n'
                              "#SBATCH -t 19:59:00\n"
                              "#SBATCH -C v100\n"
                              "#SBATCH --qos=qos_gpu-t3\n",
    'jz_short_4gpus_chained': '#SBATCH -A imi@v100\n'
                              '#SBATCH --gres=gpu:4\n'
                              "#SBATCH -t 19:59:00\n"
                              "#SBATCH -C v100\n"
                              "#SBATCH --qos=qos_gpu-t3\n",
    'jz_medium_gpu': '#SBATCH -A imi@v100\n'
                     '#SBATCH --gres=gpu:1\n'
                     "#SBATCH -t 48:00:00\n"
                     "#SBATCH --qos=qos_gpu-t4\n",
    'jz_super_short_2gpus': '#SBATCH -A imi@v100\n'
                            '#SBATCH --gres=gpu:2\n'
                            "#SBATCH -t 14:59:00\n"
                            "#SBATCH --qos=qos_gpu-t3\n",
    'jz_short_2gpus': '#SBATCH -A imi@v100\n'
                      '#SBATCH --gres=gpu:2\n'
                      "#SBATCH -t 19:59:00\n"
                      "#SBATCH --qos=qos_gpu-t3\n",
    'jz_short_2gpus_32g': '#SBATCH -A imi@v100\n'
                          '#SBATCH -C v100-32g\n'
                          '#SBATCH --gres=gpu:2\n'
                          "#SBATCH -t 19:59:00\n"
                          "#SBATCH --qos=qos_gpu-t3\n",
    'jz_medium_2gpus': '#SBATCH -A imi@v100\n'
                       '#SBATCH --gres=gpu:2\n'
                       "#SBATCH -t 48:00:00\n"
                       "#SBATCH --qos=qos_gpu-t4\n",
    'jz_medium_2gpus_32g': '#SBATCH -A imi@v100\n'
                           '#SBATCH -C v100-32g\n'
                           '#SBATCH --gres=gpu:2\n'
                           "#SBATCH -t 48:00:00\n"
                           "#SBATCH --qos=qos_gpu-t4\n",
    'jz_long_gpu': '#SBATCH -A imi@v100\n'
                   '#SBATCH --gres=gpu:1\n'
                   "#SBATCH -t 72:00:00\n"
                   "#SBATCH --qos=qos_gpu-t4\n",
    'jz_long_2gpus': '#SBATCH -A imi@v100\n'
                     '#SBATCH --gres=gpu:2\n'
                     '#SBATCH -t 72:00:00\n'
                     '#SBATCH --qos=qos_gpu-t4\n',
    'jz_long_2gpus_32g': '#SBATCH -A imi@v100\n'
                         '#SBATCH -C v100-32g\n'
                         '#SBATCH --gres=gpu:2\n'
                         "#SBATCH -t 72:00:00\n"
                         "#SBATCH --qos=qos_gpu-t4\n",
    'jz_super_long_2gpus_32g': '#SBATCH -A imi@v100\n'
                               '#SBATCH -C v100-32g\n'
                               '#SBATCH --gres=gpu:2\n'
                               "#SBATCH -t 99:00:00\n"
                               "#SBATCH --qos=qos_gpu-t4\n",
    'jz_short_cpu_chained': '#SBATCH -A imi@cpu\n'
                            "#SBATCH -t 19:59:00\n"
                            "#SBATCH --qos=qos_cpu-t3\n",
    'jz_short_cpu': '#SBATCH -A imi@cpu\n'
                    "#SBATCH -t 19:59:00\n"
                    "#SBATCH --qos=qos_cpu-t3\n",
    'jz_medium_cpu': '#SBATCH -A imi@cpu\n'
                     "#SBATCH -t 48:00:00\n"
                     "#SBATCH --qos=qos_cpu-t4\n",
    'jz_long_cpu': '#SBATCH -A imi@cpu\n'
                   "#SBATCH -t 72:00:00\n"
                   "#SBATCH --qos=qos_cpu-t4\n",
    'plafrim_cpu_medium': "#SBATCH -t 48:00:00\n",
    'plafrim_cpu_long': "#SBATCH -t 72:00:00\n",
    'plafrim_gpu_medium': '#SBATCH -p long_sirocco\n'
                          "#SBATCH -t 48:00:00\n"
                          '#SBATCH --gres=gpu:1\n',
}
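
# Main script. Usage:
#   python campain_launcher.py [experiments.txt] [nolaunch | seed_offset N | incremental_seed_offset N]
# Defaults to to_run.txt; 'nolaunch' writes the scripts without submitting them.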
cur_path = str(Path.cwd())
date_str = date.today().strftime("%d-%m")  # e.g. "31-05"; prefixes experiment names and script copies
# create campaign log dirs if not already done
Path(cur_path + "/campain_logs/jobouts/").mkdir(parents=True, exist_ok=True)
Path(cur_path + "/campain_logs/scripts/").mkdir(parents=True, exist_ok=True)
# Load the txt file containing the experiments to run (its name can be given as the first argument)
filename = 'to_run.txt'
if len(sys.argv) >= 2:
    filename = sys.argv[1]
launch = True
# Save a copy of the txt file
shutil.copyfile(cur_path + "/" + filename, cur_path + '/campain_logs/scripts/' + date_str + '_' + filename)
global_seed_offset = 0
incremental = False
if len(sys.argv) >= 3:
    if sys.argv[2] == 'nolaunch':
        launch = False
    elif sys.argv[2] in ('seed_offset', 'incremental_seed_offset'):
        # both offset options require the offset value as the next argument
        assert len(sys.argv) >= 4, "{} requires a value".format(sys.argv[2])
        global_seed_offset = int(sys.argv[3])
        incremental = (sys.argv[2] == 'incremental_seed_offset')

if launch:
    print('Creating and launching slurm scripts given the arguments from {}'.format(filename))
with open(filename, 'r') as f:
    expe_list = [line.rstrip() for line in f]
exp_names = set()
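
# Each non-empty, non-comment line must start with these arguments, in this exact order:
#   --slurm_conf <name> --nb_seeds <n> --cpu_cores_per_seed <n> --gpus_per_seed <x>
#   --seeds_per_launch <n> --frames <n> --model <exp_name> [training args...]
# Arguments marked with '*' (e.g. --*lr 1e-4) are appended to the experiment name.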
for expe_args in expe_list:
    seed_offset_to_use = global_seed_offset
    if len(expe_args) == 0:
        # empty line
        continue
    if expe_args[0] == '#':
        # comment line
        continue
    arguments = ['slurm_conf', 'nb_seeds', 'cpu_cores_per_seed', 'gpus_per_seed', 'seeds_per_launch', 'frames', 'model']
    exp_config = expe_args.split('--')[1:len(arguments) + 1]
    given_args = [arg.split(' ')[0] for arg in exp_config]
    if given_args != arguments:
        raise ValueError("Arguments must be in the following order {}, and are {}".format(arguments, given_args))
    slurm_conf_name, nb_seeds, n_cpu_cores_per_seed, n_gpus_per_seed, n_seeds_per_one_launch, frames, exp_name = [
        arg.split(' ')[1] for arg in exp_config]
    n_seeds_per_one_launch = int(n_seeds_per_one_launch)
    n_cpu_cores_per_seed = int(n_cpu_cores_per_seed)
    user = getpass.getuser()
    if 'curta' in slurm_conf_name:
        gpu = ''
        PYTHON_INTERP = "$HOME/anaconda3/envs/act_and_speak/bin/python"
        n_cpu_cores_per_seed = 1  # forced to one core per seed on this cluster
    elif 'plafrim' in slurm_conf_name:
        gpu = ''
        PYTHON_INTERP = '/home/{}/USER/conda/envs/act_and_speak/bin/python'.format(user)
        n_cpu_cores_per_seed = 1  # forced to one core per seed on this cluster
    elif 'jz' in slurm_conf_name:
        if user == "utu57ed" or user == 'flowers':
            PYTHON_INTERP = '/gpfsscratch/rech/imi/{}/miniconda3/envs/social_ai/bin/python'.format(user)
        elif user == "uxo14qj":
            PYTHON_INTERP = '/gpfswork/rech/imi/{}/miniconda3/envs/act_and_speak/bin/python'.format(user)
        else:
            raise ValueError("Who are you? User {} unknown.".format(user))
        gpu = ''  # '--gpu_id 0'
        # previously used seeds-per-launch / cores-per-seed combinations (kept for reference):
        # 4x16=64, 2x16=32, 2x32=64, 1x16, 1x32, 1x64 cores per job
        if "2gpus" in slurm_conf_name:
            job_gpus = 2
        elif "4gpus" in slurm_conf_name:
            job_gpus = 4
        elif "gpu" in slurm_conf_name:
            job_gpus = 1
        else:
            print("No gpus used")
            job_gpus = 1
        # the requested per-seed GPU share must match what the job actually provides
        assert float(n_gpus_per_seed) == float(job_gpus / n_seeds_per_one_launch)
        print(f"\nJob configuration (1 launch):")
        print(f"\tSeeds: {n_seeds_per_one_launch}")
        print(f"\tGPUs: {job_gpus}")
        print(f"\n1 seed configuration:")
        print(f"\tCPU cores {n_cpu_cores_per_seed}")
        print(f"\tGPUs {job_gpus / n_seeds_per_one_launch}")
        time.sleep(0.5)
    else:
        raise Exception("Unrecognized conf name: {} ".format(slurm_conf_name))
    assert ((int(nb_seeds) % 4) == 0) or (int(nb_seeds) == 1), \
        'number of seeds should be 1 or divisible by 4, and is {}'.format(nb_seeds)
    # WARNING: assumes that exp_name comes after slurm_conf, nb_seeds, and frames in the txt line
    run_args = expe_args.split(exp_name, 1)[1]
    # use --* or -* instead of -- or - to include an argument in the experiment name
    exp_details, run_args = process_arg_string(run_args)
    exp_name = date_str + '_' + exp_name + exp_details
    # no two trainings are to be put in the same dir
    assert exp_name not in exp_names
    exp_names.add(exp_name)
    slurm_script_fullname = cur_path + "/campain_logs/scripts/{}".format(exp_name) + ".sh"
    # create the corresponding slurm script: first, calculate how many chained jobs we need
    chained_training = "chained" in slurm_conf_name
    frames = int(frames)
    print("Chained training: {}".format(chained_training))
    if chained_training:
        # assume 10M frames per 20h job (fps 140 - very conservative)
        timelimit = slurm_confs[slurm_conf_name].split("-t ")[-1].split("\n")[0]
        if timelimit == '19:59:00':
            one_script_frames = 10000000
        elif timelimit == "3:59:00":
            one_script_frames = 2500000
        else:
            raise ValueError(f"Bad timelimit {timelimit}.")
        print(f"One script frames: {one_script_frames}")
        # round up so a final partial chunk still gets its own job
        num_chained_jobs = frames // one_script_frames + bool(frames % one_script_frames)
        # (an alternative would be to derive one_script_frames from a conservative
        # fps estimate and the time limit: one_script_frames = fps * get_sec(timelimit))
    else:
        one_script_frames = frames
        num_chained_jobs = 1  # no chaining
    assert "--frames " not in run_args  # frames are managed here, not by the user
    current_script_frames = min(one_script_frames, frames)
    if n_seeds_per_one_launch == 1:
        write_script_one_seed(slurm_script_fullname, exp_name, PYTHON_INTERP, n_cpu_cores_per_seed,
                              slurm_conf_name, run_args, current_script_frames, is_continue=False,
                              dependecy_jobid=None)
    else:
        write_script(slurm_script_fullname, exp_name, PYTHON_INTERP, n_cpu_cores_per_seed, slurm_conf_name,
                     run_args, current_script_frames, is_continue=False, dependecy_jobid=None)
    # launch scripts
    if launch:
        for i in range(int(nb_seeds) // n_seeds_per_one_launch):
            print('starting from seed {}'.format((i * n_seeds_per_one_launch) + seed_offset_to_use))
            # submit the start job; the script's argument is its starting seed (0, 4, 8, 12, ...)
            sbatch_pipe = subprocess.Popen(
                ['sbatch', 'campain_logs/scripts/{}.sh'.format(exp_name),
                 str((i * n_seeds_per_one_launch) + seed_offset_to_use)],
                stdout=subprocess.PIPE
            )
            # sbatch prints "Submitted batch job <id>"; keep the 4th field as the job id
            job_id = subprocess.check_output(('cut', '-d', ' ', '-f', '4'),
                                             stdin=sbatch_pipe.stdout).decode("utf-8").rstrip()
            sbatch_pipe.wait()
            # continue jobs
            for cont_job_i in range(num_chained_jobs - 1):
                # write the continue script, chained to the previous job with --dependency=afterok
                cont_script_name = "{}_continue_{}.sh".format(exp_name, job_id)
                continue_slurm_script_fullname = cur_path + "/campain_logs/scripts/" + cont_script_name
                # each continuation extends the cumulative frame budget by one_script_frames, capped at frames
                current_script_frames = min(one_script_frames * (2 + cont_job_i), frames)
                if n_seeds_per_one_launch == 1:
                    write_script_one_seed(continue_slurm_script_fullname, exp_name, PYTHON_INTERP, n_cpu_cores_per_seed,
                                          slurm_conf_name, run_args, current_script_frames,
                                          is_continue=True, dependecy_jobid=job_id)
                else:
                    write_script(continue_slurm_script_fullname, exp_name, PYTHON_INTERP, n_cpu_cores_per_seed,
                                 slurm_conf_name, run_args, current_script_frames,
                                 is_continue=True, dependecy_jobid=job_id)
                # submit the continue job
                sbatch_pipe = subprocess.Popen(
                    ['sbatch', 'campain_logs/scripts/{}'.format(cont_script_name),
                     str((i * n_seeds_per_one_launch) + seed_offset_to_use)],
                    stdout=subprocess.PIPE
                )
                job_id = subprocess.check_output(('cut', '-d', ' ', '-f', '4'),
                                                 stdin=sbatch_pipe.stdout).decode("utf-8").rstrip()
                sbatch_pipe.wait()
    if incremental:
        global_seed_offset += int(nb_seeds)