Source code for dpgen.remote.RemoteJob

#!/usr/bin/env python
# coding: utf-8

import os, sys, paramiko, json, uuid, tarfile, time, stat, shutil
from glob import glob
from enum import Enum
from dpgen import dlog


class JobStatus(Enum):
    """Enumeration of the states a job can be reported in."""

    # States before and during execution.
    unsubmitted = 1
    waiting = 2
    running = 3
    # States after the job has left the queue.
    terminated = 4
    finished = 5
    # Fallback for a status that could not be classified.
    unknown = 100
class awsMachineJob(object):
    """Copy job files between a local work directory and a second directory tree.

    Both trees are accessed through ordinary filesystem calls (``shutil`` /
    ``os``); ``remote_root`` is therefore a path reachable from this process.
    """

    def __init__(self, remote_root, work_path, job_uuid=None):
        """Record the two roots and the job identifier.

        Parameters
        ----------
        remote_root : str
            Base directory of the remote tree; ``work_path`` is appended to it.
        work_path : str
            Local work directory; stored as an absolute path in ``local_root``.
        job_uuid : str, optional
            Identifier to reuse. A random UUID4 string is generated when falsy.
        """
        self.remote_root = os.path.join(remote_root, work_path)
        self.local_root = os.path.abspath(work_path)
        self.job_uuid = job_uuid if job_uuid else str(uuid.uuid4())
        dlog.info("local_root is %s" % self.local_root)
        dlog.info("remote_root is %s" % self.remote_root)

    def upload(self, job_dir, local_up_files, dereference=True):
        """Copy files or directories from ``local_root/job_dir`` to ``remote_root/job_dir``.

        Parameters
        ----------
        job_dir : str
            Job sub-directory, relative to both roots.
        local_up_files : iterable of str
            Names inside ``job_dir``; regular files are copied with
            ``shutil.copyfile`` and directories with ``shutil.copytree``.
            Anything else is reported on stdout and skipped.
        dereference : bool
            Accepted for signature compatibility; not used by this class.
        """
        cwd = os.getcwd()
        print('cwd=', cwd)
        # Paths below are resolved relative to local_root, as before.
        os.chdir(self.local_root)
        try:
            for ii in local_up_files:
                print('self.local_root=', self.local_root, 'remote_root=', self.remote_root, 'job_dir=', job_dir, 'ii=', ii)
                src = os.path.join(job_dir, ii)
                dst_dir = os.path.join(self.remote_root, job_dir)
                dst = os.path.join(dst_dir, ii)
                if os.path.isfile(src):
                    if not os.path.exists(dst_dir):
                        os.makedirs(dst_dir)
                    shutil.copyfile(src, dst)
                elif os.path.isdir(src):
                    shutil.copytree(src, dst)
                else:
                    print('unknownfile', 'local_root=', self.local_root, 'job_dir=', job_dir, 'filename=', ii)
        finally:
            # Always restore the caller's working directory, even on failure.
            os.chdir(cwd)

    def download(self, job_dir, remote_down_files, dereference=True):
        """Copy files or directories from ``remote_root/job_dir`` to ``local_root/job_dir``.

        For every requested name the method polls every 5 seconds until the
        source exists as a file or directory. A directory is copied only when
        the local target is missing or empty; a non-empty local directory is
        reported on stdout and left untouched.

        Parameters
        ----------
        job_dir : str
            Job sub-directory, relative to both roots.
        remote_down_files : iterable of str
            Names inside ``job_dir`` to fetch.
        dereference : bool
            Accepted for signature compatibility; not used by this class.
        """
        for ii in remote_down_files:
            src = os.path.join(self.remote_root, job_dir, ii)
            dst = os.path.join(self.local_root, job_dir, ii)
            while True:
                if os.path.isfile(src):
                    shutil.copyfile(src, dst)
                    break
                if os.path.isdir(src):
                    if os.path.isdir(dst):
                        if os.listdir(dst):
                            # Keep existing local data. Previously this case
                            # (and a missing local dir) retried forever.
                            print('dir is not empty ' + str(dst))
                            break
                        # copytree needs the destination to be absent.
                        os.rmdir(dst)
                    shutil.copytree(src, dst)
                    break
                print('unknownfile,maybe need for waiting for a while', 'local_root=', self.local_root, 'job_dir=', job_dir, 'filename=', ii)
                time.sleep(5)
