dpgen.dispatcher package

Submodules

dpgen.dispatcher.ALI module

dpgen.dispatcher.AWS module

class dpgen.dispatcher.AWS.AWS(context, uuid_names=True)[source]

Bases: Batch

Attributes
job_id

Methods

AWS_check_status([job_id])

to aviod query jobStatus too often, set a time interval query_dict example: {job_id: JobStatus}

do_submit(job_dirs, cmd[, args, res, ...])

submit a single job, assuming that no job is running there.

sub_script(job_dirs, cmd, args, res, outlog, ...)

make submit script

check_finish_tag

check_status

default_resources

map_aws_status_to_dpgen_status

sub_script_cmd

sub_script_head

submit

classmethod AWS_check_status(job_id='')[source]

to aviod query jobStatus too often, set a time interval query_dict example: {job_id: JobStatus}

{‘40fb24b2-d0ca-4443-8e3a-c0906ea03622’: <JobStatus.running: 3>,

‘41bda50c-0a23-4372-806c-87d16a680d85’: <JobStatus.waiting: 2>}

check_status()[source]
default_resources(res)[source]
do_submit(job_dirs, cmd, args=None, res=None, outlog='log', errlog='err')[source]

submit a single job, assuming that no job is running there.

property job_id
static map_aws_status_to_dpgen_status(aws_status)[source]
sub_script(job_dirs, cmd, args, res, outlog, errlog)[source]

make submit script

job_dirs(list): directories of jobs. size: n_job cmd(list): commands to be executed. size: n_cmd args(list of list): args of commands. size of n_cmd x n_job

can be None

res(dict): resources available outlog(str): file name for output errlog(str): file name for error

dpgen.dispatcher.Batch module

class dpgen.dispatcher.Batch.Batch(context, uuid_names=True)[source]

Bases: object

Methods

do_submit(job_dirs, cmd[, args, res, ...])

submit a single job, assuming that no job is running there.

sub_script(job_dirs, cmd[, args, res, ...])

make submit script

check_finish_tag

check_status

default_resources

sub_script_cmd

sub_script_head

submit

check_finish_tag()[source]
check_status()[source]
default_resources(res)[source]
do_submit(job_dirs, cmd, args=None, res=None, outlog='log', errlog='err')[source]

submit a single job, assuming that no job is running there.

sub_script(job_dirs, cmd, args=None, res=None, outlog='log', errlog='err')[source]

make submit script

job_dirs(list): directories of jobs. size: n_job cmd(list): commands to be executed. size: n_cmd args(list of list): args of commands. size of n_cmd x n_job

can be None

res(dict): resources available outlog(str): file name for output errlog(str): file name for error

sub_script_cmd(cmd, res)[source]
sub_script_head(res)[source]
submit(job_dirs, cmd, args=None, res=None, restart=False, outlog='log', errlog='err')[source]

dpgen.dispatcher.Dispatcher module

class dpgen.dispatcher.Dispatcher.Dispatcher(remote_profile, context_type='local', batch_type='slurm', job_record='jr.json')[source]

Bases: object

Methods

all_finished

run_jobs

submit_jobs

all_finished(job_handler, mark_failure, clean=True)[source]
run_jobs(resources, command, work_path, tasks, group_size, forward_common_files, forward_task_files, backward_task_files, forward_task_deference=True, mark_failure=False, outlog='log', errlog='err')[source]
submit_jobs(resources, command, work_path, tasks, group_size, forward_common_files, forward_task_files, backward_task_files, forward_task_deference=True, outlog='log', errlog='err')[source]
class dpgen.dispatcher.Dispatcher.JobRecord(path, task_chunks, fname='job_record.json', ip=None)[source]

Bases: object

Methods

check_all_finished

check_finished

check_nfail

check_submitted

dump

get_uuid

increase_nfail

load

record_finish

record_remote_context

valid_hash

check_all_finished()[source]
check_finished(chunk_hash)[source]
check_nfail(chunk_hash)[source]
check_submitted(chunk_hash)[source]
dump()[source]
get_uuid(chunk_hash)[source]
increase_nfail(chunk_hash)[source]
load()[source]
record_finish(chunk_hash)[source]
record_remote_context(chunk_hash, local_root, remote_root, job_uuid, ip=None, instance_id=None)[source]
valid_hash(chunk_hash)[source]
dpgen.dispatcher.Dispatcher.make_dispatcher(mdata, mdata_resource=None, work_path=None, run_tasks=None, group_size=None)[source]
dpgen.dispatcher.Dispatcher.make_submission(mdata_machine, mdata_resources, commands, work_path, run_tasks, group_size, forward_common_files, forward_files, backward_files, outlog, errlog)[source]
dpgen.dispatcher.Dispatcher.make_submission_compat(machine: dict, resources: dict, commands: List[str], work_path: str, run_tasks: List[str], group_size: int, forward_common_files: List[str], forward_files: List[str], backward_files: List[str], outlog: str = 'log', errlog: str = 'err', api_version: str = '0.9') None[source]

