Source code for dpgen.auto_test.common_equi

import glob
import os
import shutil
import warnings
from monty.serialization import dumpfn
from multiprocessing import Pool

import dpgen.auto_test.lib.crys as crys
import dpgen.auto_test.lib.util as util
import dpgen.auto_test.lib.abacus as abacus
from dpgen import dlog
from dpgen.auto_test.calculator import make_calculator
from dpgen.auto_test.mpdb import get_structure
from dpgen.dispatcher.Dispatcher import make_dispatcher
from distutils.version import LooseVersion
from dpgen.dispatcher.Dispatcher import make_submission
from dpgen.remote.decide_machine import convert_mdata
from dpgen.auto_test.lib.utils import create_path

# Interaction types for which run_equi selects the "model_devi" machine
# settings. The name suggests these are the LAMMPS-backed potentials, but that
# cannot be confirmed from this file alone.
lammps_task_type = ['deepmd', 'meam', 'eam_fs', 'eam_alloy']


# Maps a configuration directory name to the name of the ``crys`` builder that
# generates its structure.  Names (not attributes) are stored so that a builder
# is only looked up when its directory is actually encountered.
_STD_CRYSTAL_BUILDERS = {
    'std-fcc': 'fcc1',
    'std-hcp': 'hcp',
    'std-dhcp': 'dhcp',
    'std-bcc': 'bcc',
    'std-diamond': 'diamond',
    'std-sc': 'sc',
}


def make_equi(confs, inter_param, relax_param):
    """Create ``<conf>/relaxation/relax_task`` and its input files for each configuration.

    Parameters
    ----------
    confs : iterable of str
        Glob patterns; every match is treated as a configuration directory.
    inter_param : dict
        Interaction settings.  The element list is taken from the keys of
        ``inter_param['type_map']`` if present, otherwise from
        ``inter_param['potcars']``.  ``inter_param['type']`` is compared with
        ``"abacus"`` to decide whether ``STRU`` replaces ``POSCAR``.  The
        optional ``'single'`` entry selects (as an integer index) the element
        used for the ``std-*`` structures.
    relax_param : dict
        Relaxation settings.  **Modified in place**: ``'cal_type'`` is set to
        ``'relaxation'`` and ``'cal_setting'`` gets ``relax_pos``,
        ``relax_shape`` and ``relax_vol`` defaulted to ``True`` when missing.

    Raises
    ------
    FileNotFoundError
        If a configuration directory ends up without a structure file.
    """
    dlog.debug('debug info make equi')
    if 'type_map' in inter_param:
        ele_list = list(inter_param['type_map'])
    else:
        ele_list = list(inter_param['potcars'])
    dlog.debug("ele_list %s" % ':'.join(ele_list))

    conf_dirs = []
    for conf in confs:
        conf_dirs.extend(glob.glob(conf))
    conf_dirs.sort()

    cwd = os.getcwd()

    # Generate POSCAR for the standard single-element crystals.
    if len(ele_list) == 1 or 'single' in inter_param:
        element_label = int(inter_param['single']) if 'single' in inter_param else 0
        for conf_dir in conf_dirs:
            os.chdir(conf_dir)
            try:
                crys_type = conf_dir.split('/')[-1]
                dlog.debug('crys_type: %s' % crys_type)
                dlog.debug('pwd: %s' % os.getcwd())

                builder_name = _STD_CRYSTAL_BUILDERS.get(crys_type)
                if builder_name is not None and not os.path.exists('POSCAR'):
                    builder = getattr(crys, builder_name)
                    builder(ele_list[element_label]).to('POSCAR', 'POSCAR')

                if inter_param['type'] == "abacus" and not os.path.exists('STRU'):
                    abacus.poscar2stru("POSCAR", inter_param, "STRU")
                    os.remove('POSCAR')
            finally:
                # Always return to the starting directory, even on failure, so
                # the relative paths in conf_dirs stay valid for the caller.
                os.chdir(cwd)

    # Make the task directories <conf>/relaxation/relax_task.
    task_dirs = [_prepare_relax_task(conf_dir, inter_param, cwd) for conf_dir in conf_dirs]
    task_dirs.sort()

    # Generate the task files.
    relax_param['cal_type'] = 'relaxation'
    cal_setting = relax_param.setdefault('cal_setting', {})
    for flag in ("relax_pos", "relax_shape", "relax_vol"):
        cal_setting.setdefault(flag, True)

    for task_dir in task_dirs:
        poscar = os.path.join(task_dir, 'POSCAR')
        dlog.debug('task_dir %s' % task_dir)
        inter = make_calculator(inter_param, poscar)
        inter.make_potential_files(task_dir)
        inter.make_input_file(task_dir, 'relaxation', relax_param)