def _default_item(resources, key, value) : if key not in resources : resources[key] = value def _set_default_resource(res) : if res == None : res = {} _default_item(res, 'numb_node', 1) _default_item(res, 'task_per_node', 1) _default_item(res, 'numb_gpu', 0) _default_item(res, 'time_limit', '1:0:0') _default_item(res, 'mem_limit', -1) _default_item(res, 'partition', '') _default_item(res, 'account', '') _default_item(res, 'qos', '') _default_item(res, 'constraint_list', []) _default_item(res, 'license_list', []) _default_item(res, 'exclude_list', []) _default_item(res, 'module_unload_list', []) _default_item(res, 'module_list', []) _default_item(res, 'source_list', []) _default_item(res, 'envs', None) _default_item(res, 'with_mpi', False)
class SSHSession(object):
    """Open and hold a paramiko SSH connection described by a profile dict."""

    def __init__(self, jdata):
        """Read the connection settings from ``jdata`` and connect immediately.

        Parameters
        ----------
        jdata : dict
            Remote profile. Required keys: ``hostname``, ``port``,
            ``username`` and ``work_path``. Optional keys: ``password``,
            ``key_filename``, ``timeout`` and ``passphrase``; each defaults
            to ``None`` when absent.

        Raises
        ------
        KeyError
            If a required key is missing from ``jdata``.
        """
        self.remote_profile = jdata
        self.remote_host = self.remote_profile['hostname']
        self.remote_port = self.remote_profile['port']
        self.remote_uname = self.remote_profile['username']
        self.remote_password = self.remote_profile.get('password')
        self.local_key_filename = self.remote_profile.get('key_filename')
        self.remote_timeout = self.remote_profile.get('timeout')
        self.local_key_passphrase = self.remote_profile.get('passphrase')
        self.remote_workpath = self.remote_profile['work_path']
        self.ssh = self._setup_ssh(hostname=self.remote_host,
                                   port=self.remote_port,
                                   username=self.remote_uname,
                                   password=self.remote_password,
                                   key_filename=self.local_key_filename,
                                   timeout=self.remote_timeout,
                                   passphrase=self.local_key_passphrase)

    def _setup_ssh(self,
                   hostname,
                   port=22,
                   username=None,
                   password=None,
                   key_filename=None,
                   timeout=None,
                   passphrase=None):
        """Create a connected ``paramiko.SSHClient`` and return it.

        System host keys are loaded and unknown hosts are accepted with a
        warning (``paramiko.WarningPolicy``).
        """
        ssh_client = paramiko.SSHClient()
        ssh_client.load_system_host_keys()
        ssh_client.set_missing_host_key_policy(paramiko.WarningPolicy)
        # Keyword arguments are required here: the fifth positional parameter
        # of SSHClient.connect is ``pkey``, so passing these positionally would
        # hand key_filename to pkey, timeout to key_filename and passphrase to
        # timeout.
        ssh_client.connect(hostname,
                           port=port,
                           username=username,
                           password=password,
                           key_filename=key_filename,
                           timeout=timeout,
                           passphrase=passphrase)
        assert ssh_client.get_transport().is_active()
        return ssh_client

    def get_ssh_client(self):
        """Return the connected SSH client."""
        return self.ssh

    def get_session_root(self):
        """Return the ``work_path`` entry of the profile."""
        return self.remote_workpath

    def close(self):
        """Close the SSH connection."""
        self.ssh.close()
