Source code for dpgen.auto_test.Lammps

import os
import warnings
import dpgen.auto_test.lib.lammps as lammps
from dpgen import dlog
from monty.serialization import loadfn, dumpfn
from dpgen.auto_test.Task import Task
from dpgen.auto_test.lib.lammps import inter_deepmd, inter_meam, inter_eam_fs, inter_eam_alloy

# Interaction types accepted by Lammps.__init__; each one maps to a
# pair-style builder imported from dpgen.auto_test.lib.lammps.
supported_inter = [
    "deepmd",
    "meam",
    "eam_fs",
    "eam_alloy",
]


class Lammps(Task):
    """Task implementation that prepares LAMMPS inputs and parses LAMMPS outputs.

    Parameters
    ----------
    inter_parameter : dict
        Interaction settings. Keys read here: ``'type'`` (must be listed in
        ``supported_inter``), ``'type_map'``, ``'model'`` (a single path, or
        a list of paths when ``type`` is ``'meam'``), and the optional
        ``'in_lammps'`` (default ``'auto'``) and ``'deepmd_version'``.
    path_to_poscar : str
        Stored on the instance as ``path_to_poscar``; not used by the
        methods of this class.
    """

    def __init__(self, inter_parameter, path_to_poscar):
        self.inter = inter_parameter
        self.inter_type = inter_parameter['type']
        self.type_map = inter_parameter['type_map']
        self.in_lammps = inter_parameter.get('in_lammps', 'auto')
        if self.inter_type == 'meam':
            # meam needs several files, so ``model`` is a list of paths.
            self.model = list(map(os.path.abspath, inter_parameter['model']))
        else:
            self.model = os.path.abspath(inter_parameter['model'])
        self.path_to_poscar = path_to_poscar
        assert self.inter_type in supported_inter, \
            "unsupported interaction type: %s" % (self.inter_type,)
        self.set_inter_type_func()

    def set_inter_type_func(self):
        """Select the pair-style builder matching ``self.inter_type``.

        Any type other than 'deepmd', 'meam' or 'eam_fs' falls back to the
        eam_alloy builder.
        """
        builders = {
            'deepmd': inter_deepmd,
            'meam': inter_meam,
            'eam_fs': inter_eam_fs,
        }
        self.inter_func = builders.get(self.inter_type, inter_eam_alloy)

    def set_model_param(self):
        """Build ``self.model_param``, the dict handed to the pair-style builder."""
        if self.inter_type == 'meam':
            # NOTE(review): model_name is already a list here, so
            # 'model_name' ends up as a nested list ([[...]]); confirm that
            # inter_meam expects this shape.
            model_name = [os.path.basename(path) for path in self.model]
        else:
            model_name = os.path.basename(self.model)
        self.model_param = {'model_name': [model_name],
                            'param_type': self.type_map}
        if self.inter_type == 'deepmd':
            self.model_param['deepmd_version'] = self.inter.get("deepmd_version", "1.2.0")

    def _model_paths(self):
        """Return the model file paths as a list, whatever the interaction type."""
        if self.inter_type == 'meam':
            return list(self.model)
        return [self.model]

    def _model_basenames(self):
        """Return the base names of all model files as a list."""
        return [os.path.basename(path) for path in self._model_paths()]

    @staticmethod
    def _link_model_into_cwd(model_path):
        """Make ``basename(model_path)`` in the cwd a symlink to ``model_path``.

        An existing symlink that already resolves to ``model_path`` is kept;
        one pointing elsewhere is replaced.
        """
        name = os.path.basename(model_path)
        if os.path.islink(name):
            if os.path.abspath(os.readlink(name)) == model_path:
                return
            os.remove(name)
        os.symlink(os.path.relpath(model_path), name)

    @staticmethod
    def _link_to_parent_entry(name):
        """Make ``name`` in the cwd a symlink to ``../name``."""
        target = os.path.join('..', name)
        if os.path.islink(name):
            if os.readlink(name) == target:
                return
            os.remove(name)
        os.symlink(target, name)

    def make_potential_files(self, output_dir):
        """Link the model file(s) next to the task and dump ``inter.json``.

        Each model file is symlinked into the parent directory of
        ``output_dir``. ``output_dir`` then receives a symlink to that parent
        entry. Finally ``self.inter`` is written to ``output_dir/inter.json``.

        Parameters
        ----------
        output_dir : str
            Task directory. Relative paths are resolved against the working
            directory at call time.
        """
        cwd = os.getcwd()
        # Anchor on the starting cwd so that a relative output_dir still
        # resolves after the first chdir; absolute paths pass through as-is.
        task_dir = os.path.join(cwd, output_dir)
        model_paths = self._model_paths()
        try:
            os.chdir(os.path.join(task_dir, '../'))
            for model_path in model_paths:
                self._link_model_into_cwd(model_path)
            os.chdir(task_dir)
            for model_path in model_paths:
                self._link_to_parent_entry(os.path.basename(model_path))
        finally:
            # Always restore the caller's working directory, even when a
            # link operation fails.
            os.chdir(cwd)

        dumpfn(self.inter, os.path.join(output_dir, 'inter.json'), indent=4)

    def _generate_input(self, output_dir, task_type, task_param):
        """Generate the in.lammps content from ``task_param``.

        Raises
        ------
        RuntimeError
            If the calculation type or the relax_* combination is not
            supported.
        """
        cal_type = task_param['cal_type']
        cal_setting = task_param['cal_setting']

        # Minimiser defaults, overridable through cal_setting.
        minimize = {'etol': 0, 'ftol': 1e-10, 'maxiter': 5000, 'maxeval': 500000}
        for key in ('etol', 'ftol', 'maxiter', 'maxeval'):
            if key in cal_setting:
                dlog.info("%s setting %s to %s"
                          % (self.make_input_file.__name__, key, cal_setting[key]))
                minimize[key] = cal_setting[key]
        etol = minimize['etol']
        ftol = minimize['ftol']
        maxiter = minimize['maxiter']
        maxeval = minimize['maxeval']
        B0 = 70
        bp = 0

        if cal_type == 'static':
            return lammps.make_lammps_eval('conf.lmp', self.type_map,
                                           self.inter_func, self.model_param)
        if cal_type != 'relaxation':
            raise RuntimeError("not supported calculation type for LAMMPS")

        relax_flags = [cal_setting['relax_pos'],
                       cal_setting['relax_shape'],
                       cal_setting['relax_vol']]

        def equi(change_box):
            return lammps.make_lammps_equi('conf.lmp', self.type_map,
                                           self.inter_func, self.model_param,
                                           etol, ftol, maxiter, maxeval, change_box)

        if relax_flags == [True, False, False]:
            return equi(False)
        if relax_flags == [True, True, True]:
            return equi(True)
        if relax_flags == [True, True, False]:
            if task_type == 'eos':
                # For eos the shape relaxation is switched off, and the
                # change is recorded in task_param (dumped to task.json
                # by the caller).
                task_param['cal_setting']['relax_shape'] = False
                return equi(False)
            if 'scale2equi' in task_param:
                scale2equi = task_param['scale2equi']
                # The last six characters of output_dir are parsed as the
                # task index into scale2equi.
                return lammps.make_lammps_press_relax(
                    'conf.lmp', self.type_map,
                    scale2equi[int(output_dir[-6:])],
                    self.inter_func, self.model_param,
                    B0, bp, etol, ftol, maxiter, maxeval)
            return equi(True)
        if relax_flags == [False, False, False]:
            return lammps.make_lammps_eval('conf.lmp', self.type_map,
                                           self.inter_func, self.model_param)
        raise RuntimeError("not supported calculation setting for LAMMPS")

    def make_input_file(self, output_dir, task_type, task_param):
        """Write ``conf.lmp``, ``task.json`` and ``in.lammps`` for one task.

        The content of in.lammps is chosen in this order:

        1. the file named by ``cal_setting['input_prop']`` if it exists;
        2. the user file ``self.in_lammps`` if it exists and ``task_type``
           is ``'relaxation'``;
        3. an input generated from ``cal_type`` / ``cal_setting``.

        For every task type except ``'eos'`` the input is written once to
        ``output_dir/../in.lammps`` and symlinked into ``output_dir``. For
        ``'eos'`` it is written directly into ``output_dir``.

        Parameters
        ----------
        output_dir : str
            Task directory; must already contain ``POSCAR``.
        task_type : str
            Kind of task, e.g. ``'relaxation'`` or ``'eos'``.
        task_param : dict
            Must provide ``'cal_type'`` and ``'cal_setting'``. It may be
            modified in place (see the eos handling in ``_generate_input``).

        Raises
        ------
        RuntimeError
            If the input has to be generated and the requested calculation
            is not supported.
        """
        lammps.cvt_lammps_conf(os.path.join(output_dir, 'POSCAR'),
                               os.path.join(output_dir, 'conf.lmp'),
                               lammps.element_list(self.type_map))

        cal_setting = task_param['cal_setting']
        self.set_model_param()

        if 'input_prop' in cal_setting and os.path.isfile(cal_setting['input_prop']):
            # User input for a property calculation.
            with open(os.path.abspath(cal_setting['input_prop']), 'r') as fin:
                fc = fin.read()
        elif task_type == 'relaxation' and os.path.isfile(self.in_lammps):
            # User input for relaxation. Previously this file was read and
            # then unconditionally overwritten by the generated input.
            with open(self.in_lammps, 'r') as fin:
                fc = fin.read()
        else:
            fc = self._generate_input(output_dir, task_type, task_param)

        dumpfn(task_param, os.path.join(output_dir, 'task.json'), indent=4)

        in_lammps_not_link_list = ['eos']
        if task_type in in_lammps_not_link_list:
            with open(os.path.join(output_dir, 'in.lammps'), 'w') as fp:
                fp.write(fc)
            return

        with open(os.path.join(output_dir, '../in.lammps'), 'w') as fp:
            fp.write(fc)
        link_path = os.path.join(output_dir, 'in.lammps')
        if os.path.islink(link_path) or os.path.isfile(link_path):
            os.remove(link_path)
        os.symlink('../in.lammps', link_path)

    @staticmethod
    def _parse_bounds(text):
        """Parse one box-bounds line into ``(lo, hi, tilt)``.

        A line with only two columns (orthogonal box) yields ``tilt = 0.0``.
        """
        fields = text.split()
        tilt = float(fields[2]) if len(fields) > 2 else 0.0
        return float(fields[0]), float(fields[1]), tilt

    @classmethod
    def _parse_dump(cls, dump):
        """Extract every frame from the lines of a LAMMPS dump file.

        Returns
        -------
        tuple
            ``(dumptime, box, coord, force, vol, type_list)``. The first
            five hold one entry per frame. ``type_list`` holds the
            zero-based atom types of the last frame (empty if there is no
            frame).
        """
        dumptime, box, coord, force, vol = [], [], [], [], []
        type_list = []
        for idx, text in enumerate(dump):
            if text != 'ITEM: TIMESTEP':
                continue
            dumptime.append(int(dump[idx + 1]))
            natom = int(dump[idx + 3])
            xlo_bound, xhi_bound, xy = cls._parse_bounds(dump[idx + 5])
            ylo_bound, yhi_bound, xz = cls._parse_bounds(dump[idx + 6])
            zlo, zhi, yz = cls._parse_bounds(dump[idx + 7])
            # Recover the edge lengths from the bounding-box extents.
            xx = xhi_bound - max([0, xy, xz, xy + xz]) - (xlo_bound - min([0, xy, xz, xy + xz]))
            yy = yhi_bound - max([0, yz]) - (ylo_bound - min([0, yz]))
            zz = zhi - zlo
            box.append([[xx, 0.0, 0.0], [xy, yy, 0.0], [xz, yz, zz]])
            vol.append(xx * yy * zz)

            scaled = natom > 0 and 'xs ys zs' in dump[idx + 8]
            frame_coord = []
            frame_force = []
            type_list = []
            for jj in range(natom):
                # Column 1 is the atom type, 2-4 the position, 5-7 the force.
                fields = dump[idx + 9 + jj].split()
                type_list.append(int(fields[1]) - 1)
                c0, c1, c2 = float(fields[2]), float(fields[3]), float(fields[4])
                if scaled:
                    # Scaled coordinates: convert to Cartesian with the cell.
                    a_x = c0 * xx + c1 * xy + c2 * xz
                    a_y = c1 * yy + c2 * yz
                    a_z = c2 * zz
                else:
                    a_x, a_y, a_z = c0, c1, c2
                frame_coord.append([a_x, a_y, a_z])
                frame_force.append([float(fields[5]), float(fields[6]), float(fields[7])])
            coord.append(frame_coord)
            force.append(frame_force)
        return dumptime, box, coord, force, vol, type_list

    @staticmethod
    def _parse_thermo(lines, dumptime, vol):
        """Collect energy, virial and stress for each dumped step from the log.

        For each step in ``dumptime``, the first log line that starts with
        that step number and consists only of numbers is used.
        NOTE(review): the columns are presumably step, energy, pxx, pyy,
        pzz, pxy, pxz, pyz; confirm against the thermo_style of the
        generated input.

        Returns
        -------
        tuple
            ``(energy, virial, stress)`` lists. A step with no matching log
            line contributes no entry.
        """
        energy, virial, stress = [], [], []
        for frame_idx, step in enumerate(dumptime):
            step_tag = str(step)
            for text in lines:
                fields = text.split()
                if not fields or fields[0] != step_tag:
                    continue
                try:
                    values = [float(field) for field in fields]
                except ValueError:
                    continue
                if len(values) < 8:
                    # Numeric line that is too short to be a thermo record.
                    continue
                energy.append(values[1])
                pxx, pyy, pzz, pxy, pxz, pyz = values[2:8]
                tensor = [[pxx, pxy, pxz],
                          [pxy, pyy, pyz],
                          [pxz, pyz, pzz]]
                stress.append([[comp / 1000.0 for comp in row] for row in tensor])
                # virials = stress * vol * 1e5 *1e-30 * 1e19/1.6021766208
                stress_to_virial = vol[frame_idx] * 1e5 * 1e-30 * 1e19 / 1.6021766208
                virial.append([[comp * stress_to_virial for comp in row] for row in tensor])
                break
        return energy, virial, stress

    def compute(self, output_dir):
        """Parse ``log.lammps`` and ``dump.relax`` of a finished task.

        The parsed data is returned as a dict describing a dpdata
        ``LabeledSystem``. The same dict is dumped to ``output_dir/CONTCAR``,
        which is then reloaded and rewritten as a VASP POSCAR of the last
        frame.

        Parameters
        ----------
        output_dir : str
            Task directory holding the LAMMPS outputs.

        Returns
        -------
        dict or None
            The result dict. ``None`` (with a warning) if an output file is
            missing, the log has no 'Total wall time:' line, or the dump
            holds no frame.
        """
        log_lammps = os.path.join(output_dir, 'log.lammps')
        dump_lammps = os.path.join(output_dir, 'dump.relax')
        if not os.path.isfile(log_lammps):
            warnings.warn("cannot find log.lammps in " + output_dir + " skip")
            return None
        if not os.path.isfile(dump_lammps):
            warnings.warn("cannot find dump.relax in " + output_dir + " skip")
            return None

        # Check completion first so that a truncated dump from an
        # unfinished run is skipped rather than parsed.
        with open(log_lammps, 'r') as fp:
            log_text = fp.read()
        if 'Total wall time:' not in log_text:
            warnings.warn("lammps not finished " + log_lammps + " skip")
            return None

        with open(dump_lammps, 'r') as fin:
            dump = fin.read().split('\n')
        dumptime, box, coord, force, vol, type_list = self._parse_dump(dump)
        if not dumptime:
            warnings.warn("cannot find any frame in " + dump_lammps + " skip")
            return None

        energy, virial, stress = self._parse_thermo(log_text.split('\n'), dumptime, vol)

        dlog.debug(self.type_map)
        type_map_list = lammps.element_list(self.type_map)
        atom_numbs = [type_list.count(ii) for ii in range(len(type_map_list))]

        def np_array(dtype, data):
            # Serialised numpy-array layout used in the result dict.
            return {"@module": "numpy", "@class": "array", "dtype": dtype, "data": data}

        result_dict = {
            "@module": "dpdata.system",
            "@class": "LabeledSystem",
            "data": {
                "atom_numbs": atom_numbs,
                "atom_names": type_map_list,
                "atom_types": np_array("int64", type_list),
                "orig": np_array("int64", [0, 0, 0]),
                "cells": np_array("float64", box),
                "coords": np_array("float64", coord),
                "energies": np_array("float64", energy),
                "forces": np_array("float64", force),
                "virials": np_array("float64", virial),
                "stress": np_array("float64", stress),
            },
        }

        contcar = os.path.join(output_dir, 'CONTCAR')
        dumpfn(result_dict, contcar, indent=4)
        d_dump = loadfn(contcar)
        d_dump.to('vasp/poscar', contcar, frame_idx=-1)

        return result_dict

    def forward_files(self, property_type='relaxation'):
        """Return the per-task input files: conf.lmp, in.lammps and the model file(s)."""
        return ['conf.lmp', 'in.lammps'] + self._model_basenames()

    def forward_common_files(self, property_type='relaxation'):
        """Return the files shared by all tasks.

        ``in.lammps`` is included unless ``property_type`` is ``'eos'``.
        """
        if property_type not in ['eos']:
            return ['in.lammps'] + self._model_basenames()
        return self._model_basenames()

    def backward_files(self, property_type='relaxation'):
        """Return the output files to collect after the run."""
        return ['log.lammps', 'outlog', 'dump.relax']