import logging
from typing import (
Optional,
)
import numpy as np
from deepmd.env import (
op_module,
tf,
)
from deepmd.nvnmd.data.data import (
jdata_deepmd_input,
jdata_sys,
)
from deepmd.nvnmd.utils.config import (
nvnmd_cfg,
)
from deepmd.nvnmd.utils.fio import (
FioDic,
)
from deepmd.nvnmd.utils.network import (
get_sess,
)
from deepmd.nvnmd.utils.weight import (
get_filter_weight,
get_normalize,
)
from deepmd.utils.sess import (
run_sess,
)
log = logging.getLogger(__name__)
[docs]class MapTable:
r"""Generate the mapping table describing the relastionship of
atomic distance, cutoff function, and embedding matrix.
three mapping table will be built:
| :math:`r^2_{ji} \rightarrow s_{ji}`
| :math:`r^2_{ji} \rightarrow h_{ji}`
| :math:`r^2_{ji} \rightarrow \mathcal{G}_{ji}`
where :math:`s_{ji}` is cut-off function,
:math:`h_{ji} = \frac{s(r_{ji})}{r_{ji}}`, and
:math:`\mathcal{G}_{ji}` is embedding matrix.
The mapping funciton can be define as:
| :math:`y = f(x) = y_{k} + (x - x_{k}) * dy_{k}`
| :math:`y_{k} = f(x_{k})`
| :math:`dy_{k} = \frac{f(x_{k+1}) - f(x_{k})}{dx}`
| :math:`x_{k} \leq x < x_{k+1}`
| :math:`x_{k} = k * dx`
where :math:`dx` is interpolation interval.
Parameters
----------
config_file
input file name
an .npy file containing the configuration information of NVNMD model
weight_file
input file name
an .npy file containing the weights of NVNMD model
map_file
output file name
an .npy file containing the mapping tables of NVNMD model
References
----------
DOI: 10.1038/s41524-022-00773-z
"""
def __init__(self, config_file: str, weight_file: str, map_file: str):
self.config_file = config_file
self.weight_file = weight_file
self.map_file = map_file
jdata = jdata_deepmd_input["nvnmd"]
jdata["config_file"] = config_file
jdata["weight_file"] = weight_file
jdata["enable"] = True
nvnmd_cfg.init_from_jdata(jdata)
[docs] def build_map(self):
ntypex = nvnmd_cfg.dscp["ntypex"]
ntype = nvnmd_cfg.dscp["ntype"]
# calculate grid point
dic_u2s, dic_u2s_ref = self.run_u2s()
dic_s2g, dic_s2g_ref = self.run_s2g()
# build mapping table
rank = 4
dic_map = {}
u = dic_u2s["u"]
cfg_u2s = [[u[0], u[512], u[1] - u[0], 0, 512]]
dic_map["s"], dic_map["s_grad"] = self.build_map_coef(
cfg_u2s,
u,
dic_u2s["s"],
dic_u2s["s_grad"],
dic_u2s["s_grad_grad"],
ntype,
1,
rank,
)
dic_map["h"], dic_map["h_grad"] = self.build_map_coef(
cfg_u2s,
u,
dic_u2s["h"],
dic_u2s["h_grad"],
dic_u2s["h_grad_grad"],
ntype,
1,
rank,
)
dic_map2 = {}
s = dic_s2g["s"]
cfg_s2g = [
[s[0], s[256], s[1] - s[0], 0, 256],
[s[0], s[4096], s[16] - s[0], 256, 512],
]
dic_map2["g"], dic_map2["g_grad"] = self.build_map_coef(
cfg_s2g,
s,
dic_s2g["g"],
dic_s2g["g_grad"],
dic_s2g["g_grad_grad"],
ntype,
32,
rank,
)
# run mapping to test
if jdata_sys["debug"]:
dic_u2s_prd = self.mapping2(dic_u2s_ref["u"], dic_map, cfg_u2s, rank)
dic_s2g_prd = self.mapping2(dic_s2g_ref["s"], dic_map2, cfg_s2g, rank)
self.plot_lines(dic_u2s_ref["u"], dic_u2s_prd, dic_u2s_ref)
self.plot_lines(dic_s2g_ref["s"], dic_s2g_prd, dic_s2g_ref)
# save
self.map = {}
self.map["cfg_u2s"] = cfg_u2s
self.map["cfg_s2g"] = cfg_s2g
self.map.update(dic_map)
self.map.update(dic_map2)
FioDic().save(self.map_file, self.map)
log.info("NVNMD: finish building mapping table")
return self.map
[docs] def mapping(self, x, dic_map, cfgs, rank=4):
r"""Evaluate value by mapping table operation of tensorflow."""
n = len(x)
dic_val = {}
for key in dic_map.keys():
val = dic_map[key]
if isinstance(val, list):
dats = []
for ii in range(len(val)):
val_i = val[ii]
nr = np.shape(val_i)[0]
nc = np.shape(val_i)[1] // rank
dat_i = np.zeros([n, nc])
for kk in range(n):
xk = x[kk]
for cfg in cfgs:
x0, x1, dx, N0, N1 = cfg
if (xk >= x0) and (xk <= x1):
break
idx = np.int32(np.floor((xk - x0) / dx))
dxx = xk - idx * dx - x0
idx_k = idx + N0
dxx_k = dxx
if idx_k >= N1:
idx_k = N1 - 1
dxx_k = dx
coef = val_i[idx_k]
if rank == 4:
coef = np.reshape(coef, [nc, 4])
a, b, c, d = coef[:, 0], coef[:, 1], coef[:, 2], coef[:, 3]
dat_i[kk, :] = d + (c + (b + a * dxx_k) * dxx_k) * dxx_k
elif rank == 2:
coef = np.reshape(coef, [nc, 2])
a, b = coef[:, 0], coef[:, 1]
dat_i[kk, :] = b + a * dxx_k
dats.append(dat_i)
dic_val[key] = dats
return dic_val
[docs] def mapping2(self, x, dic_map, cfgs, rank=4):
r"""Evaluate value by mapping table of numpy."""
tf.reset_default_graph()
t_x = tf.placeholder(tf.float64, [None, 1], "t_x")
t_table = tf.placeholder(tf.float64, [None, None], "t_table")
t_table_grad = tf.placeholder(tf.float64, [None, None], "t_table_grad")
t_table_info = tf.placeholder(tf.float64, [None], "t_table_info")
t_y = op_module.map_flt_nvnmd(t_x, t_table, t_table_grad, t_table_info)
sess = get_sess()
#
n = len(x)
dic_val = {}
for key in dic_map.keys():
val = dic_map[key]
if isinstance(val, list):
dats = []
for ii in range(len(val)):
val_i = val[ii]
feed_dict = {
t_x: x,
t_table: val_i,
t_table_grad: val_i * 0.0,
t_table_info: np.reshape(np.array(cfgs), [-1]),
}
dat_i = run_sess(sess, t_y, feed_dict=feed_dict)
dat_i = np.reshape(dat_i, [n, -1])
dats.append(dat_i)
dic_val[key] = dats
return dic_val
[docs] def plot_lines(self, x, dic1, dic2=None):
r"""Plot lines to see accuracy."""
for key in dic1.keys():
val1 = dic1[key]
if dic2 is None:
val2 = dic1[key]
else:
val2 = dic2[key]
#
if isinstance(val1, list):
for ii in range(len(val1)):
val1_i = val1[ii]
val2_i = val2[ii]
nc = np.shape(val1_i)[1]
[docs] def build_map_coef(self, cfgs, x, ys, grads, grad_grads, Nr, Nc, rank=4):
r"""Build mapping table coefficient
cfgs: cfg list
cfg = x0, x1, dx.
coef2:
a x + b = y
/ b = y0
\ a = (y1 - y0) / L
coef4:
a x^3 + b x^2 + c x + d = y:
/ d = y0
| c = y0'
| b = (3 y1 - dx dy' - 2dx y0' - 3y0) / dx^2
\ a = (dx y1' - 2 y1 + dx y0' + 2 y0) / dx^3
"""
def cal_coef2(cfg, x, y, dy):
x = np.reshape(x, [-1])
coefs = []
for cfg in cfgs:
x0, x1, dx, N0, N1 = cfg
Nd = N1 - N0
idx = np.logical_and(
x >= x0,
x <= x1,
np.abs((x - x0) - np.floor((x - x0) / dx) * dx) < 1e-4,
)
y0 = y[idx][:-1]
y1 = y[idx][1:]
y0 = y0[:Nd]
y1 = y1[:Nd]
a = (y1 - y0) / dx
b = y0
coef = np.concatenate([a, b])
coef = np.transpose(np.reshape(coef, [2, -1]))
coefs.append(coef)
coefs = np.concatenate(coefs)
return coefs
def cal_coef4(cfg, x, y, dy):
x = np.reshape(x, [-1])
coefs = []
for cfg in cfgs:
x0, x1, dx, N0, N1 = cfg
Nd = N1 - N0
diff_x = np.abs((x - x0) - np.round((x - x0) / dx) * dx)
idx = np.logical_and(np.logical_and(x >= x0, x <= x1), diff_x < 1.0e-4)
y0 = y[idx][:-1]
y1 = y[idx][1:]
dy0 = dy[idx][:-1]
dy1 = dy[idx][1:]
y0 = y0[:Nd]
y1 = y1[:Nd]
dy0 = dy0[:Nd]
dy1 = dy1[:Nd]
#
a = (dx * dy1 - 2 * y1 + dx * dy0 + 2 * y0) / dx**3
b = (3 * y1 - dx * dy1 - 2 * dx * dy0 - 3 * y0) / dx**2
c = dy0
d = y0
coef = np.concatenate([a, b, c, d])
coef = np.transpose(np.reshape(coef, [4, -1]))
coefs.append(coef)
coefs = np.concatenate(coefs)
return coefs
#
cal_coef = cal_coef4 if (rank == 4) else cal_coef2
coefs = []
coef_grads = []
for ii in range(Nr):
y_i = ys[ii]
grad_i = grads[ii]
grad_grad_i = grad_grads[ii]
#
coef_i = []
coef_grad_i = []
for jj in range(Nc):
y_ij = y_i[:, jj]
grad_ij = grad_i[:, jj]
grad_grad_ij = grad_grad_i[:, jj]
coef_ij = cal_coef(cfgs, x, y_ij, grad_ij)
coef_grad_ij = cal_coef(cfgs, x, grad_ij, grad_grad_ij)
coef_i.append(coef_ij)
coef_grad_i.append(coef_grad_ij)
coef_i = np.concatenate(coef_i, axis=1)
coef_grad_i = np.concatenate(coef_grad_i, axis=1)
coefs.append(coef_i)
coef_grads.append(coef_grad_i)
return coefs, coef_grads
[docs] def build_grad(self, x, y, Nr, Nc):
r""": Build gradient of tensor y of x."""
grads = []
grad_grads = []
for ii in range(Nr):
y_i = y[ii]
grad_i = []
grad_grad_i = []
for jj in range(Nc):
y_ij = y_i[:, jj]
grad_ij = tf.gradients(y_ij, x)[0]
grad_grad_ij = tf.gradients(grad_ij, x)[0]
grad_i.append(grad_ij)
grad_grad_i.append(grad_grad_ij)
grad_i = tf.concat(grad_i, axis=1)
grad_grad_i = tf.concat(grad_grad_i, axis=1)
grads.append(grad_i)
grad_grads.append(grad_grad_i)
return grads, grad_grads
[docs] def build_u2s(self, r2):
r"""Build tensor s, s=s(r2)."""
rmin = nvnmd_cfg.dscp["rcut_smth"]
rmax = nvnmd_cfg.dscp["rcut"]
ntype = nvnmd_cfg.dscp["ntype"]
if "train_attr.min_nbor_dist" in nvnmd_cfg.weight.keys():
min_dist = nvnmd_cfg.weight["train_attr.min_nbor_dist"]
else:
min_dist = rmin
min_dist = 0.5 if (min_dist > 0.5) else (min_dist - 0.1)
#
avg, std = get_normalize(nvnmd_cfg.weight)
avg, std = np.float64(avg), np.float64(std)
r = tf.sqrt(r2)
r_ = tf.clip_by_value(r, rmin, rmax)
r__ = tf.clip_by_value(r, min_dist, rmax)
uu = (r_ - rmin) / (rmax - rmin)
vv = uu * uu * uu * (-6 * uu * uu + 15 * uu - 10) + 1
sl = []
hl = []
for tt in range(ntype):
s = vv / r__
h = s / r__
s = tf.reshape(s, [-1, 1])
h = tf.reshape(h, [-1, 1])
s = (s - avg[tt, 0]) / std[tt, 0]
h = h / std[tt, 1]
sl.append(s)
hl.append(h)
return sl, hl
[docs] def build_u2s_grad(self):
r"""Build gradient of s with respect to u (r^2)."""
ntype = nvnmd_cfg.dscp["ntype"]
#
dic_ph = {}
dic_ph["u"] = tf.placeholder(tf.float64, [None, 1], "t_u")
dic_ph["s"], dic_ph["h"] = self.build_u2s(dic_ph["u"])
dic_ph["s_grad"], dic_ph["s_grad_grad"] = self.build_grad(
dic_ph["u"], dic_ph["s"], ntype, 1
)
dic_ph["h_grad"], dic_ph["h_grad_grad"] = self.build_grad(
dic_ph["u"], dic_ph["h"], ntype, 1
)
return dic_ph
[docs] def run_u2s(self):
r"""Build u->s graph and run it to get value of mapping table."""
# ntypex = nvnmd_cfg.dscp['ntypex']
ntype = nvnmd_cfg.dscp["ntype"]
avg, std = get_normalize(nvnmd_cfg.weight)
avg, std = np.float64(avg), np.float64(std)
rc_max = nvnmd_cfg.dscp["rc_max"]
tf.reset_default_graph()
dic_ph = self.build_u2s_grad()
sess = get_sess()
# N = NUM_MAPT
N = 512
N2 = int(rc_max**2)
# N+1 ranther than N for calculating defference
keys = list(dic_ph.keys())
vals = list(dic_ph.values())
u = N2 * np.reshape(np.arange(0, N + 1) / N, [-1, 1])
res_lst = run_sess(sess, vals, feed_dict={dic_ph["u"]: u})
res_dic = dict(zip(keys, res_lst))
u2 = N2 * np.reshape(np.arange(0, N * 16 + 1) / (N * 16), [-1, 1])
res_lst2 = run_sess(sess, vals, feed_dict={dic_ph["u"]: u2})
res_dic2 = dict(zip(keys, res_lst2)) # reference for commpare
# change value
for tt in range(ntype):
res_dic["s"][tt][0] = -avg[tt, 0] / std[tt, 0]
res_dic["s_grad"][tt][0] = 0
res_dic["s_grad_grad"][tt][0] = 0
res_dic["h"][tt][0] = 0
res_dic["h_grad"][tt][0] = 0
res_dic["h_grad_grad"][tt][0] = 0
#
res_dic2["s"][tt][0] = -avg[tt, 0] / std[tt, 0]
res_dic2["s_grad"][tt][0] = 0
res_dic2["s_grad_grad"][tt][0] = 0
res_dic2["h"][tt][0] = 0
res_dic2["h_grad"][tt][0] = 0
res_dic2["h_grad_grad"][tt][0] = 0
sess.close()
return res_dic, res_dic2
[docs] def build_s2g(self, s):
r"""Build s->G
s is switch function
G is embedding net output.
"""
ntypex = nvnmd_cfg.dscp["ntypex"]
ntype = nvnmd_cfg.dscp["ntype"]
activation_fn = tf.tanh
outputs_size = nvnmd_cfg.dscp["NNODE_FEAS"]
xyz_scatters = []
for tt in range(ntypex):
for tt2 in range(ntype):
xyz_scatter = s
for ll in range(1, len(outputs_size)):
w, b = get_filter_weight(nvnmd_cfg.weight, tt, tt2, ll)
w, b = np.float64(w), np.float64(b)
if outputs_size[ll] == outputs_size[ll - 1]:
xyz_scatter += activation_fn(tf.matmul(xyz_scatter, w) + b)
elif outputs_size[ll] == outputs_size[ll - 1] * 2:
xyz_scatter = tf.concat(
[xyz_scatter, xyz_scatter], 1
) + activation_fn(tf.matmul(xyz_scatter, w) + b)
else:
xyz_scatter = activation_fn(tf.matmul(xyz_scatter, w) + b)
xyz_scatters.append(xyz_scatter)
return xyz_scatters
[docs] def build_s2g_grad(self):
r"""Build gradient of G with respect to s."""
ntypex = nvnmd_cfg.dscp["ntypex"]
ntype = nvnmd_cfg.dscp["ntype"]
M1 = nvnmd_cfg.dscp["M1"]
#
dic_ph = {}
dic_ph["s"] = tf.placeholder(tf.float64, [None, 1], "t_s")
dic_ph["g"] = self.build_s2g(dic_ph["s"])
dic_ph["g_grad"], dic_ph["g_grad_grad"] = self.build_grad(
dic_ph["s"], dic_ph["g"], ntypex * ntype, M1
)
return dic_ph
[docs] def run_s2g(self):
r"""Build s-> graph and run it to get value of mapping table."""
smin = nvnmd_cfg.dscp["smin"]
smax = nvnmd_cfg.dscp["smax"]
tf.reset_default_graph()
dic_ph = self.build_s2g_grad()
sess = get_sess()
N = 4096
N2 = 16
log.info(f"the range of s is [{smin}, {smax}]")
# check
if (smax - smin) > 16.0:
log.warning("the range of s is over the limit (smax - smin) > 16.0")
prec = N / N2
smin_ = np.floor(smin * prec - 1) / prec
#
keys = list(dic_ph.keys())
vals = list(dic_ph.values())
s = N2 * np.reshape(np.arange(0, N + 1) / N, [-1, 1]) + smin_
res_lst = run_sess(sess, vals, feed_dict={dic_ph["s"]: s})
res_dic = dict(zip(keys, res_lst))
s2 = N2 * np.reshape(np.arange(0, N * 16 + 1) / (N * 16), [-1, 1]) + smin_
res_lst2 = run_sess(sess, vals, feed_dict={dic_ph["s"]: s2})
res_dic2 = dict(zip(keys, res_lst2))
sess.close()
return res_dic, res_dic2
[docs]def mapt(
*,
nvnmd_config: Optional[str] = "nvnmd/config.npy",
nvnmd_weight: Optional[str] = "nvnmd/weight.npy",
nvnmd_map: Optional[str] = "nvnmd/map.npy",
**kwargs,
):
# build mapping table
mapObj = MapTable(nvnmd_config, nvnmd_weight, nvnmd_map)
mapObj.build_map()