class RemoteJob(object):
    """Base class for a job that owns a per-job directory on a remote host.

    The remote directory is ``<session root>/<job_uuid>``. Files are moved as
    gzip tarballs over SFTP and commands are executed through the SSH client
    obtained from the session.
    """

    def __init__(self, ssh_session, local_root, job_uuid=None):
        """Bind the job to a session and try to create its remote directory.

        Parameters
        ----------
        ssh_session : object
            Must provide ``get_session_root()`` and ``get_ssh_client()``.
        local_root : str
            Local directory the job's relative paths refer to.
        job_uuid : str, optional
            Identifier to reuse. A random UUID4 string is generated when falsy.
        """
        self.local_root = os.path.abspath(local_root)
        self.job_uuid = job_uuid if job_uuid else str(uuid.uuid4())
        self.remote_root = os.path.join(ssh_session.get_session_root(), self.job_uuid)
        dlog.info("local_root is %s" % local_root)
        dlog.info("remote_root is %s" % self.remote_root)
        self.ssh = ssh_session.get_ssh_client()
        # keep ssh alive
        transport = self.ssh.get_transport()
        transport.set_keepalive(60)
        try:
            sftp = self.ssh.open_sftp()
            try:
                sftp.mkdir(self.remote_root)
            finally:
                sftp.close()
        except Exception as err:
            # Best effort, as before: a failure here (for example when a
            # reused job_uuid means the directory is already there) must not
            # abort construction. It is now logged instead of being dropped.
            dlog.debug("could not create %s: %s" % (self.remote_root, err))

    def get_job_root(self):
        """Return the remote directory of this job."""
        return self.remote_root

    def upload(self, job_dirs, local_up_files, dereference=True):
        """Send ``local_up_files`` of every directory in ``job_dirs`` to the remote root.

        Paths are taken relative to ``local_root``. ``dereference`` is passed
        on to ``tarfile.open`` when the archive is built.
        """
        cwd = os.getcwd()
        os.chdir(self.local_root)
        try:
            file_list = [os.path.join(ii, jj)
                         for ii in job_dirs
                         for jj in local_up_files]
            self._put_files(file_list, dereference=dereference)
        finally:
            os.chdir(cwd)

    def download(self, job_dirs, remote_down_files, back_error=False):
        """Fetch ``remote_down_files`` of every directory in ``job_dirs`` into ``local_root``.

        With ``back_error`` set, paths matching ``<job_dir>/error*`` are added
        to the request.
        """
        cwd = os.getcwd()
        os.chdir(self.local_root)
        try:
            file_list = []
            for ii in job_dirs:
                for jj in remote_down_files:
                    file_list.append(os.path.join(ii, jj))
                if back_error:
                    # NOTE(review): this glob runs against the LOCAL tree (cwd
                    # is local_root), yet the matches are requested from the
                    # remote side - confirm this is intended.
                    errors = glob(os.path.join(ii, 'error*'))
                    file_list.extend(errors)
            self._get_files(file_list)
        finally:
            os.chdir(cwd)

    def block_checkcall(self, cmd):
        """Run ``cmd`` in the remote root, wait for it, and require exit status 0.

        Returns
        -------
        tuple
            ``(stdin, stdout, stderr)`` as returned by ``exec_command``.

        Raises
        ------
        RuntimeError
            If the command exits with a non-zero status.
        """
        stdin, stdout, stderr = self.ssh.exec_command(('cd %s ;' % self.remote_root) + cmd)
        exit_status = stdout.channel.recv_exit_status()
        if exit_status != 0:
            err_lines = stderr.readlines()
            # stderr may be empty; indexing it unconditionally used to raise
            # IndexError and hide the RuntimeError below.
            first_line = err_lines[0] if err_lines else ''
            dlog.info("Error info: %s " % first_line)
            raise RuntimeError("Get error code %d in calling %s through ssh with job: %s " % (exit_status, cmd, self.job_uuid))
        return stdin, stdout, stderr

    def block_call(self, cmd):
        """Run ``cmd`` in the remote root and wait for it.

        Returns
        -------
        tuple
            ``(exit_status, stdin, stdout, stderr)``; the status is not checked.
        """
        stdin, stdout, stderr = self.ssh.exec_command(('cd %s ;' % self.remote_root) + cmd)
        exit_status = stdout.channel.recv_exit_status()
        return exit_status, stdin, stdout, stderr

    def clean(self):
        """Recursively delete the remote job directory."""
        sftp = self.ssh.open_sftp()
        try:
            self._rmtree(sftp, self.remote_root)
        finally:
            sftp.close()

    def _rmtree(self, sftp, remotepath, level=0, verbose=False):
        """Delete ``remotepath`` and everything below it through ``sftp``.

        ``level`` is the recursion depth, used only to indent verbose log
        lines; ``verbose`` enables one log line per removed entry.
        """
        for f in sftp.listdir_attr(remotepath):
            rpath = os.path.join(remotepath, f.filename)
            if stat.S_ISDIR(f.st_mode):
                # verbose is forwarded so nested directories are logged too.
                self._rmtree(sftp, rpath, level=(level + 1), verbose=verbose)
            else:
                if verbose:
                    dlog.info('removing %s%s' % (' ' * level, rpath))
                sftp.remove(rpath)
        if verbose:
            dlog.info('removing %s%s' % (' ' * level, remotepath))
        sftp.rmdir(remotepath)

    def _put_files(self, files, dereference=True):
        """Tar ``files`` (relative to ``local_root``), upload the archive and unpack it remotely."""
        of = self.job_uuid + '.tgz'
        # local tar
        cwd = os.getcwd()
        os.chdir(self.local_root)
        try:
            if os.path.isfile(of):
                os.remove(of)
            with tarfile.open(of, "w:gz", dereference=dereference) as tar:
                for ii in files:
                    tar.add(ii)
        finally:
            os.chdir(cwd)
        # trans
        from_f = os.path.join(self.local_root, of)
        to_f = os.path.join(self.remote_root, of)
        sftp = self.ssh.open_sftp()
        try:
            sftp.put(from_f, to_f)
            # remote extract
            self.block_checkcall('tar xf %s' % of)
            # clean up
            os.remove(from_f)
            sftp.remove(to_f)
        finally:
            sftp.close()

    def _get_files(self, files):
        """Tar ``files`` on the remote side, download the archive and unpack it into ``local_root``."""
        of = self.job_uuid + '.tgz'
        flist = "".join(" " + ii for ii in files)
        # remote tar
        self.block_checkcall('tar czf %s %s' % (of, flist))
        # trans
        from_f = os.path.join(self.remote_root, of)
        to_f = os.path.join(self.local_root, of)
        if os.path.isfile(to_f):
            os.remove(to_f)
        sftp = self.ssh.open_sftp()
        try:
            sftp.get(from_f, to_f)
            # extract
            cwd = os.getcwd()
            os.chdir(self.local_root)
            try:
                with tarfile.open(of, "r:gz") as tar:
                    # NOTE(review): extractall trusts the member paths of an
                    # archive produced on the remote host.
                    tar.extractall()
            finally:
                os.chdir(cwd)
            # cleanup
            os.remove(to_f)
            sftp.remove(from_f)
        finally:
            # The channel was previously never closed in this method.
            sftp.close()
