# -*- coding: utf-8 -*-
# :noTabs=true:

import os, sys, subprocess, stat
import types
import time as time_module
import signal as signal_module

class NT:  # named tuple
    def __init__(self, **entries): self.__dict__.update(entries)

    def __repr__(self):
        r = 'NT: |'
        for i in dir(self):
            if not i.startswith('__') and not isinstance(getattr(self, i), types.MethodType): r += '{} --> {}, '.format(i, getattr(self, i))
        return r[:-2] + '|'
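
# Usage sketch for NT (illustrative only; the field names below are hypothetical):
# NT behaves like a lightweight mutable record, so arbitrary fields can be attached
# at construction time and read or updated later as plain attributes.
#
#   job = NT(id=42, queue='short', cpu_running=8)
#   print(job)             # -> NT: |cpu_running --> 8, id --> 42, queue --> short|
#   job.cpu_running += 4   # fields are ordinary attributes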

class HPC_Exception(Exception):
    def __init__(self, value): self.value = value

    def __str__(self): return self.value

def execute(message, command_line, return_='status', until_successes=False, terminate_on_failure=True, silent=False, silence_output=False, tracer=print):
    if not silent: tracer(message); tracer(command_line); sys.stdout.flush()

    while True:
        p = subprocess.Popen(command_line, bufsize=0, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, errors = p.communicate()

        output = output + errors
        output = output.decode(encoding="utf-8", errors="replace")

        exit_code = p.returncode

        if exit_code and not (silent or silence_output): tracer(output); sys.stdout.flush()

        if exit_code and until_successes: pass  # That's right - readability COUNTS!
        else: break

        tracer( "Error while executing {}: {}\n".format(message, output) )
        tracer("Sleeping 60s... then I will retry...")
        sys.stdout.flush()
        time_module.sleep(60)

    if return_ == 'tuple': return (exit_code, output)

    if exit_code and terminate_on_failure:
        tracer("\nEncountered error while executing: " + command_line)
        if return_ == True: return True
        else: print("\nEncountered error while executing: " + command_line + '\n' + output); sys.exit(1)

    if return_ == 'output': return output
    else: return False
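
# Minimal usage sketch for execute() (illustrative only; the command lines below are
# hypothetical). With the default return_='status' a failure terminates the script,
# return_='output' hands back the combined stdout/stderr, and return_='tuple' returns
# (exit_code, output) so the caller can handle failures itself.
def _example_execute_usage():
    # plain call: prints the message and command line, aborts the script on failure
    execute('Checking compiler version...', 'gcc --version')

    # capture the combined stdout/stderr instead of a status
    listing = execute('Listing working dir...', 'ls -1', return_='output')

    # get (exit_code, output) back and decide what to do ourselves
    exit_code, output = execute('Probing optional tool...', 'some-optional-tool --help', return_='tuple', silent=True)

    return listing, exit_code, output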

def Sleep(time_, message, dict_={}):
    ''' Fancy sleep function that prints a live countdown message '''
    len_ = 0
    for i in range(time_, 0, -1):
        #print "Waiting for a new revision:%s... Sleeping...%d \r" % (sc.revision, i),
        msg = message.format( **dict(dict_, time_left=i) )
        print( msg, end='' )
        len_ = max(len_, len(msg))
        sys.stdout.flush()
        time_module.sleep(1)

    print( ' '*len_ + '\r', end='' )  # erasing sleep message
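
# Usage sketch for Sleep() (illustrative only): the message is re-printed once per second
# with {time_left} taken from the countdown and any extra fields taken from dict_; a
# trailing '\r' keeps the countdown on a single console line.
def _example_sleep_usage():
    Sleep(5, 'Waiting for {what}, {time_left}s left...\r', dict(what='the queue to drain'))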

# Abstract class for HPC job submission
class HPC_Driver:
    def __init__(self, working_dir, config, tracer=lambda x: None, set_daemon_message=lambda x: None):
        self.working_dir = working_dir
        self.config = config
        self.cpu_usage = 0.0  # cumulative cpu usage in hours
        self.tracer = tracer
        self.set_daemon_message = set_daemon_message

        self.cpu_count = self.config['cpu_count'] if type(config) == dict else self.config.getint('DEFAULT', 'cpu_count')

        self.jobs = []  # list of all jobs currently running by this driver; the Job type is driver dependent and could be just an int or something more complex

        self.install_signal_handler()

    def __del__(self):
        self.remove_signal_handler()

    def execute(self, executable, arguments, working_dir, log_dir=None, name='_no_name_', memory=256, time=24, shell_wrapper=False, block=True):
        ''' Execute given command line on HPC cluster; must accumulate cpu hours in self.cpu_usage '''
        if log_dir is None: log_dir = self.working_dir

        if shell_wrapper:
            shell_wrapper_sh = os.path.abspath(self.working_dir + '/hpc.{}.shell_wrapper.sh'.format(name))
            with open(shell_wrapper_sh, 'w') as f: f.write('#!/bin/bash\n{} {}\n'.format(executable, arguments)); os.fchmod(f.fileno(), stat.S_IEXEC | stat.S_IREAD | stat.S_IWRITE)
            executable, arguments = shell_wrapper_sh, ''

        return self.submit_serial_hpc_job(name=name, executable=executable, arguments=arguments, working_dir=working_dir, log_dir=log_dir, jobs_to_queue=1, memory=memory, time=time, block=block, shell_wrapper=shell_wrapper)

    def number_of_cpu_per_node(self):
        raise NotImplementedError('must be implemented in inherited classes')

    def maximum_number_of_mpi_cpu(self):
        raise NotImplementedError('must be implemented in inherited classes')

    def submit_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
        print('submit_hpc_job is DEPRECATED and will be removed in the near future, please use submit_serial_hpc_job instead!')
        raise NotImplementedError('must be implemented in inherited classes')

    def submit_serial_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
        raise NotImplementedError('must be implemented in inherited classes')

    def submit_mpi_hpc_job(self, name, executable, arguments, working_dir, log_dir, memory=512, time=12, block=True, process_coefficient="1", requested_nodes=1, requested_processes_per_node=1):
        ''' Submit a job as an MPI job.
            process_coefficient should be a string representing the fraction of processes to launch on each node; for example, '3 / 4' will start only 75% of the MPI processes on each node.
        '''
        raise NotImplementedError('must be implemented in inherited classes')

    def cancel_all_jobs(self):
        ''' Cancel all HPC jobs known to this driver; use this as a signal handler for script termination '''
        for j in self.jobs: self.cancel_job(j)

    def block_until(self, silent, fn, *args, **kwargs):
        '''
        **fn must have the driver as the first argument**
        example:
            def fn(driver):
                jobs = list(driver.jobs)
                jobs = [job for job in jobs if not driver.complete(job)]
                if len(jobs) <= 8:
                    return False  # stops sleeping
                return True  # continues sleeping

            for x in range(100):
                hpc_driver.submit_hpc_job(...)
                hpc_driver.block_until(False, fn)
        '''
        while fn(self, *args, **kwargs):
            sys.stdout.flush()
            time_module.sleep(60)
            if not silent:
                Sleep(1, 'Waiting for HPC job(s) to finish, sleeping {time_left}s\r')

    def wait_until_complete(self, jobs=None, callback=None, silent=False):
        ''' Helper function: wait until the given jobs list is finished; if no argument is given, wait until all jobs known by this driver are finished '''
        jobs = jobs if jobs else self.jobs

        while jobs:
            for j in jobs[:]:
                if self.complete(j): jobs.remove(j)

            if jobs:
                #total_cpu_queued  = sum( [j.jobs_queued for j in jobs] )
                #total_cpu_running = sum( [j.cpu_running for j in jobs] )
                #self.set_daemon_message("Waiting for HPC job(s) to finish... [{} process(es) in queue, {} process(es) running]".format(total_cpu_queued, total_cpu_running) )
                #self.tracer("Waiting for HPC job(s) [{} process(es) in queue, {} process(es) running]... \r".format(total_cpu_queued, total_cpu_running), end='')
                #print "Waiting for {} HPC jobs to finish... [{} jobs in queue, {} jobs running]... Sleeping 32s... \r".format(total_cpu_queued, cpu_queued+cpu_running, cpu_running),
                self.set_daemon_message("Waiting for {} HPC job(s) to finish...".format( len(jobs) ) )
                #self.tracer("Waiting for HPC {} job(s) to finish...".format( len(jobs) ) )
                sys.stdout.flush()

                if callback: callback()

                if silent: time_module.sleep(64*1)
                else: Sleep(64, 'Waiting for {n_jobs} HPC job(s) to finish, sleeping {time_left}s \r', dict(n_jobs=len(jobs)))

    _signals_ = [signal_module.SIGINT, signal_module.SIGTERM, signal_module.SIGABRT]

    def install_signal_handler(self):
        def signal_handler(signal_, frame):
            self.tracer( 'Received signal: {}... Canceling HPC jobs...'.format(signal_) )
            self.cancel_all_jobs()
            self.set_daemon_message( 'Remote daemon got terminated with signal: {}'.format(signal_) )
            sys.exit(1)

        for s in self._signals_: signal_module.signal(s, signal_handler)

    def remove_signal_handler(self):  # do we really need this???
        try:
            for s in self._signals_: signal_module.signal(s, signal_module.SIG_DFL)
            #print('remove_signal_handler: done!')
        except TypeError:
            #print('remove_signal_handler: interpreter is terminating, skipping remove_signal_handler...')
            pass

    def cancel_job(self, job_id):
        raise NotImplementedError('must be implemented in inherited classes')

    def complete(self, job_id):
        ''' Return job completion status: True if the job has completed and False otherwise '''
        raise NotImplementedError('must be implemented in inherited classes')
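
# Illustrative sketch of a concrete driver (an assumption for demonstration, not one of
# the real cluster back-ends): a subclass only needs to provide the submit/cancel/complete
# primitives, and the bookkeeping in HPC_Driver (signal handling, wait_until_complete,
# block_until) works unchanged. This toy "local" driver runs each job as a background
# process on the current machine.
class Local_HPC_Driver_Example(HPC_Driver):
    def number_of_cpu_per_node(self): return self.cpu_count

    def maximum_number_of_mpi_cpu(self): return self.cpu_count

    def submit_serial_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
        # launch jobs_to_queue copies of the command and track them as a single NT job record
        processes = [ subprocess.Popen('cd {} && {} {}'.format(working_dir, executable, arguments), shell=True) for _ in range(jobs_to_queue) ]
        job = NT(name=name, processes=processes)
        self.jobs.append(job)
        if block:
            self.wait_until_complete([job])
            return None
        return job

    def cancel_job(self, job):
        for p in job.processes:
            if p.poll() is None: p.terminate()

    def complete(self, job):
        # a job is complete when every launched process has exited
        if all( p.poll() is not None for p in job.processes ):
            if job in self.jobs: self.jobs.remove(job)
            return True
        return False

# hypothetical usage:
#   driver = Local_HPC_Driver_Example(working_dir='.', config=dict(cpu_count=8))
#   driver.execute('/bin/echo', 'hello', working_dir='.', name='hello')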