# -*- coding: utf-8 -*-
# :noTabs=true:
import os, sys, subprocess, stat, types
import time as time_module
import signal as signal_module

class NT:  # named tuple
    def __init__(self, **entries): self.__dict__.update(entries)

    def __repr__(self):
        r = 'NT: |'
        for i in dir(self):
            if not i.startswith('__') and not isinstance(getattr(self, i), types.MethodType): r += '{} --> {}, '.format(i, getattr(self, i))
        return r[:-2]+'|'


class HPC_Exception(Exception):
    def __init__(self, value): self.value = value

    def __str__(self): return self.value

def execute(message, command_line, return_='status', until_successes=False, terminate_on_failure=True, silent=False, silence_output=False, tracer=print):
    if not silent: tracer(message); tracer(command_line); sys.stdout.flush()

    while True:
        p = subprocess.Popen(command_line, bufsize=0, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, errors = p.communicate()

        output = output + errors
        output = output.decode(encoding="utf-8", errors="replace")

        exit_code = p.returncode

        if exit_code and not (silent or silence_output): tracer(output); sys.stdout.flush()

        if exit_code and until_successes: pass  # That's right - readability COUNTS!
        else: break

        tracer( "Error while executing {}: {}\n".format(message, output) )
        tracer("Sleeping 60s... then I will retry...")
        sys.stdout.flush()
        time_module.sleep(60)

    if return_ == 'tuple': return(exit_code, output)

    if exit_code and terminate_on_failure:
        tracer("\nEncountered error while executing: " + command_line)
        if return_ == True: return True
        else: print("\nEncountered error while executing: " + command_line + '\n' + output); sys.exit(1)

    if return_ == 'output': return output
    else: return False
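
# Illustrative usage of execute() above (a sketch only; the commands and paths are hypothetical):
#   exit_code, output = execute('Building...', 'cd /some/dir && make', return_='tuple', terminate_on_failure=False)
#   revision = execute('Getting git revision...', 'git rev-parse HEAD', return_='output')
#   execute('Syncing results...', 'rsync -a results/ backup/', until_successes=True)  # on failure, retries every 60s until it succeeds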

def Sleep(time_, message, dict_={}):
    ''' Fancy sleep function '''
    len_ = 0
    for i in range(time_, 0, -1):
        #print "Waiting for a new revision:%s... Sleeping...%d \r" % (sc.revision, i),
        msg = message.format( **dict(dict_, time_left=i) )
        print( msg, end='' )
        len_ = max(len_, len(msg))
        sys.stdout.flush()
        time_module.sleep(1)

    print( ' '*len_ + '\r', end='' )  # erasing sleep message
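
# Illustrative Sleep() call (the message text is hypothetical): the message is re-rendered every
# second with `time_left` substituted, and a trailing '\r' lets each render overwrite the previous one.
#   Sleep(10, 'Waiting for license server, retrying in {time_left}s... \r')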

# Abstract class for HPC job submission
class HPC_Driver:
    def __init__(self, working_dir, config, tracer=lambda x: None, set_daemon_message=lambda x: None):
        self.working_dir = working_dir
        self.config = config
        self.cpu_usage = 0.0  # cumulative cpu usage in hours
        self.tracer = tracer
        self.set_daemon_message = set_daemon_message

        self.cpu_count = self.config['cpu_count'] if isinstance(config, dict) else self.config.getint('DEFAULT', 'cpu_count')

        self.jobs = []  # list of all jobs currently run by this driver; the Job class is driver dependent and could be just an int or something more complex

        self.install_signal_handler()


    def __del__(self):
        self.remove_signal_handler()


    def execute(self, executable, arguments, working_dir, log_dir=None, name='_no_name_', memory=256, time=24, shell_wrapper=False, block=True):
        ''' Execute given command line on HPC cluster, must accumulate cpu hours in self.cpu_usage '''
        if log_dir == None: log_dir = self.working_dir

        if shell_wrapper:
            shell_wrapper_sh = os.path.abspath(self.working_dir + '/hpc.{}.shell_wrapper.sh'.format(name))
            with open(shell_wrapper_sh, 'w') as f: f.write('#!/bin/bash\n{} {}\n'.format(executable, arguments)); os.fchmod(f.fileno(), stat.S_IEXEC | stat.S_IREAD | stat.S_IWRITE)
            executable, arguments = shell_wrapper_sh, ''

        return self.submit_serial_hpc_job(name=name, executable=executable, arguments=arguments, working_dir=working_dir, log_dir=log_dir, jobs_to_queue=1, memory=memory, time=time, block=block, shell_wrapper=shell_wrapper)
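
    # Illustrative call of the method above (executable and arguments are hypothetical): with
    # shell_wrapper=True the command line is first written to an hpc.<name>.shell_wrapper.sh
    # script and that script is submitted instead, which helps when `arguments` contains shell
    # redirections or pipes.
    #   driver.execute('/bin/my_app', '--flags @input.flags > my_app.log 2>&1', working_dir='.', name='my_app', shell_wrapper=True)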

    @property
    def number_of_cpu_per_node(self):
        must_be_implemented_in_inherited_classes  # deliberately undefined name: raises NameError unless a subclass overrides this member


    @property
    def maximum_number_of_mpi_cpu(self):
        must_be_implemented_in_inherited_classes


    def submit_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
        print('submit_hpc_job is DEPRECATED and will be removed in the near future, please use submit_serial_hpc_job instead!')
        must_be_implemented_in_inherited_classes


    def submit_serial_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
        must_be_implemented_in_inherited_classes


    def submit_mpi_hpc_job(self, name, executable, arguments, working_dir, log_dir, memory=512, time=12, block=True, process_coefficient="1", requested_nodes=1, requested_processes_per_node=1):
        ''' Submit a job as an MPI job.
            process_coefficient should be a string representing the fraction of processes to launch on each node; for example '3 / 4' will start only 75% of the MPI processes on each node.
        '''
        must_be_implemented_in_inherited_classes
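
    # Worked example for process_coefficient (node size of 32 is only assumed for illustration):
    # with 32 cpu per node and process_coefficient='3 / 4', a driver is expected to launch
    # 32 * 3 / 4 = 24 MPI processes per node (75%), e.g. to leave memory headroom per process.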

    def cancel_all_jobs(self):
        ''' Cancel all HPC jobs known to this driver; use this as a signal handler for script termination '''
        for j in self.jobs: self.cancel_job(j)


    def block_until(self, silent, fn, *args, **kwargs):
        '''
        **fn must have the driver as the first argument**
        example:
            def fn(driver):
                jobs = list(driver.jobs)
                jobs = [job for job in jobs if not driver.complete(job)]
                if len(jobs) <= 8:
                    return False  # stops sleeping
                return True  # continues sleeping

            for x in range(100):
                hpc_driver.submit_hpc_job(...)
                hpc_driver.block_until(False, fn)
        '''
        while fn(self, *args, **kwargs):
            sys.stdout.flush()
            time_module.sleep(60)
            if not silent:
                Sleep(1, 'Waiting for HPC job(s) to finish, sleeping {time_left}s\r')

    def wait_until_complete(self, jobs=None, callback=None, silent=False):
        ''' Helper function: wait until the given list of jobs is finished; if no argument is given, wait until all jobs known to the driver are finished '''
        jobs = jobs if jobs else self.jobs

        while jobs:
            for j in jobs[:]:
                if self.complete(j): jobs.remove(j)

            if jobs:
                #total_cpu_queued  = sum( [j.jobs_queued for j in jobs] )
                #total_cpu_running = sum( [j.cpu_running for j in jobs] )
                #self.set_daemon_message("Waiting for HPC job(s) to finish... [{} process(es) in queue, {} process(es) running]".format(total_cpu_queued, total_cpu_running) )
                #self.tracer("Waiting for HPC job(s) [{} process(es) in queue, {} process(es) running]... \r".format(total_cpu_queued, total_cpu_running), end='')
                #print "Waiting for {} HPC jobs to finish... [{} jobs in queue, {} jobs running]... Sleeping 32s... \r".format(total_cpu_queued, cpu_queued+cpu_running, cpu_running),

                self.set_daemon_message("Waiting for HPC {} job(s) to finish...".format( len(jobs) ) )
                #self.tracer("Waiting for HPC {} job(s) to finish...".format( len(jobs) ) )
                sys.stdout.flush()

                if callback: callback()

                if silent: time_module.sleep(64*1)
                else: Sleep(64, 'Waiting for HPC {n_jobs} job(s) to finish, sleeping {time_left}s \r', dict(n_jobs=len(jobs)))
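
    # Illustrative pattern for the method above (job submissions are hypothetical): submit a batch
    # with block=False, then wait on the driver's job list, which complete() is polled against.
    #   for v in variants: driver.submit_serial_hpc_job(name=v, executable=app, arguments='...', working_dir='.', jobs_to_queue=1, log_dir='.', block=False)
    #   driver.wait_until_complete()  # polls complete() roughly every 64s until all known jobs finish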

    _signals_ = [signal_module.SIGINT, signal_module.SIGTERM, signal_module.SIGABRT]

    def install_signal_handler(self):
        def signal_handler(signal_, frame):
            self.tracer('Received signal:{}... Canceling HPC jobs...'.format(signal_) )
            self.cancel_all_jobs()
            self.set_daemon_message( 'Remote daemon got terminated with signal:{}'.format(signal_) )
            sys.exit(1)

        for s in self._signals_: signal_module.signal(s, signal_handler)


    def remove_signal_handler(self):  # do we really need this???
        try:
            for s in self._signals_: signal_module.signal(s, signal_module.SIG_DFL)
            #print('remove_signal_handler: done!')
        except TypeError:
            #print('remove_signal_handler: interpreter is terminating, skipping remove_signal_handler...')
            pass

    def cancel_job(self, job_id):
        must_be_implemented_in_inherited_classes


    def complete(self, job_id):
        ''' Return job completion status: True if the job has completed and False otherwise '''
        must_be_implemented_in_inherited_classes
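
# A minimal sketch of a concrete driver (illustrative only, not part of this module's driver set):
# it runs each job synchronously through the module-level execute() helper instead of a real
# scheduler, and shows which abstract members a subclass is expected to override.
class ExampleLocalDriver(HPC_Driver):
    @property
    def number_of_cpu_per_node(self): return self.cpu_count

    @property
    def maximum_number_of_mpi_cpu(self): return self.cpu_count

    def submit_serial_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
        # scheduler-related arguments (log_dir, memory, time, block, shell_wrapper) are ignored in this sketch
        started = time_module.time()
        for _ in range(jobs_to_queue):
            execute('Running {}...'.format(name), 'cd {} && {} {}'.format(working_dir, executable, arguments), return_='tuple', terminate_on_failure=False)
        self.cpu_usage += (time_module.time() - started) / 60.0 / 60.0  # accumulate cpu hours, as HPC_Driver.execute docstring requires
        return None  # jobs run to completion here, so there is nothing left to track

    def cancel_job(self, job_id): pass        # nothing to cancel: jobs run synchronously

    def complete(self, job_id): return True   # synchronous jobs are complete by the time submit returns

# Hypothetical usage: driver = ExampleLocalDriver('.', dict(cpu_count=8))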