class CloudMachineJob(RemoteJob):
    """Run the job script directly with ``bash`` on the remote host (no scheduler)."""

    def submit(self, job_dirs, cmd, args=None, resources=None):
        """Write ``run.sh`` to the remote root and start it without waiting.

        The streams returned by ``exec_command`` are kept on the instance so
        that ``check_status`` can poll the exit status later.
        """
        script_name = self._make_script(job_dirs, cmd, args, resources)
        self.stdin, self.stdout, self.stderr = self.ssh.exec_command(('cd %s; bash %s' % (self.remote_root, script_name)))

    def check_status(self):
        """Return the job state derived from the running script's exit status.

        Returns ``unsubmitted`` when ``submit`` has not been called,
        ``running`` while no exit status is available, ``finished`` for exit
        status 0 and ``terminated`` otherwise.
        """
        stdout = getattr(self, 'stdout', None)
        if stdout is None:
            # submit() has not stored any stream yet.
            return JobStatus.unsubmitted
        if not self._check_finish(stdout):
            return JobStatus.running
        if self._get_exit_status(stdout) == 0:
            return JobStatus.finished
        return JobStatus.terminated

    def _check_finish(self, stdout):
        """Return True once the command behind ``stdout`` has an exit status."""
        return stdout.channel.exit_status_ready()

    def _get_exit_status(self, stdout):
        """Return the exit status of the command behind ``stdout`` (blocks until known)."""
        return stdout.channel.recv_exit_status()

    def _make_script(self, job_dirs, cmd, args=None, resources=None):
        """Generate ``run.sh`` in the remote root and return its file name.

        Parameters
        ----------
        job_dirs : list of str
            Directories (relative to the remote root) to run ``cmd`` in.
        cmd : str
            Command line executed in each directory.
        args : list of str, optional
            One extra argument string per directory; empty strings when omitted.
        resources : dict, optional
            Resource options; missing entries are filled with defaults.
        """
        if resources is None:
            # A None argument cannot be filled in place by the helper below.
            resources = {}
        _set_default_resource(resources)
        envs = resources['envs']
        module_list = resources['module_list']
        module_unload_list = resources['module_unload_list']
        task_per_node = resources['task_per_node']

        script_name = 'run.sh'
        if args is None:
            args = [''] * len(job_dirs)
        stop_on_failure = ('allow_failure' not in resources
                           or resources['allow_failure'] is False)
        # A bare ``exit`` would return the status of the preceding ``test``
        # (which is 0 whenever the branch is taken), so a failed command would
        # end the script with status 0 and check_status() would report
        # ``finished``. Exit with an explicit non-zero status instead.
        abort_line = 'test $? -ne 0 && exit 1\n'

        script = os.path.join(self.remote_root, script_name)
        sftp = self.ssh.open_sftp()
        try:
            with sftp.open(script, 'w') as fp:
                fp.write('#!/bin/bash\n\n')
                if envs is not None:
                    for key, value in envs.items():
                        fp.write('export %s=%s\n' % (key, value))
                    fp.write('\n')
                if module_unload_list is not None:
                    for ii in module_unload_list:
                        fp.write('module unload %s\n' % ii)
                    fp.write('\n')
                if module_list is not None:
                    for ii in module_list:
                        fp.write('module load %s\n' % ii)
                    fp.write('\n')
                for ii, jj in zip(job_dirs, args):
                    fp.write('cd %s\n' % ii)
                    fp.write(abort_line)
                    if resources['with_mpi']:
                        fp.write('mpirun -n %d %s %s\n' % (task_per_node, cmd, jj))
                    else:
                        fp.write('%s %s\n' % (cmd, jj))
                    if stop_on_failure:
                        fp.write(abort_line)
                    fp.write('cd %s\n' % self.remote_root)
                    fp.write(abort_line)
                fp.write('\ntouch tag_finished\n')
        finally:
            sftp.close()
        return script_name
