Spaces:
Runtime error
Runtime error
File size: 7,235 Bytes
6b89792 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# -*- coding: utf-8 -*-
# :noTabs=true:
import os, sys, time, collections, math
import stat as stat_module
try:
from .base import *
except ImportError: # workaround for B2 back-end's
import imp
imp.load_source(__name__, '/'.join(__file__.split('/')[:-1]) + '/base.py') # A bit of Python magic here, what we trying to say is this: from base import *, but path to base is calculated from our source location # from base import HPC_Driver, execute, NT
# sbatch script template for a SLURM *array* job.
# Filled in with str.format(); expected fields are: name, log_dir, time, memory,
# working_dir, jobs_to_queue, executable, arguments.
#   - {time} is written as `{time}:00`; submit_serial_hpc_job passes ceil(time*60) here.
#   - {memory} is written with an `M` suffix for --mem-per-cpu.
#   - {jobs_to_queue} is the upper bound of the array range, which starts at 1.
# The `%x` / `%a` tokens in the --output line are not str.format fields; they are left
# in the script as-is (the submit code notes that %a is the SLURM array index).
_T_slurm_array_job_template_ = '''\
#!/bin/bash
#
#SBATCH --job-name={name}
#SBATCH --output={log_dir}/.hpc.%x.%a.output
#
#SBATCH --time={time}:00
#SBATCH --mem-per-cpu={memory}M
#SBATCH --chdir={working_dir}
#
#SBATCH --array=1-{jobs_to_queue}
srun {executable} {arguments}
'''
# sbatch script template for a SLURM *MPI* job (launched through `mpirun`).
# Filled in with str.format(); expected fields are: name, log_dir, time, memory,
# working_dir, ntasks, executable, arguments.
#   - {time} is written as `{time}:00`; submit_mpi_hpc_job passes ceil(time*60) here.
#   - {memory} is written with an `M` suffix for --mem-per-cpu.
#   - {ntasks} is passed to --ntasks.
# The `%x` token in the --output line is not a str.format field; it is left in the
# script as-is for SLURM to handle.
_T_slurm_mpi_job_template_ = '''\
#!/bin/bash
#
#SBATCH --job-name={name}
#SBATCH --output={log_dir}/.hpc.%x.output
#
#SBATCH --time={time}:00
#SBATCH --mem-per-cpu={memory}M
#SBATCH --chdir={working_dir}
#
#SBATCH --ntasks={ntasks}
mpirun {executable} {arguments}
'''
class Slurm_HPC_Driver(HPC_Driver):
    ''' HPC driver that submits, polls and cancels jobs through the SLURM command line tools
        (`sbatch`, `squeue`, `scancel`).

        Every SLURM command goes through `head_node_execute`, which runs it either locally or,
        when `config['slurm']['head_node']` is set, on that host over ssh.
    '''

    def head_node_execute(self, message, command_line, *args, **kwargs):
        ''' Run `command_line` through `execute`, on the configured head node if there is one.

            message:      text to report; when empty, an empty message is passed to `execute`
            command_line: shell command to run
            *args, **kwargs: forwarded unchanged to `execute`

            Returns whatever `execute` returns.
        '''
        head_node = self.config['slurm'].get('head_node')

        if head_node:
            # The backquoted `pwd` sits outside the single quotes, so it is presumably expanded by the
            # local shell: the remote command then starts in the same path as the local working dir.
            # NOTE(review): this depends on `execute` running the line through a shell - confirm.
            command_line, host = f"ssh {head_node} cd `pwd` '&& {command_line}'", head_node
        else:
            host = 'localhost'

        return execute(f'Executiong on {host}: {message}' if message else '', command_line, *args, **kwargs)

    # NodeGroup = collections.namedtuple('NodeGroup', 'nodes cores')
    # @property
    # def mpi_topology(self):
    #     ''' return list of NodeGroup's
    #     '''
    #     pass
    # @property
    # def number_of_cpu_per_node(self): return int( self.config['condor']['mpi_cpu_per_node'] )
    # @property
    # def maximum_number_of_mpi_cpu(self):
    #     return self.number_of_cpu_per_node * int( self.config['condor']['mpi_maximum_number_of_nodes'] )
    # def complete(self, condor_job_id):
    #     ''' Return job completion status. Note that single hpc_job may contatin inner list of individual HPC jobs, True should be return if they all run in to completion.
    #     '''
    #     execute('Releasing condor jobs...', 'condor_release $USER', return_='tuple')
    #     s = execute('', 'condor_q $USER | grep $USER | grep {}'.format(condor_job_id), return_='output', terminate_on_failure=False).replace(' ', '').replace('\n', '')
    #     if s: return False
    #     # #setDaemonStatusAndPing('[Job #%s] Running... %s condor job(s) in queue...' % (self.id, len(s.split('\n') ) ) )
    #     # n_jobs = len(s.split('\n'))
    #     # s, o = execute('', 'condor_userprio -all | grep $USER@', return_='tuple')
    #     # if s == 0:
    #     #     jobs_running = o.split()
    #     #     jobs_running = 'XX' if len(jobs_running) < 4 else jobs_running[4]
    #     #     self.set_daemon_message("Waiting for condor to finish HPC jobs... [{} jobs in HPC-Queue, {} CPU's used]".format(n_jobs, jobs_running) )
    #     # print "{} condor jobs in queue... Sleeping 32s... \r".format(n_jobs),
    #     # sys.stdout.flush()
    #     # time.sleep(32)
    #     else:
    #         #self.tracer('Waiting for condor to finish the jobs... DONE')
    #         self.jobs.remove(condor_job_id)
    #         self.cpu_usage += self.get_condor_accumulated_usage()
    #         return True # jobs already finished, we return empty list to prevent double counting of cpu_usage

    def complete(self, slurm_job_id):
        ''' Return True if job with given id is complete.

            A job counts as complete when `squeue -j <id> --noheader` produces no output.
            On completion the id is also dropped from `self.jobs`.
        '''
        queue_listing = self.head_node_execute('', f'squeue -j {slurm_job_id} --noheader', return_='output', terminate_on_failure=False, silent=True)

        if queue_listing:
            return False  # squeue still lists the job

        # Guarded removal: a second call for the same job (or a call for an id that was never
        # tracked) must report completion instead of raising ValueError from list.remove().
        if slurm_job_id in self.jobs:
            self.jobs.remove(slurm_job_id)

        return True

    def cancel_job(self, slurm_job_id):
        ''' Ask SLURM to cancel the job with given id (`scancel`); a failing command is not fatal.
        '''
        self.head_node_execute(f'Slurm_HPC_Driver.canceling job {slurm_job_id}...', f'scancel {slurm_job_id}', terminate_on_failure=False)

    # def submit_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
    #     print('submit_hpc_job is DEPRECATED and will be removed in near future, please use submit_serial_hpc_job instead!')
    #     return self.submit_serial_hpc_job(name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory, time, block, shell_wrapper)

    def _slurm_write_shell_wrapper(self, name, executable, arguments):
        ''' Write a bash script that runs `executable arguments`, make it executable by its owner
            and return its absolute path. The script is placed in `self.working_dir`.
        '''
        shell_wrapper_sh = os.path.abspath(self.working_dir + f'/hpc.{name}.shell_wrapper.sh')

        with open(shell_wrapper_sh, 'w') as f:
            f.write('#!/bin/bash\n{} {}\n'.format(executable, arguments))
            # `stat_module` is the name this file imports the stdlib `stat` module under
            os.fchmod(f.fileno(), stat_module.S_IEXEC | stat_module.S_IREAD | stat_module.S_IWRITE)

        return shell_wrapper_sh

    def _slurm_submit_script(self, message, slurm_file, block):
        ''' Run `sbatch slurm_file`, remember the new job id in `self.jobs` and either wait for the
            job (block=True, returns None) or return the job id right away (block=False).
        '''
        sbatch_output = self.head_node_execute(message, f'cd {self.working_dir} && sbatch {slurm_file}',
                                               tracer=self.tracer, return_='output')

        slurm_job_id = sbatch_output.split()[-1]  # expecting something like `Submitted batch job 6122` in output

        self.jobs.append(slurm_job_id)

        if block:
            self.wait_until_complete( [slurm_job_id] )
            return None

        return slurm_job_id

    def submit_serial_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
        ''' Submit `jobs_to_queue` serial jobs as a single SLURM array job.

            name:          job name, also used in the generated file names
            executable:    program to run
            arguments:     argument string; a `{process}` field in it is replaced with `%a`
            working_dir:   directory for the generated sbatch script, also passed as --chdir
            jobs_to_queue: size of the array (indices run 1..jobs_to_queue)
            log_dir:       directory for the job output files
            memory:        value for --mem-per-cpu, in megabytes
            time:          time limit; multiplied by 60, rounded up and written as `<value>:00`
                           (presumably hours converted to SLURM `minutes:seconds` - confirm with callers)
            block:         wait for the job to finish before returning
            shell_wrapper: run `executable arguments` through a generated bash script

            Returns None when `block` is true, otherwise the SLURM job id (a string).
        '''
        # %a is SLURM array index
        # NOTE(review): sbatch documents %a as a filename-pattern token; confirm it is also
        # substituted inside the `srun` command line (otherwise $SLURM_ARRAY_TASK_ID is needed).
        arguments = arguments.format(process='%a')

        minutes = int( math.ceil(time*60) )

        if shell_wrapper:
            executable, arguments = self._slurm_write_shell_wrapper(name, executable, arguments), ''

        slurm_file = working_dir + f'/.hpc.{name}.slurm'

        # explicit fields instead of **vars(): only what the template actually uses is passed
        script = _T_slurm_array_job_template_.format(
            name=name, log_dir=log_dir, time=minutes, memory=memory, working_dir=working_dir,
            jobs_to_queue=jobs_to_queue, executable=executable, arguments=arguments,
        )

        with open(slurm_file, 'w') as f:
            f.write(script)

        return self._slurm_submit_script('Submitting SLURM array job...', slurm_file, block)

    def submit_mpi_hpc_job(self, name, executable, arguments, working_dir, log_dir, ntasks, memory=512, time=12, block=True, shell_wrapper=False):
        ''' submit jobs as MPI job

            name:          job name, also used in the generated file names
            executable:    program to run (launched through `mpirun`)
            arguments:     argument string; a `{process}` field in it is replaced with `0`
            working_dir:   directory for the generated sbatch script, also passed as --chdir
            log_dir:       directory for the job output file
            ntasks:        value for --ntasks
            memory:        value for --mem-per-cpu, in megabytes
            time:          time limit; multiplied by 60, rounded up and written as `<value>:00`
                           (presumably hours converted to SLURM `minutes:seconds` - confirm with callers)
            block:         wait for the job to finish before returning
            shell_wrapper: run `executable arguments` through a generated bash script

            Returns None when `block` is true, otherwise the SLURM job id (a string).
        '''
        arguments = arguments.format(process='0')

        minutes = int( math.ceil(time*60) )

        if shell_wrapper:
            executable, arguments = self._slurm_write_shell_wrapper(name, executable, arguments), ''

        slurm_file = working_dir + f'/.hpc.{name}.slurm'

        # explicit fields instead of **vars(): only what the template actually uses is passed
        script = _T_slurm_mpi_job_template_.format(
            name=name, log_dir=log_dir, time=minutes, memory=memory, working_dir=working_dir,
            ntasks=ntasks, executable=executable, arguments=arguments,
        )

        with open(slurm_file, 'w') as f:
            f.write(script)

        return self._slurm_submit_script('Submitting SLURM mpi job...', slurm_file, block)
|