Make submission with compatibility of both dispatcher API v0 and v1.

If api_version is less than 1.0, use make_dispatcher. If api_version is large than 1.0, use make_submission.

Parameters
machinedict

machine dict

resourcesdict

resource dict

commandslist[str]

list of commands

work_pathstr

working directory

run_taskslist[str]

list of paths to running tasks

group_sizeint

group size

forward_common_fileslist[str]

forwarded common files shared for all tasks

forward_fileslist[str]

forwarded files for each task

backward_fileslist[str]

backwarded files for each task

outlogstr, default=log

path to log from stdout

errlogstr, default=err

path to log from stderr

api_versionstr, default=0.9

API version. 1.0 is recommended

dpgen.dispatcher.Dispatcher.mdata_arginfo() List[Argument][source]

This method generates arginfo for a single mdata.

A submission requires the following keys: command, machine, and resources.

Returns
list[Argument]

arginfo

dpgen.dispatcher.DispatcherList module

class dpgen.dispatcher.DispatcherList.DispatcherList(mdata_machine, mdata_resources, work_path, run_tasks, group_size, cloud_resources=None)[source]

Bases: object

Methods

catch_dispatcher_exception(ii)

everything is okay: return 0 ssh not active : return 1 machine callback : return 2

check_all_dispatchers_finished(ratio_failure)

check_dispatcher_status(ii[, allow_failure])

catch running dispatcher exception if no exception occured, check finished

clean()

create(ii)

case1: use existed machine(finished) to make_dispatcher case2: create one machine, then make_dispatcher, change status from unallocated to unsubmitted

delete(ii)

delete one machine if entity is none, means this machine is used by another dispatcher, shouldn't be deleted

exception_handling(ratio_failure)

init()

make_dispatcher(ii)

run_jobs(resources, command, work_path, ...)

update()

catch_dispatcher_exception(ii)[source]

everything is okay: return 0 ssh not active : return 1 machine callback : return 2

check_all_dispatchers_finished(ratio_failure)[source]
check_dispatcher_status(ii, allow_failure=False)[source]

catch running dispatcher exception if no exception occured, check finished

clean()[source]
create(ii)[source]

case1: use existed machine(finished) to make_dispatcher case2: create one machine, then make_dispatcher, change status from unallocated to unsubmitted

delete(ii)[source]

delete one machine if entity is none, means this machine is used by another dispatcher, shouldn’t be deleted

exception_handling(ratio_failure)[source]
init()[source]
make_dispatcher(ii)[source]
run_jobs(resources, command, work_path, tasks, group_size, forward_common_files, forward_task_files, backward_task_files, forward_task_deference=True, mark_failure=False, outlog='log', errlog='err')[source]
update()[source]
class dpgen.dispatcher.DispatcherList.Entity(ip, instance_id, job_record=None, job_handler=None)[source]

Bases: object

dpgen.dispatcher.JobStatus module

class dpgen.dispatcher.JobStatus.JobStatus(value)[source]

Bases: Enum

An enumeration.

completing = 6
finished = 5
running = 3
terminated = 4
unknown = 100
unsubmitted = 1
waiting = 2

dpgen.dispatcher.LSF module

class dpgen.dispatcher.LSF.LSF(context, uuid_names=True)[source]

Bases: Batch

Methods

default_resources(res_)

set default value if a key in res_ is not fhound

do_submit(job_dirs, cmd[, args, res, ...])

submit a single job, assuming that no job is running there.

sub_script(job_dirs, cmd[, args, res, ...])

make submit script

check_finish_tag

check_status

sub_script_cmd

sub_script_head

submit

check_status()[source]
default_resources(res_)[source]

set default value if a key in res_ is not fhound

do_submit(job_dirs, cmd, args=None, res=None, outlog='log', errlog='err')[source]

submit a single job, assuming that no job is running there.

sub_script_cmd(cmd, arg, res)[source]
sub_script_head(res)[source]

dpgen.dispatcher.LazyLocalContext module

class dpgen.dispatcher.LazyLocalContext.LazyLocalContext(local_root, work_profile=None, job_uuid=None)[source]

Bases: object

Methods

block_call

block_checkcall

call

check_file_exists

check_finish

clean

download

get_job_root

get_return

kill

read_file

upload

write_file

block_call(cmd)[source]
block_checkcall(cmd)[source]
call(cmd)[source]
check_file_exists(fname)[source]
check_finish(proc)[source]
clean()[source]
download(job_dirs, remote_down_files, check_exists=False, mark_failure=True, back_error=False)[source]
get_job_root()[source]
get_return(proc)[source]
kill(proc)[source]
read_file(fname)[source]
upload(job_dirs, local_up_files, dereference=True)[source]
write_file(fname, write_str)[source]
class dpgen.dispatcher.LazyLocalContext.SPRetObj(ret)[source]

Bases: object

Methods

read

readlines

read()[source]
readlines()[source]