class SlurmJob(RemoteJob):
    """Submit and monitor the job through Slurm (``sbatch`` / ``squeue``)."""

    def submit(self, job_dirs, cmd, args=None, resources=None, restart=False):
        """Submit the job, or with ``restart`` resubmit only when it is not alive.

        With ``restart`` set the current status is queried first. The job is
        submitted again when it is unsubmitted, unknown or terminated, or when
        the status query itself fails; a waiting, running or finished job is
        left alone.
        """
        def _submit():
            script_name = self._make_script(job_dirs, cmd, args, res=resources)
            stdin, stdout, stderr = self.block_checkcall(('cd %s; sbatch %s' % (self.remote_root, script_name)))
            subret = (stdout.readlines())
            # The job id is taken as the last word of sbatch's first output line.
            job_id = subret[0].split()[-1]
            sftp = self.ssh.open_sftp()
            try:
                with sftp.open(os.path.join(self.remote_root, 'job_id'), 'w') as fp:
                    fp.write(job_id)
            finally:
                sftp.close()

        dlog.debug(restart)
        if not restart:
            dlog.debug('new task!!!')
            _submit()
            return

        # Only the status query is guarded. A failing submission used to be
        # caught by the same handler and attempted a second time.
        try:
            status = self.check_status()
        except Exception:
            dlog.debug('no job_id file')
            dlog.debug('task restart point !!!')
            _submit()
            return

        dlog.debug(status)
        if status in [JobStatus.unsubmitted, JobStatus.unknown, JobStatus.terminated]:
            dlog.debug('task restart point !!!')
            _submit()
        elif status == JobStatus.waiting:
            dlog.debug('task is waiting')
        elif status == JobStatus.running:
            dlog.debug('task is running')
        else:
            dlog.debug('task is finished')

    def check_status(self):
        """Return the job state reported by ``squeue`` for the stored job id.

        A job that ``squeue`` no longer knows, or that is in an end state, is
        ``finished`` when the ``tag_finished`` file exists and ``terminated``
        otherwise. Output that cannot be parsed yields ``unknown``.

        Raises
        ------
        RuntimeError
            If the stored job id is empty or ``squeue`` fails for another reason.
        """
        job_id = self._get_job_id()
        if job_id == "":
            raise RuntimeError("job %s has not been submitted" % self.remote_root)
        ret, stdin, stdout, stderr = self.block_call("squeue --job " + job_id)
        err_str = stderr.read().decode('utf-8')
        if ret != 0:
            if "Invalid job id specified" in err_str:
                return self._finished_or_terminated()
            raise RuntimeError("status command squeue fails to execute\nerror message:%s\nreturn code %d\n" % (err_str, ret))
        lines = stdout.read().decode('utf-8').split('\n')
        if len(lines) < 2:
            # No complete line: indexing would raise IndexError.
            return JobStatus.unknown
        fields = lines[-2].split()
        if len(fields) < 4:
            return JobStatus.unknown
        status_word = fields[-4]
        if status_word in ["PD", "CF", "S"]:
            return JobStatus.waiting
        if status_word in ["R", "CG"]:
            return JobStatus.running
        if status_word in ["C", "E", "K", "BF", "CA", "CD", "F", "NF", "PR", "SE", "ST", "TO"]:
            return self._finished_or_terminated()
        return JobStatus.unknown

    def _finished_or_terminated(self):
        """Map a job that left the queue to ``finished`` or ``terminated`` via the finish tag."""
        if self._check_finish_tag():
            return JobStatus.finished
        return JobStatus.terminated

    def _get_job_id(self):
        """Read and return the content of the remote ``job_id`` file."""
        sftp = self.ssh.open_sftp()
        try:
            with sftp.open(os.path.join(self.remote_root, 'job_id'), 'r') as fp:
                return fp.read().decode('utf-8')
        finally:
            sftp.close()

    def _check_finish_tag(self):
        """Return True when ``tag_finished`` exists in the remote root."""
        sftp = self.ssh.open_sftp()
        try:
            sftp.stat(os.path.join(self.remote_root, 'tag_finished'))
            return True
        except IOError:
            return False
        finally:
            sftp.close()

    def _make_squeue(self, mdata1, res):
        """Build a shell command listing the user's pending (PD) jobs in a partition."""
        ret = ''
        ret += 'squeue -u %s ' % mdata1['username']
        ret += '-p %s ' % res['partition']
        ret += '| grep PD'
        return ret

    @staticmethod
    def _guarded_block(run_line):
        """Wrap ``run_line`` so it is skipped when the directory already has ``tag_finished``.

        On success the generated snippet touches ``tag_finished``; on failure
        it exits the script.
        """
        return ('if [ -f tag_finished ] ;then\n'
                ' echo gogogo \n'
                'else\n'
                + run_line +
                ' if test $? -ne 0 \n'
                ' then\n'
                ' exit\n'
                ' else\n'
                ' touch tag_finished\n'
                ' fi\n'
                'fi\n\n')

    def _make_script(self, job_dirs, cmd, args=None, res=None):
        """Generate the ``run.sub`` batch script in the remote root and return its name.

        Parameters
        ----------
        job_dirs : list of str
            Directories (relative to the submission directory) to run ``cmd`` in.
        cmd : str
            Command line executed in each directory.
        args : list of str, optional
            One extra argument string per directory; empty strings when omitted.
        res : dict, optional
            Resource options; missing entries are filled with defaults. The
            optional keys ``cvasp``, ``fp_max_errors`` (default 3) and
            ``allow_failure`` are also read.
        """
        if res is None:
            # A None argument cannot be filled in place by the helper below.
            res = {}
        _set_default_resource(res)
        parts = ["#!/bin/bash -l\n"]
        parts.append("#SBATCH -N %d\n" % res['numb_node'])
        parts.append("#SBATCH --ntasks-per-node %d\n" % res['task_per_node'])
        parts.append("#SBATCH -t %s\n" % res['time_limit'])
        if res['mem_limit'] > 0:
            parts.append("#SBATCH --mem %dG \n" % res['mem_limit'])
        if len(res['account']) > 0:
            parts.append("#SBATCH --account %s \n" % res['account'])
        if len(res['partition']) > 0:
            parts.append("#SBATCH --partition %s \n" % res['partition'])
        if len(res['qos']) > 0:
            parts.append("#SBATCH --qos %s \n" % res['qos'])
        if res['numb_gpu'] > 0:
            parts.append("#SBATCH --gres=gpu:%d\n" % res['numb_gpu'])
        for ii in res['constraint_list']:
            parts.append('#SBATCH -C %s \n' % ii)
        for ii in res['license_list']:
            parts.append('#SBATCH -L %s \n' % ii)
        if len(res['exclude_list']) > 0:
            parts.append('#SBATCH --exclude %s \n' % ",".join(res['exclude_list']))
        parts.append("\n")
        for ii in res['module_unload_list']:
            parts.append("module unload %s\n" % ii)
        for ii in res['module_list']:
            parts.append("module load %s\n" % ii)
        parts.append("\n")
        for ii in res['source_list']:
            parts.append("source %s\n" % ii)
        parts.append("\n")
        envs = res['envs']
        if envs is not None:
            for key, value in envs.items():
                parts.append('export %s=%s\n' % (key, value))
            parts.append('\n')

        if args is None:
            args = [''] * len(job_dirs)

        cvasp = res.get('cvasp', False)
        fp_max_errors = res.get('fp_max_errors', 3)
        stop_on_failure = 'allow_failure' not in res or res['allow_failure'] is False

        for ii, jj in zip(job_dirs, args):
            parts.append('cd %s\n' % ii)
            parts.append('test $? -ne 0 && exit\n\n')
            if cvasp:
                # Drop any stdout redirection from cmd; the wrapper line adds its own.
                cmd = cmd.split('1>')[0].strip()
                # NOTE(review): "1>log 2>log" opens the same file twice, so the
                # two streams can overwrite each other - "2>&1" may be intended.
                if res['with_mpi']:
                    run_line = ' python ../cvasp.py "srun %s" %s %s 1>log 2>log\n' % (cmd, fp_max_errors, jj)
                else:
                    run_line = ' python ../cvasp.py "%s" %s %s 1>log 2>log\n' % (cmd, fp_max_errors, jj)
            elif res['with_mpi']:
                run_line = ' srun %s %s\n' % (cmd, jj)
            else:
                run_line = ' %s %s\n' % (cmd, jj)
            parts.append(self._guarded_block(run_line))
            if stop_on_failure:
                parts.append('test $? -ne 0 && exit\n')
            parts.append('cd %s\n' % self.remote_root)
            parts.append('test $? -ne 0 && exit\n')
        parts.append('\ntouch tag_finished\n')

        script_name = 'run.sub'
        script = os.path.join(self.remote_root, script_name)
        sftp = self.ssh.open_sftp()
        try:
            with sftp.open(script, 'w') as fp:
                fp.write("".join(parts))
        finally:
            sftp.close()
        return script_name
