Spaces:
Runtime error
Runtime error
# -*- coding: utf-8 -*- | |
# :noTabs=true: | |
import os, sys, subprocess, stat | |
import time as time_module | |
import signal as signal_module | |
class NT:  # named tuple
    ''' Simple attribute container: every keyword argument given to the constructor
        becomes an attribute of the instance.
    '''

    def __init__(self, **entries):
        self.__dict__.update(entries)

    def __repr__(self):
        ''' Return 'NT: |name --> value, name --> value|' for the attributes reported by dir(self).

            Names starting with a double underscore and attributes that are bound methods are skipped.
        '''
        # Imported here because the module-level imports of this file do not provide `types`;
        # without it the isinstance check below fails with NameError.
        import types

        r = 'NT: |'
        for name in dir(self):
            if name.startswith('__'):
                continue
            value = getattr(self, name)
            if isinstance(value, types.MethodType):
                continue
            r += '{} --> {}, '.format(name, value)

        # Strip the trailing ', ' separator before closing the bar.
        # NOTE: when nothing was listed this strips the ' |' of the prefix instead, yielding 'NT:|'.
        return r[:-2] + '|'
class HPC_Exception(Exception):
    ''' Exception type that stores the object it was constructed with in `value`. '''

    def __init__(self, value):
        self.value = value

    def __str__(self):
        # __str__ must return a str: convert explicitly so a non-string payload
        # (number, tuple, ...) does not turn str(exception) into a TypeError.
        return str(self.value)