dpgen.dispatcher.LocalContext module

class dpgen.dispatcher.LocalContext.LocalContext(local_root, work_profile, job_uuid=None)[source]

Bases: object

Methods

block_call

block_checkcall

call

check_file_exists

check_finish

clean

download

get_job_root

get_return

kill

read_file

upload

write_file

block_call(cmd)[source]
block_checkcall(cmd)[source]
call(cmd)[source]
check_file_exists(fname)[source]
check_finish(proc)[source]
clean()[source]
download(job_dirs, remote_down_files, check_exists=False, mark_failure=True, back_error=False)[source]
get_job_root()[source]
get_return(proc)[source]
kill(proc)[source]
read_file(fname)[source]
upload(job_dirs, local_up_files, dereference=True)[source]
write_file(fname, write_str)[source]
class dpgen.dispatcher.LocalContext.LocalSession(jdata)[source]

Bases: object

Methods

get_work_root

get_work_root()[source]
class dpgen.dispatcher.LocalContext.SPRetObj(ret)[source]

Bases: object

Methods

read

readlines

read()[source]
readlines()[source]

dpgen.dispatcher.PBS module

class dpgen.dispatcher.PBS.PBS(context, uuid_names=True)[source]

Bases: Batch

Methods

default_resources(res_)

set default value if a key in res_ is not fhound

do_submit(job_dirs, cmd[, args, res, ...])

submit a single job, assuming that no job is running there.

sub_script(job_dirs, cmd[, args, res, ...])

make submit script

check_finish_tag

check_status

sub_script_cmd

sub_script_head

submit

check_status()[source]
default_resources(res_)[source]

set default value if a key in res_ is not fhound

do_submit(job_dirs, cmd, args=None, res=None, outlog='log', errlog='err')[source]

submit a single job, assuming that no job is running there.

sub_script_cmd(cmd, arg, res)[source]
sub_script_head(res)[source]

dpgen.dispatcher.SSHContext module

class dpgen.dispatcher.SSHContext.SSHContext(local_root, ssh_session, job_uuid=None)[source]

Bases: object

Attributes
sftp
ssh

Methods

block_call

block_checkcall

call

check_file_exists

check_finish

clean

close

download

get_job_root

get_return

kill

read_file

upload

write_file

block_call(cmd)[source]
block_checkcall(cmd, retry=0)[source]
call(cmd)[source]
check_file_exists(fname)[source]
check_finish(cmd_pipes)[source]
clean()[source]
close()[source]
download(job_dirs, remote_down_files, check_exists=False, mark_failure=True, back_error=False)[source]
get_job_root()[source]
get_return(cmd_pipes)[source]
kill(cmd_pipes)[source]
read_file(fname)[source]
property sftp
property ssh
upload(job_dirs, local_up_files, dereference=True)[source]
write_file(fname, write_str)[source]
class dpgen.dispatcher.SSHContext.SSHSession(jdata)[source]

Bases: object

Attributes
sftp

Returns sftp.

Methods

exec_command(cmd[, retry])

Calling self.ssh.exec_command but has an exception check.

close

ensure_alive

get_session_root

get_ssh_client

close()[source]
ensure_alive(max_check=10, sleep_time=10)[source]
exec_command(cmd, retry=0)[source]

Calling self.ssh.exec_command but has an exception check.

get_session_root()[source]
get_ssh_client()[source]
property sftp

Returns sftp. Open a new one if not existing.

dpgen.dispatcher.Shell module

class dpgen.dispatcher.Shell.Shell(context, uuid_names=True)[source]

Bases: Batch

Methods

do_submit(job_dirs, cmd[, args, res, ...])

submit a single job, assuming that no job is running there.

sub_script(job_dirs, cmd[, args, res, ...])

make submit script

check_finish_tag

check_running

check_status

default_resources

sub_script_cmd

sub_script_head

submit

check_running()[source]
check_status()[source]
default_resources(res_)[source]
do_submit(job_dirs, cmd, args=None, res=None, outlog='log', errlog='err')[source]

submit a single job, assuming that no job is running there.

sub_script_cmd(cmd, arg, res)[source]
sub_script_head(resources)[source]

dpgen.dispatcher.Slurm module

class dpgen.dispatcher.Slurm.Slurm(context, uuid_names=True)[source]

Bases: Batch

Methods

check_status()

check the status of a job

default_resources(res_)

set default value if a key in res_ is not fhound

do_submit(job_dirs, cmd[, args, res, ...])

submit a single job, assuming that no job is running there.

sub_script(job_dirs, cmd[, args, res, ...])

make submit script

check_finish_tag

sub_script_cmd

sub_script_head

submit

check_status()[source]

check the status of a job

default_resources(res_)[source]

set default value if a key in res_ is not fhound

do_submit(job_dirs, cmd, args=None, res=None, outlog='log', errlog='err')[source]

submit a single job, assuming that no job is running there.

sub_script_cmd(cmd, arg, res)[source]
sub_script_head(res)[source]