class PBSJob(RemoteJob):
    """Submit and monitor the job through PBS (``qsub`` / ``qstat``)."""

    def submit(self, job_dirs, cmd, args=None, resources=None):
        """Write ``run.sub``, submit it with ``qsub`` and store the job id remotely."""
        script_name = self._make_script(job_dirs, cmd, args, res=resources)
        stdin, stdout, stderr = self.block_checkcall(('cd %s; qsub %s' % (self.remote_root, script_name)))
        subret = (stdout.readlines())
        # The job id is taken as the first word of qsub's first output line.
        job_id = subret[0].split()[0]
        sftp = self.ssh.open_sftp()
        try:
            with sftp.open(os.path.join(self.remote_root, 'job_id'), 'w') as fp:
                fp.write(job_id)
        finally:
            sftp.close()

    def check_status(self):
        """Return the job state reported by ``qstat`` for the stored job id.

        A job that ``qstat`` no longer knows, or that is in an end state, is
        ``finished`` when the ``tag_finished`` file exists and ``terminated``
        otherwise. Output that cannot be parsed yields ``unknown``.

        Raises
        ------
        RuntimeError
            If the stored job id is empty or ``qstat`` fails for another reason.
        """
        job_id = self._get_job_id()
        if job_id == "":
            raise RuntimeError("job %s is has not been submitted" % self.remote_root)
        ret, stdin, stdout, stderr = self.block_call("qstat " + job_id)
        err_str = stderr.read().decode('utf-8')
        if ret != 0:
            if "qstat: Unknown Job Id" in err_str:
                return self._finished_or_terminated()
            raise RuntimeError("status command qstat fails to execute. erro info: %s return code %d" % (err_str, ret))
        lines = stdout.read().decode('utf-8').split('\n')
        if len(lines) < 2:
            # No complete line: indexing would raise IndexError.
            return JobStatus.unknown
        fields = lines[-2].split()
        if len(fields) < 2:
            return JobStatus.unknown
        status_word = fields[-2]
        if status_word in ["Q", "H"]:
            return JobStatus.waiting
        if status_word in ["R"]:
            return JobStatus.running
        if status_word in ["C", "E", "K"]:
            return self._finished_or_terminated()
        return JobStatus.unknown

    def _finished_or_terminated(self):
        """Map a job that left the queue to ``finished`` or ``terminated`` via the finish tag."""
        if self._check_finish_tag():
            return JobStatus.finished
        return JobStatus.terminated

    def _get_job_id(self):
        """Read and return the content of the remote ``job_id`` file."""
        sftp = self.ssh.open_sftp()
        try:
            with sftp.open(os.path.join(self.remote_root, 'job_id'), 'r') as fp:
                return fp.read().decode('utf-8')
        finally:
            sftp.close()

    def _check_finish_tag(self):
        """Return True when ``tag_finished`` exists in the remote root."""
        sftp = self.ssh.open_sftp()
        try:
            sftp.stat(os.path.join(self.remote_root, 'tag_finished'))
            return True
        except IOError:
            return False
        finally:
            sftp.close()

    def _make_script(self, job_dirs, cmd, args=None, res=None):
        """Generate the ``run.sub`` batch script in the remote root and return its name.

        Parameters
        ----------
        job_dirs : list of str
            Directories (relative to ``$PBS_O_WORKDIR``) to run ``cmd`` in.
        cmd : str
            Command line executed in each directory.
        args : list of str, optional
            One extra argument string per directory; empty strings when omitted.
        res : dict, optional
            Resource options; missing entries are filled with defaults. The
            optional key ``allow_failure`` is also read.
        """
        if res is None:
            # A None argument cannot be filled in place by the helper below.
            res = {}
        _set_default_resource(res)
        parts = ["#!/bin/bash -l\n"]
        if res['numb_gpu'] == 0:
            parts.append('#PBS -l nodes=%d:ppn=%d\n' % (res['numb_node'], res['task_per_node']))
        else:
            parts.append('#PBS -l nodes=%d:ppn=%d:gpus=%d\n' % (res['numb_node'], res['task_per_node'], res['numb_gpu']))
        parts.append('#PBS -l walltime=%s\n' % (res['time_limit']))
        if res['mem_limit'] > 0:
            parts.append("#PBS -l mem=%dG \n" % res['mem_limit'])
        parts.append('#PBS -j oe\n')
        if len(res['partition']) > 0:
            parts.append('#PBS -q %s\n' % res['partition'])
        parts.append("\n")
        for ii in res['module_unload_list']:
            parts.append("module unload %s\n" % ii)
        for ii in res['module_list']:
            parts.append("module load %s\n" % ii)
        parts.append("\n")
        for ii in res['source_list']:
            parts.append("source %s\n" % ii)
        parts.append("\n")
        envs = res['envs']
        if envs is not None:
            for key, value in envs.items():
                parts.append('export %s=%s\n' % (key, value))
            parts.append('\n')
        parts.append('cd $PBS_O_WORKDIR\n\n')

        if args is None:
            args = [''] * len(job_dirs)
        stop_on_failure = 'allow_failure' not in res or res['allow_failure'] is False

        for ii, jj in zip(job_dirs, args):
            parts.append('cd %s\n' % ii)
            parts.append('test $? -ne 0 && exit\n')
            if res['with_mpi']:
                parts.append('mpirun -machinefile $PBS_NODEFILE -n %d %s %s\n'
                             % (res['numb_node'] * res['task_per_node'], cmd, jj))
            else:
                parts.append('%s %s\n' % (cmd, jj))
            if stop_on_failure:
                parts.append('test $? -ne 0 && exit\n')
            parts.append('cd %s\n' % self.remote_root)
            parts.append('test $? -ne 0 && exit\n')
        parts.append('\ntouch tag_finished\n')

        script_name = 'run.sub'
        script = os.path.join(self.remote_root, script_name)
        sftp = self.ssh.open_sftp()
        try:
            with sftp.open(script, 'w') as fp:
                fp.write("".join(parts))
        finally:
            sftp.close()
        return script_name