def execute(message, command_line, return_='status', until_successes=False, terminate_on_failure=True, silent=False, silence_output=False, tracer=print):
    ''' Run `command_line` through the shell and report how it went.

        message              - description of the command, traced before running and used in retry messages
        command_line         - string handed to subprocess.Popen with shell=True (so it must be a trusted string)
        return_              - selects the result, see below
        until_successes      - if true, a non-zero exit code makes the function sleep 60s and run the command again, forever
        terminate_on_failure - if true, a non-zero exit code ends in sys.exit(1) (unless return_ is 'tuple' or equals True)
        silent               - suppress all tracing done before and right after the run
        silence_output       - suppress only the tracing of the output of a failed run
        tracer               - callable used for messages, print by default

        stdout and stderr of the command are concatenated (stdout first) and decoded as UTF-8
        with undecodable bytes replaced.

        Result:
          return_ == 'tuple'   -> (exit_code, output), regardless of the exit code
          non-zero exit code and terminate_on_failure:
                                  True if return_ == True, otherwise the error is printed and sys.exit(1) is called
          return_ == 'output'  -> output
          anything else        -> False
    '''
    if not silent:
        tracer(message)
        tracer(command_line)
        sys.stdout.flush()

    while True:
        p = subprocess.Popen(command_line, bufsize=0, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, errors = p.communicate()
        output = (output + errors).decode(encoding="utf-8", errors="replace")
        exit_code = p.returncode

        if exit_code and not (silent or silence_output):
            tracer(output)
            sys.stdout.flush()

        # Leave the loop unless the run failed AND the caller asked to retry until success.
        if not (exit_code and until_successes):
            break

        tracer( "Error while executing {}: {}\n".format(message, output) )
        tracer("Sleeping 60s... then I will retry...")
        sys.stdout.flush()
        # The time module is imported as `time_module` in this file; the bare name `time`
        # is not defined here, so the retry path used to fail with NameError.
        time_module.sleep(60)

    if return_ == 'tuple':
        return (exit_code, output)

    if exit_code and terminate_on_failure:
        tracer("\nEncounter error while executing: " + command_line)
        # Equality (not identity) is kept on purpose so callers passing a value equal to True behave as before.
        if return_ == True:
            return True
        print("\nEncounter error while executing: " + command_line + '\n' + output)
        sys.exit(1)

    if return_ == 'output':
        return output

    # NOTE(review): a failed run with terminate_on_failure=False also ends up here and
    # reports False, the same as a successful run - confirm this is what callers expect.
    return False
def Sleep(time_, message, dict_={}):
    ''' Sleep for `time_` seconds, printing a status message once per second.

        `message` is a str.format template. Before every one-second nap it is formatted with
        the entries of `dict_` plus `time_left` (the number of seconds still to wait) and
        printed without a trailing newline. When the countdown is over, a run of spaces as
        long as the longest message printed is written, followed by a carriage return.
    '''
    widest = 0

    for seconds_left in range(time_, 0, -1):
        fields = dict(dict_, time_left=seconds_left)
        text = message.format(**fields)

        print(text, end='')
        if len(text) > widest:
            widest = len(text)

        sys.stdout.flush()
        time_module.sleep(1)

    # blank out whatever the countdown left on the line
    blanks = ' ' * widest
    print(blanks + '\r', end='')
# Abstract class for HPC job submission | |
class HPC_Driver:
    ''' Abstract base class for HPC job submission.

        Concrete drivers inherit from this class and implement: number_of_cpu_per_node,
        maximum_number_of_mpi_cpu, submit_serial_hpc_job, submit_mpi_hpc_job, cancel_job and complete.
        The methods provided here (execute, cancel_all_jobs, block_until, wait_until_complete)
        are written in terms of those.

        Creating an instance installs a handler for the signals in `_signals_` that cancels all
        jobs known to the driver and exits the process; deleting the instance restores the
        default handlers.
    '''

    # signals on which all jobs known to the driver get cancelled
    _signals_ = [signal_module.SIGINT, signal_module.SIGTERM, signal_module.SIGABRT]

    def __init__(self, working_dir, config, tracer=lambda x:None, set_daemon_message=lambda x:None):
        ''' working_dir        - directory used by default for logs and for generated shell wrappers
            config             - either a dict with a 'cpu_count' key or an object providing getint('DEFAULT', 'cpu_count')
            tracer             - callable taking one message, no-op by default
            set_daemon_message - callable taking one status message, no-op by default
        '''
        self.working_dir = working_dir
        self.config = config
        self.cpu_usage = 0.0  # cumulative cpu usage in hours
        self.tracer = tracer
        self.set_daemon_message = set_daemon_message

        # isinstance (rather than an exact type comparison) lets dict subclasses take the dict path too
        if isinstance(config, dict):
            self.cpu_count = self.config['cpu_count']
        else:
            self.cpu_count = self.config.getint('DEFAULT', 'cpu_count')

        self.jobs = []  # list of all jobs currently running by this driver, Job class is driver depended, could be just int or something more complex

        self.install_signal_handler()

    def __del__(self):
        self.remove_signal_handler()

    def execute(self, executable, arguments, working_dir, log_dir=None, name='_no_name_', memory=256, time=24, shell_wrapper=False, block=True):
        ''' Execute given command line on HPC cluster, must accumulate cpu hours in self.cpu_usage

            The work is delegated to submit_serial_hpc_job with jobs_to_queue=1 and its result is returned.
            log_dir defaults to self.working_dir. With shell_wrapper set, a bash script
            'hpc.<name>.shell_wrapper.sh' containing '<executable> <arguments>' is written into
            self.working_dir, made readable/writable/executable for the owner, and submitted
            in place of the original command (with empty arguments).
        '''
        if log_dir is None:
            log_dir = self.working_dir

        if shell_wrapper:
            shell_wrapper_sh = os.path.abspath(self.working_dir + '/hpc.{}.shell_wrapper.sh'.format(name))
            # open() instead of file(): the `file` builtin does not exist in Python 3
            with open(shell_wrapper_sh, 'w') as f:
                f.write('#!/bin/bash\n{} {}\n'.format(executable, arguments))
                os.fchmod(f.fileno(), stat.S_IEXEC | stat.S_IREAD | stat.S_IWRITE)
            executable, arguments = shell_wrapper_sh, ''

        return self.submit_serial_hpc_job(name=name, executable=executable, arguments=arguments, working_dir=working_dir, log_dir=log_dir, jobs_to_queue=1, memory=memory, time=time, block=block, shell_wrapper=shell_wrapper)

    def number_of_cpu_per_node(self):
        ''' Must be implemented in inherited classes '''
        raise NotImplementedError('number_of_cpu_per_node must be implemented in inherited classes')

    def maximum_number_of_mpi_cpu(self):
        ''' Must be implemented in inherited classes '''
        raise NotImplementedError('maximum_number_of_mpi_cpu must be implemented in inherited classes')

    def submit_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
        ''' DEPRECATED, use submit_serial_hpc_job instead. Must be implemented in inherited classes '''
        print('submit_hpc_job is DEPRECATED and will be removed in near future, please use submit_serial_hpc_job instead!')
        raise NotImplementedError('submit_hpc_job must be implemented in inherited classes')

    def submit_serial_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
        ''' Must be implemented in inherited classes '''
        raise NotImplementedError('submit_serial_hpc_job must be implemented in inherited classes')

    def submit_mpi_hpc_job(self, name, executable, arguments, working_dir, log_dir, memory=512, time=12, block=True, process_coefficient="1", requested_nodes=1, requested_processes_per_node=1):
        ''' submit jobs as MPI job
            process_coefficient should be string representing fraction of process to launch on each node, for example '3 / 4' will start only 75% of MPI process's on each node

            Must be implemented in inherited classes
        '''
        raise NotImplementedError('submit_mpi_hpc_job must be implemented in inherited classes')

    def cancel_all_jobs(self):
        ''' Cancel all HPC jobs known to this driver, use this as signal handler for script termination '''
        # Iterate over a snapshot so that a cancel_job implementation which edits self.jobs does not disturb the loop.
        for j in list(self.jobs):
            self.cancel_job(j)

    def block_until(self, silent, fn, *args, **kwargs):
        '''
        Keep sleeping for as long as fn(self, *args, **kwargs) returns a true value.
        Each round sleeps 60s and, unless `silent`, one more second while printing a status line.

        **fn must have the driver as the first argument**
        example:
            def fn(driver):
                jobs = list(driver.jobs)
                jobs = [job for job in jobs if not driver.complete(job)]
                if len(jobs) <= 8:
                    return False # stops sleeping
                return True # continues sleeping

            for x in range(100):
                hpc_driver.submit_hpc_job(...)
                hpc_driver.block_until(False, fn)
        '''
        while fn(self, *args, **kwargs):
            sys.stdout.flush()
            time_module.sleep(60)
            if not silent:
                Sleep(1, '"Waiting for HPC job(s) to finish, sleeping {time_left}s\r')

    def wait_until_complete(self, jobs=None, callback=None, silent=False):
        ''' Helper function, wait until given jobs list is finished, if no argument is given waits until all jobs known by driver is finished

            jobs     - list of jobs to wait for; finished jobs are removed from this very list.
                       Only None selects self.jobs: an explicitly empty list means there is nothing to wait for.
            callback - optional callable invoked without arguments before every sleep
            silent   - if true sleep quietly for 64s per round, otherwise print a countdown while sleeping
        '''
        if jobs is None:
            jobs = self.jobs

        while jobs:
            for j in jobs[:]:  # iterate over a copy, the list is modified inside the loop
                if self.complete(j):
                    jobs.remove(j)

            if jobs:
                self.set_daemon_message("Waiting for HPC {} job(s) to finish...".format( len(jobs) ) )
                sys.stdout.flush()

                if callback:
                    callback()

                if silent:
                    time_module.sleep(64*1)
                else:
                    Sleep(64, '"Waiting for HPC {n_jobs} job(s) to finish, sleeping {time_left}s    \r', dict(n_jobs=len(jobs)))

    def install_signal_handler(self):
        ''' Register a handler for every signal in _signals_ that cancels all jobs, reports the signal and exits with status 1 '''
        def signal_handler(signal_, frame):
            self.tracer('Recieved signal:{}... Canceling HPC jobs...'.format(signal_) )
            self.cancel_all_jobs()
            self.set_daemon_message( 'Remote daemon got terminated with signal:{}'.format(signal_) )
            sys.exit(1)

        for s in self._signals_:
            signal_module.signal(s, signal_handler)

    def remove_signal_handler(self):  # do we really need this???
        ''' Restore the default handler for every signal in _signals_; a TypeError raised while doing so is ignored '''
        try:
            for s in self._signals_:
                signal_module.signal(s, signal_module.SIG_DFL)
        except TypeError:
            # presumably raised when the interpreter is already shutting down - nothing left to restore
            pass

    def cancel_job(self, job_id):
        ''' Must be implemented in inherited classes '''
        raise NotImplementedError('cancel_job must be implemented in inherited classes')

    def complete(self, job_id):
        ''' Return job completion status. Return True if job completed and False otherwise

            Must be implemented in inherited classes
        '''
        raise NotImplementedError('complete must be implemented in inherited classes')