def _prepare_relax_task(conf_dir, inter_param, cwd):
    """Set up ``<conf_dir>/relaxation/relax_task`` and link the structure file into it.

    Returns the absolute path of the task directory.  ``cwd`` is the directory
    to switch back to after the link has been created.
    """
    crys_type = conf_dir.split('/')[-1]
    dlog.debug('crys_type: %s' % crys_type)

    conf_poscar = os.path.join(conf_dir, 'POSCAR')
    conf_stru = os.path.join(conf_dir, 'STRU')

    if 'mp-' in crys_type and not os.path.exists(conf_poscar):
        get_structure(crys_type).to('POSCAR', conf_poscar)
        # The STRU check must look inside the configuration directory; the
        # process is still in the starting directory here, so a bare 'STRU'
        # would test the wrong location and overwrite an existing file.
        if inter_param['type'] == "abacus" and not os.path.exists(conf_stru):
            abacus.poscar2stru(conf_poscar, inter_param, conf_stru)
            os.remove(conf_poscar)

    if inter_param['type'] == "abacus":
        shutil.copyfile(conf_stru, os.path.join(conf_dir, 'STRU.bk'))
        abacus.modify_stru_path(conf_stru, 'pp_orb/')
        structure = os.path.abspath(conf_stru)
        link_name = 'STRU'
    else:
        structure = os.path.abspath(conf_poscar)
        link_name = 'POSCAR'

    if not os.path.exists(structure):
        raise FileNotFoundError('no configuration for autotest')

    jr_json = os.path.join(conf_dir, 'relaxation', 'jr.json')
    if os.path.exists(jr_json):
        os.remove(jr_json)

    # Absolute path, to be consistent with property in make dispatcher.
    relax_dir = os.path.abspath(os.path.join(conf_dir, 'relaxation', 'relax_task'))
    create_path(relax_dir)

    os.chdir(relax_dir)
    try:
        # islink() also catches a dangling link, which isfile() reports as
        # absent and which would make os.symlink fail with FileExistsError.
        if os.path.islink(link_name) or os.path.isfile(link_name):
            os.remove(link_name)
        os.symlink(os.path.relpath(structure), link_name)
    finally:
        os.chdir(cwd)
    return relax_dir
def run_equi(confs, inter_param, mdata):
    """Dispatch the relaxation task of every matched configuration.

    Parameters
    ----------
    confs : iterable of str
        Glob patterns; for every match the task directory
        ``<match>/relaxation/relax_task`` is submitted.
    inter_param : dict
        Interaction settings; ``inter_param['type']`` selects which machine
        settings are extracted from ``mdata``.
    mdata : dict
        Machine settings.  It is passed through ``convert_mdata`` and
        ``util.get_machine_info``; ``'api_version'`` (default ``'0.9'``)
        chooses between ``make_dispatcher`` and ``make_submission``.

    Raises
    ------
    RuntimeError
        If ``inter_param['type']`` is neither ``"vasp"``/``"abacus"`` nor
        listed in ``lammps_task_type``.
    """
    conf_dirs = []
    for conf in confs:
        conf_dirs.extend(glob.glob(conf))
    conf_dirs.sort()

    run_tasks = [
        os.path.join(conf_dir, 'relaxation', 'relax_task')
        for conf_dir in conf_dirs
    ]

    inter_type = inter_param['type']
    if inter_type in ["vasp", "abacus"]:
        mdata = convert_mdata(mdata, ["fp"])
    elif inter_type in lammps_task_type:
        mdata = convert_mdata(mdata, ["model_devi"])
    else:
        raise RuntimeError("unknown task %s, something wrong" % inter_type)

    # This calculator is only asked for its file lists; the "POSCAR" argument
    # is a placeholder (the original author marks it as unused here).
    virtual_calculator = make_calculator(inter_param, "POSCAR")
    forward_files = virtual_calculator.forward_files()
    forward_common_files = virtual_calculator.forward_common_files()
    backward_files = virtual_calculator.backward_files()

    machine, resources, command, group_size = util.get_machine_info(mdata, inter_type)
    work_path = os.getcwd()
    print("%s --> Runing... " % (work_path))

    # NOTE(review): LooseVersion comes from distutils, which PEP 632 removes in
    # Python 3.12 -- the file-level import will need a replacement eventually.
    api_version = mdata.get('api_version', '0.9')
    if LooseVersion(api_version) < LooseVersion('1.0'):
        warnings.warn("the dpdispatcher will be updated to new version. "
                      "And the interface may be changed. "
                      "Please check the documents for more details")
        disp = make_dispatcher(machine, resources, work_path, run_tasks, group_size)
        disp.run_jobs(resources,
                      command,
                      work_path,
                      run_tasks,
                      group_size,
                      forward_common_files,
                      forward_files,
                      backward_files,
                      outlog='outlog',
                      errlog='errlog')
    else:
        submission = make_submission(
            mdata_machine=machine,
            mdata_resources=resources,
            commands=[command],
            work_path=work_path,
            run_tasks=run_tasks,
            group_size=group_size,
            forward_common_files=forward_common_files,
            forward_files=forward_files,
            backward_files=backward_files,
            outlog='outlog',
            errlog='errlog',
        )
        submission.run_submission()
def post_equi(confs, inter_param):
    """Compute and dump the relaxation result of every matched configuration.

    Each pattern in ``confs`` is expanded with ``glob``.  For every match, a
    calculator is built for ``<match>/relaxation/relax_task/POSCAR``, its
    ``compute`` result for that task directory is obtained, and the result is
    written to ``result.json`` inside the same task directory.
    """
    relax_task_dirs = sorted(
        os.path.abspath(os.path.join(matched_dir, 'relaxation', 'relax_task'))
        for pattern in confs
        for matched_dir in glob.glob(pattern)
    )

    # Dump the relaxation result.
    for task_dir in relax_task_dirs:
        calculator = make_calculator(inter_param, os.path.join(task_dir, 'POSCAR'))
        outcome = calculator.compute(task_dir)
        dumpfn(outcome, os.path.join(task_dir, 'result.json'), indent=4)