# ssh_session = SSHSession('localhost.json')
# rjob = CloudMachineJob(ssh_session, '.')
# # can upload dirs and normal files
# rjob.upload(['job0', 'job1'], ['batch_exec.py', 'test'])
# rjob.submit(['job0', 'job1'], 'touch a; sleep 2')
# while rjob.check_status() == JobStatus.running :
#     dlog.info('checked')
#     time.sleep(2)
# dlog.info(rjob.check_status())
# # can download dirs and normal files
# rjob.download(['job0', 'job1'], ['a'])
# # rjob.clean()
class LSFJob(RemoteJob):
    """Submit and monitor the job through LSF (``bsub`` / ``bjobs``)."""

    def submit(self, job_dirs, cmd, args=None, resources=None, restart=False):
        """Submit the job, or with ``restart`` resubmit only when it is not alive.

        Parameters
        ----------
        job_dirs, cmd, args
            Forwarded to the script generator.
        resources : dict, optional
            Resource options. ``task_max`` (> 0) makes the method wait, in
            60-second steps, until fewer than that many jobs are running or
            pending; a truthy ``wait_time`` is slept for before returning.
        restart : bool
            When set, the job is submitted only if its current status is
            unsubmitted, unknown or terminated.
        """
        if resources is None:
            # The membership tests below need a mapping.
            resources = {}
        dlog.debug(restart)
        if restart:
            status = self.check_status()
            if status in [JobStatus.unsubmitted, JobStatus.unknown, JobStatus.terminated]:
                dlog.debug('task restart point !!!')
                self._wait_for_slot(resources)
                self._submit(job_dirs, cmd, args, resources)
            elif status == JobStatus.waiting:
                dlog.debug('task is waiting')
            elif status == JobStatus.running:
                dlog.debug('task is running')
            else:
                dlog.debug('task is finished')
        else:
            dlog.debug('new task!!!')
            self._wait_for_slot(resources)
            self._submit(job_dirs, cmd, args, resources)
        if resources.get('wait_time', False):
            # For preventing the crash of the tasks while submitting.
            time.sleep(resources['wait_time'])

    def _wait_for_slot(self, resources):
        """Block while the ``task_max`` limit (if configured and positive) is reached."""
        if 'task_max' in resources and resources['task_max'] > 0:
            while self.check_limit(task_max=resources['task_max']):
                time.sleep(60)

    def _submit(self, job_dirs, cmd, args=None, resources=None):
        """Write ``run.sub``, submit it with ``bsub`` and store the job id remotely."""
        script_name = self._make_script(job_dirs, cmd, args, res=resources)
        stdin, stdout, stderr = self.block_checkcall(('cd %s; bsub < %s' % (self.remote_root, script_name)))
        subret = (stdout.readlines())
        # Second word of the first output line with its first and last
        # character stripped (presumably the "<id>" brackets - confirm against
        # bsub output).
        job_id = subret[0].split()[1][1:-1]
        sftp = self.ssh.open_sftp()
        try:
            with sftp.open(os.path.join(self.remote_root, 'job_id'), 'w') as fp:
                fp.write(job_id)
        finally:
            sftp.close()

    def check_limit(self, task_max):
        """Return True when the number of RUN plus PEND jobs listed by ``bjobs`` is at least ``task_max``."""
        stdin_run, stdout_run, stderr_run = self.block_checkcall("bjobs | grep RUN | wc -l")
        njobs_run = int(stdout_run.read().decode('utf-8').split('\n')[0])
        stdin_pend, stdout_pend, stderr_pend = self.block_checkcall("bjobs | grep PEND | wc -l")
        njobs_pend = int(stdout_pend.read().decode('utf-8').split('\n')[0])
        return (njobs_pend + njobs_run) >= task_max

    def check_status(self):
        """Return the job state reported by ``bjobs`` for the stored job id.

        A job id that cannot be read yields ``terminated``. A job that
        ``bjobs`` no longer finds, or that is in an end state, is ``finished``
        when the ``tag_finished`` file exists and ``terminated`` otherwise.
        Output that cannot be parsed yields ``unknown``.

        Raises
        ------
        RuntimeError
            If the stored job id is empty or ``bjobs`` fails for another reason.
        """
        try:
            job_id = self._get_job_id()
        except Exception:
            return JobStatus.terminated
        if job_id == "":
            raise RuntimeError("job %s is has not been submitted" % self.remote_root)
        ret, stdin, stdout, stderr = self.block_call("bjobs " + job_id)
        err_str = stderr.read().decode('utf-8')
        if ("Job <%s> is not found" % job_id) in err_str:
            return self._finished_or_terminated()
        if ret != 0:
            raise RuntimeError("status command bjobs fails to execute. erro info: %s return code %d" % (err_str, ret))
        status_out = stdout.read().decode('utf-8').split('\n')
        if len(status_out) < 2:
            return JobStatus.unknown
        fields = status_out[1].split()
        if len(fields) < 3:
            # Blank or truncated status line: indexing would raise IndexError.
            return JobStatus.unknown
        status_word = fields[2]
        # ref: https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.2/lsf_command_ref/bjobs.1.html
        if status_word in ["PEND", "WAIT", "PSUSP"]:
            return JobStatus.waiting
        if status_word in ["RUN", "USUSP"]:
            return JobStatus.running
        if status_word in ["DONE", "EXIT"]:
            return self._finished_or_terminated()
        return JobStatus.unknown

    def _finished_or_terminated(self):
        """Map a job that left the queue to ``finished`` or ``terminated`` via the finish tag."""
        if self._check_finish_tag():
            return JobStatus.finished
        return JobStatus.terminated

    def _get_job_id(self):
        """Read and return the content of the remote ``job_id`` file."""
        sftp = self.ssh.open_sftp()
        try:
            with sftp.open(os.path.join(self.remote_root, 'job_id'), 'r') as fp:
                return fp.read().decode('utf-8')
        finally:
            sftp.close()

    def _check_finish_tag(self):
        """Return True when ``tag_finished`` exists in the remote root."""
        sftp = self.ssh.open_sftp()
        try:
            sftp.stat(os.path.join(self.remote_root, 'tag_finished'))
            return True
        except IOError:
            return False
        finally:
            sftp.close()

    def _make_script(self, job_dirs, cmd, args=None, res=None):
        """Generate the ``run.sub`` batch script in the remote root and return its name.

        Parameters
        ----------
        job_dirs : list of str
            Directories (relative to the submission directory) to run ``cmd`` in.
        cmd : str
            Command line executed in each directory.
        args : list of str, optional
            One extra argument string per directory; empty strings when omitted.
        res : dict, optional
            Resource options; missing entries are filled with defaults. The
            optional keys ``node_cpu``, ``new_lsf_gpu``, ``job_name``
            (default ``dpgen``) and ``allow_failure`` are also read.
        """
        if res is None:
            # A None argument cannot be filled in place by the helper below.
            res = {}
        _set_default_resource(res)
        # "%J" is an LSF placeholder; this literal is deliberately not %-formatted.
        parts = ["#!/bin/bash -l\n#BSUB -e %J.err\n#BSUB -o %J.out\n"]
        if res['numb_gpu'] == 0:
            # node_cpu has no default; without it fall back to task_per_node,
            # i.e. one host per requested node, instead of raising KeyError.
            ptile = res['node_cpu'] if 'node_cpu' in res else res['task_per_node']
            parts.append('#BSUB -n %d\n#BSUB -R span[ptile=%d]\n' % (
                res['numb_node'] * res['task_per_node'], ptile))
        else:
            if res.get('node_cpu'):
                parts.append('#BSUB -R span[ptile=%d]\n' % res['node_cpu'])
            # NOTE(review): "-n" receives numb_gpu and the GPU count receives
            # task_per_node in both variants below - confirm the order is intended.
            if 'new_lsf_gpu' in res and res['new_lsf_gpu'] == True:
                # supported in LSF >= 10.1.0 SP6
                # ref: https://www.ibm.com/support/knowledgecenter/en/SSWRJV_10.1.0/lsf_resource_sharing/use_gpu_res_reqs.html
                parts.append('#BSUB -n %d\n#BSUB -gpu "num=%d:mode=shared:j_exclusive=yes"\n' % (
                    res['numb_gpu'], res['task_per_node']))
            else:
                parts.append('#BSUB -n %d\n#BSUB -R "select[ngpus >0] rusage[ngpus_excl_p=%d]"\n' % (
                    res['numb_gpu'], res['task_per_node']))
        if res['time_limit']:
            # Only the first two ':'-separated fields of time_limit are passed to -W.
            time_fields = res['time_limit'].split(':')
            parts.append('#BSUB -W %s\n' % (time_fields[0] + ':' + time_fields[1]))
        if res['mem_limit'] > 0:
            parts.append("#BSUB -M %d \n" % (res['mem_limit']))
        parts.append('#BSUB -J %s\n' % (res['job_name'] if 'job_name' in res else 'dpgen'))
        if len(res['partition']) > 0:
            parts.append('#BSUB -q %s\n' % res['partition'])
        parts.append("\n")
        for ii in res['module_unload_list']:
            parts.append("module unload %s\n" % ii)
        for ii in res['module_list']:
            parts.append("module load %s\n" % ii)
        parts.append("\n")
        for ii in res['source_list']:
            parts.append("source %s\n" % ii)
        parts.append("\n")
        envs = res['envs']
        if envs is not None:
            for key, value in envs.items():
                parts.append('export %s=%s\n' % (key, value))
            parts.append('\n')

        if args is None:
            args = [''] * len(job_dirs)
        stop_on_failure = 'allow_failure' not in res or res['allow_failure'] is False

        for ii, jj in zip(job_dirs, args):
            parts.append('cd %s\n' % ii)
            parts.append('test $? -ne 0 && exit\n')
            if res['with_mpi']:
                parts.append('mpirun -machinefile $LSB_DJOB_HOSTFILE -n %d %s %s\n' % (
                    res['numb_node'] * res['task_per_node'], cmd, jj))
            else:
                parts.append('%s %s\n' % (cmd, jj))
            if stop_on_failure:
                parts.append('test $? -ne 0 && exit\n')
            parts.append('cd %s\n' % self.remote_root)
            parts.append('test $? -ne 0 && exit\n')
        parts.append('\ntouch tag_finished\n')

        script_name = 'run.sub'
        script = os.path.join(self.remote_root, script_name)
        sftp = self.ssh.open_sftp()
        try:
            with sftp.open(script, 'w') as fp:
                fp.write("".join(parts))
        finally:
            sftp.close()
        return script_name