#!/usr/bin/env python
# coding: utf-8
import os,sys,glob,time
import numpy as np
import subprocess as sp
from monty.serialization import dumpfn,loadfn
from dpgen.remote.RemoteJob import SlurmJob, PBSJob, CloudMachineJob, JobStatus, awsMachineJob,SSHSession
from dpgen import dlog
import requests
from hashlib import sha1
def _verfy_ac(private_key, params):
items= sorted(params.items())
params_data = ""
for key, value in items:
params_data = params_data + str(key) + str(value)
params_data = params_data + private_key
sign = sha1()
sign.update(params_data.encode())
signature = sign.hexdigest()
return signature
[docs]def aws_submit_jobs(machine,
resources,
command,
work_path,
tasks,
group_size,
forward_common_files,
forward_task_files,
backward_task_files,
forward_task_deference = True):
import boto3
task_chunks = [
[os.path.basename(j) for j in tasks[i:i + group_size]] \
for i in range(0, len(tasks), group_size)
]
task_chunks = (str(task_chunks).translate((str.maketrans('','',' \'"[]'))).split(','))
# flatten the task_chunks
print('task_chunks=',task_chunks)
njob = len(task_chunks)
print('njob=',njob)
continue_status = False
ecs=boto3.client('ecs')
ec2=boto3.client('ec2')
status_list=[]
containerInstanceArns=ecs.list_container_instances(cluster="tensorflow")
if containerInstanceArns['containerInstanceArns']:
containerInstances=ecs.describe_container_instances(cluster="tensorflow", \
containerInstances=containerInstanceArns['containerInstanceArns'])['containerInstances']
status_list=[container['status'] for container in containerInstances]
need_apply_num=group_size-len(status_list)
print('need_apply_num=',need_apply_num)
if need_apply_num>0:
for ii in range(need_apply_num) : #apply for machines,
ec2.run_instances(**machine['run_instances'])
machine_fin = False
status_list=[]
while not len(status_list)>=group_size:
containerInstanceArns=ecs.list_container_instances(cluster="tensorflow")
if containerInstanceArns['containerInstanceArns']:
containerInstances=ecs.describe_container_instances(cluster="tensorflow", \
containerInstances=containerInstanceArns['containerInstanceArns'])['containerInstances']
status_list=[container['status'] for container in containerInstances]
if len(status_list)>=group_size:
break
else:
time.sleep(20)
print('current available containers status_list=',status_list)
print('remote_root=',machine['remote_root'])
rjob = awsMachineJob(machine['remote_root'],work_path)
taskARNs=[]
taskstatus=[]
running_job_num=0
rjob.upload('.', forward_common_files)
for ijob in range(njob) : #uplaod && submit job
containerInstanceArns=ecs.list_container_instances(cluster="tensorflow")
containerInstances=ecs.describe_container_instances(cluster="tensorflow", \
containerInstances=containerInstanceArns['containerInstanceArns'])['containerInstances']
status_list=[container['status'] for container in containerInstances]
print('current available containers status_list=',status_list)
while running_job_num>=group_size:
taskstatus=[task['lastStatus'] for task in ecs.describe_tasks(cluster='tensorflow',tasks=taskARNs)['tasks']]
running_job_num=len(list(filter(lambda str:(str=='PENDING' or str =='RUNNING'),taskstatus)))
print('waiting for running job finished, taskstatus=',taskstatus,'running_job_num=',running_job_num)
time.sleep(10)
chunk = str(task_chunks[ijob])
print('current task chunk=',chunk)
task_definition=command['task_definition']
concrete_command=(command['concrete_command'] %(work_path,chunk))
command_override=command['command_override']
command_override['containerOverrides'][0]['command'][0]=concrete_command
print('concrete_command=',concrete_command)
rjob.upload(chunk, forward_task_files,
dereference = forward_task_deference)
taskres=ecs.run_task(cluster='tensorflow',\
taskDefinition=task_definition,overrides=command_override)
while not taskres['tasks'][0]:
print('task submit failed,taskres=',taskres,'trying to re-submit'+str(chunk),)
time.sleep(10)
taskres=ecs.run_task(cluster='tensorflow',\
taskDefinition=task_definition,overrides=command_override)
taskARNs.append(taskres['tasks'][0]['taskArn'])
taskstatus=[task['lastStatus'] for task in ecs.describe_tasks(cluster='tensorflow',tasks=taskARNs)['tasks']]
running_job_num=len(list(filter(lambda str:(str=='PENDING' or str =='RUNNING'),taskstatus)))
print('have submitted %s/%s,taskstatus=' %(work_path,chunk) ,taskstatus,'running_job_num=',running_job_num )
task_fin_flag=False
while not task_fin_flag:
taskstatus=[task['lastStatus'] for task in ecs.describe_tasks(cluster='tensorflow',tasks=taskARNs)['tasks']]
task_fin_flag=all([status=='STOPPED' for status in taskstatus])
if task_fin_flag:
print('task finished,next step:copy files to local && taskstatus=',taskstatus)
else:
print('all tasks submitted,task running && taskstatus=',taskstatus)
time.sleep(20)
for ii in range(njob):
chunk = task_chunks[ii]
print('downloading '+str(chunk),backward_task_files)
rjob.download(chunk,backward_task_files)
def _ucloud_remove_machine(machine, UHostId):
ucloud_url = machine['url']
ucloud_stop_param = {}
ucloud_stop_param['Action'] = "StopUHostInstance"
ucloud_stop_param['Region'] = machine['ucloud_param']['Region']
ucloud_stop_param['UHostId'] = UHostId
ucloud_stop_param['PublicKey'] = machine['ucloud_param']['PublicKey']
ucloud_stop_param['Signature'] = _verfy_ac(machine['Private'], ucloud_stop_param)
req = requests.get(ucloud_url, ucloud_stop_param)
if req.json()['RetCode'] != 0 :
raise RuntimeError ("failed to stop ucloud machine")
terminate_fin = False
try_time = 0
while not terminate_fin:
ucloud_delete_param = {}
ucloud_delete_param['Action'] = "TerminateUHostInstance"
ucloud_delete_param['Region'] = machine['ucloud_param']['Region']
ucloud_delete_param['UHostId'] = UHostId
ucloud_delete_param['PublicKey'] = machine['ucloud_param']['PublicKey']
ucloud_delete_param['Signature'] = _verfy_ac(machine['Private'], ucloud_delete_param)
req = requests.get(ucloud_url, ucloud_delete_param)
if req.json()['RetCode'] == 0 :
terminate_fin = True
try_time = try_time + 1
if try_time >= 200:
raise RuntimeError ("failed to terminate ucloud machine")
time.sleep(10)
print("Machine ",UHostId,"has been successfully terminated!")
[docs]def ucloud_submit_jobs(machine,
resources,
command,
work_path,
tasks,
group_size,
forward_common_files,
forward_task_files,
backward_task_files,
forward_task_deference = True) :
task_chunks = [
[os.path.basename(j) for j in tasks[i:i + group_size]] \
for i in range(0, len(tasks), group_size)
]
njob = len(task_chunks)
continue_status = False
if os.path.isfile("record.machine"):
with open ("record.machine", "r") as fr:
record_machine = json.load(fr)
if record_machine["purpose"] == machine["purpose"] and record_machine["njob"] == njob:
continue_status = True
ucloud_machines = record_machine["ucloud_machines"]
ucloud_hostids = record_machine["ucloud_hostids"]
fr.close()
ucloud_url = machine['url']
if continue_status == False:
assert machine['machine_type'] == 'ucloud'
ucloud_start_param = machine['ucloud_param']
ucloud_start_param['Action'] = "CreateUHostInstance"
ucloud_start_param['Name'] = "train"
ucloud_start_param['Signature'] = _verfy_ac(machine['Private'], ucloud_start_param)
ucloud_machines = []
ucloud_hostids = []
for ii in range(njob) :
req = requests.get(ucloud_url, ucloud_start_param)
if req.json()['RetCode'] != 0 :
print(json.dumps(req.json(),indent=2, sort_keys=True))
raise RuntimeError ("failed to start ucloud machine")
ucloud_machines.append(str(req.json()["IPs"][0]))
ucloud_hostids.append(str(req.json()["UHostIds"][0]))
new_record_machine = {}
new_record_machine["purpose"] = machine["purpose"]
new_record_machine["njob"] = njob
new_record_machine["ucloud_machines"] = ucloud_machines
new_record_machine["ucloud_hostids"] = ucloud_hostids
with open ("record.machine", "w") as fw:
json.dump(new_record_machine, fw)
fw.close()
machine_fin = [False for ii in ucloud_machines]
total_machine_num = len(ucloud_machines)
fin_machine_num = 0
while not all(machine_fin):
for idx,mac in enumerate(ucloud_machines):
if not machine_fin[idx]:
ucloud_check_param = {}
ucloud_check_param['Action'] = "GetUHostInstanceVncInfo"
ucloud_check_param['Region'] = machine['ucloud_param']['Region']
ucloud_check_param['UHostId'] = ucloud_hostids[idx]
ucloud_check_param['PublicKey'] = machine['ucloud_param']['PublicKey']
ucloud_check_param['Signature'] = _verfy_ac(machine['Private'], ucloud_check_param)
req = requests.get(ucloud_url, ucloud_check_param)
print("the UHostId is", ucloud_hostids[idx])
print(json.dumps(req.json(),indent=2, sort_keys=True))
if req.json()['RetCode'] == 0 :
machine_fin[idx] = True
fin_machine_num = fin_machine_num + 1
print("Current finish",fin_machine_num,"/", total_machine_num)
ucloud_check_param1 = {}
ucloud_check_param1['Action'] = "DescribeUHostInstance"
ucloud_check_param1['Region'] = machine['ucloud_param']['Region']
ucloud_check_param1["Limit"] = 100
ucloud_check_param1['PublicKey'] = machine['ucloud_param']['PublicKey']
ucloud_check_param1['Signature'] = _verfy_ac(machine['Private'], ucloud_check_param1)
req1 = requests.get(ucloud_url, ucloud_check_param1).json()
machine_all_fin = True
for idx1 in range(int(req1["TotalCount"])):
if req1["UHostSet"][idx1]["State"] != "Running":
machine_all_fin = False
break
if machine_all_fin == True:
machine_fin = [True for i in machine_fin]
time.sleep(10)
ssh_sess = []
ssh_param = {}
ssh_param['port'] = 22
ssh_param['username'] = 'root'
ssh_param['work_path'] = machine['work_path']
for ii in ucloud_machines :
ssh_param['hostname'] = ii
ssh_sess.append(SSHSession(ssh_param))
job_list = []
for ii in range(njob) :
chunk = task_chunks[ii]
print("Current machine is", ucloud_machines[ii])
rjob = CloudMachineJob(ssh_sess[ii], work_path)
rjob.upload('.', forward_common_files)
rjob.upload(chunk, forward_task_files,
dereference = forward_task_deference)
rjob.submit(chunk, command, resources = resources)
job_list.append(rjob)
job_fin = [False for ii in job_list]
while not all(job_fin) :
for idx,rjob in enumerate(job_list) :
if not job_fin[idx] :
status = rjob.check_status()
if status == JobStatus.terminated :
raise RuntimeError("find unsuccessfully terminated job on machine" % ucloud_machines[idx])
elif status == JobStatus.finished :
rjob.download(task_chunks[idx], backward_task_files)
rjob.clean()
_ucloud_remove_machine(machine, ucloud_hostids[idx])
job_fin[idx] = True
time.sleep(10)
os.remove("record.machine")
[docs]def group_slurm_jobs(ssh_sess,
resources,
command,
work_path,
tasks,
group_size,
forward_common_files,
forward_task_files,
backward_task_files,
remote_job = SlurmJob,
forward_task_deference = True) :
task_chunks = [
[os.path.basename(j) for j in tasks[i:i + group_size]] \
for i in range(0, len(tasks), group_size)
]
cwd=os.getcwd()
_pmap=PMap(cwd)
path_map=_pmap.load()
dlog.debug("work_path: %s"% work_path)
dlog.debug("curr_path: %s"% cwd)
job_list = []
task_chunks_=['+'.join(ii) for ii in task_chunks]
for ii in task_chunks_:
dlog.debug("task_chunk %s" % ii)
#dlog.debug(path_map)
for ii,chunk in enumerate(task_chunks) :
# map chunk info. to uniq id
chunk_uni=task_chunks_[ii].encode('utf-8')
chunk_sha1=sha1(chunk_uni).hexdigest()
if chunk_sha1 in path_map:
job_uuid=path_map[chunk_sha1][1].split('/')[-1]
dlog.debug("load uuid %s" % job_uuid)
else:
job_uuid=None
rjob = remote_job(ssh_sess, work_path, job_uuid)
dlog.debug('uuid %s'%job_uuid)
rjob.upload('.', forward_common_files)
rjob.upload(chunk, forward_task_files,
dereference = forward_task_deference)
if job_uuid:
rjob.submit(chunk, command, resources = resources,restart=True)
else:
rjob.submit(chunk, command, resources = resources)
job_list.append(rjob)
path_map[chunk_sha1]=[rjob.local_root,rjob.remote_root]
_pmap.dump(path_map)
job_fin = [False for ii in job_list]
lcount=[0]*len(job_list)
count_fail = 0
while not all(job_fin) :
for idx,rjob in enumerate(job_list) :
if not job_fin[idx] :
try:
status = rjob.check_status()
except Exception:
ssh_sess = SSHSession(ssh_sess.remote_profile)
for _idx,_rjob in enumerate(job_list):
job_list[_idx] = SlurmJob(ssh_sess, work_path, _rjob.job_uuid)
count_fail = count_fail +1
dlog.info("ssh_sess failed for %d times"%count_fail)
break
if status == JobStatus.terminated :
lcount[idx]+=1
_job_uuid=rjob.remote_root.split('/')[-1]
dlog.info('Job at %s terminated, submit again'% _job_uuid)
dlog.debug('try %s times for %s'% (lcount[idx], _job_uuid))
rjob.submit(task_chunks[idx], command, resources = resources,restart=True)
if lcount[idx]>3:
dlog.info('Too many errors for ! %s ' % _job_uuid)
rjob.download(task_chunks[idx], backward_task_files,back_error=True)
rjob.clean()
job_fin[idx] = True
elif status == JobStatus.finished :
rjob.download(task_chunks[idx], backward_task_files)
rjob.clean()
job_fin[idx] = True
time.sleep(10)
dlog.debug('error count')
dlog.debug(lcount)
# delete path map file when job finish
_pmap.delete()
[docs]def group_local_jobs(ssh_sess,
resources,
command,
work_path,
tasks,
group_size,
forward_common_files,
forward_task_files,
backward_task_files,
forward_task_deference = True) :
task_chunks = [
[os.path.basename(j) for j in tasks[i:i + group_size]] \
for i in range(0, len(tasks), group_size)
]
job_list = []
for chunk in task_chunks :
rjob = CloudMachineJob(ssh_sess, work_path)
rjob.upload('.', forward_common_files)
rjob.upload(chunk, forward_task_files,
dereference = forward_task_deference)
rjob.submit(chunk, command, resources = resources)
job_list.append(rjob)
job_fin = False
while not job_fin :
status = rjob.check_status()
if status == JobStatus.terminated :
raise RuntimeError("find unsuccessfully terminated job in %s" % rjob.get_job_root())
elif status == JobStatus.finished :
rjob.download(chunk, backward_task_files)
rjob.clean()
job_fin = True
time.sleep(10)
[docs]class PMap(object):
'''
Path map class to operate {read,write,delte} the pmap.json file
'''
def __init__(self,path,fname="pmap.json"):
self.f_path_map=os.path.join(path,fname)
[docs] def load(self):
f_path_map=self.f_path_map
if os.path.isfile(f_path_map):
path_map=loadfn(f_path_map)
else:
path_map={}
return path_map
[docs] def dump(self,pmap,indent=4):
f_path_map=self.f_path_map
dumpfn(pmap,f_path_map,indent=indent)
[docs] def delete(self):
f_path_map=self.f_path_map
try:
os.remove(f_path_map)
except Exception:
pass