Source code for atomdnn.network

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import backend as K
import time
import os
import atomdnn
from atomdnn.data import *
import matplotlib.pyplot as plt
import matplotlib
from atomdnn.descriptor import create_descriptors, get_num_fingerprints
import shutil
import math

if atomdnn.compute_force:
    input_signature_dict = [{"fingerprints": tf.TensorSpec(shape=[None,None,None], dtype=atomdnn.data_type, name="fingerprints"),
                    "atom_type": tf.TensorSpec(shape=[None,None], dtype=tf.int32, name="atom_type"),
                    "dGdr": tf.TensorSpec(shape=[None,None,None,None], dtype=atomdnn.data_type, name="dgdr"),
                    "center_atom_id": tf.TensorSpec(shape=[None,None], dtype=tf.int32, name="center_atom_id"),
                    "neighbor_atom_id": tf.TensorSpec(shape=[None,None], dtype=tf.int32, name="neighbor_atom_id"),
                    "neighbor_atom_coord": tf.TensorSpec(shape=[None,None,None,None], dtype=atomdnn.data_type, name="neighbor_atom_coord")}]
else:
    input_signature_dict = [{"fingerprints": tf.TensorSpec(shape=[None,None,None], dtype=atomdnn.data_type, name="fingerprints"),
                         "atom_type": tf.TensorSpec(shape=[None,None], dtype=tf.int32, name="atom_type")}]






  
class Network(tf.Module):
    """
    The neural network is built based on tf.Module.

    Args:
        elements(python list): list of elements in the system, e.g. ['C','O','H']
        descriptor(dictionary): descriptor parameters
        arch: network architecture, e.g. [10,10,10], 3 dense layers each with 10 neurons
        activation_function: any available TensorFlow activation function, e.g. 'relu'
        weights_initializer: the way to initialize weights, default is tf.random.normal
        bias_initializer: the way to initialize biases, default is tf.zeros
        import_dir: the directory of a saved model to be loaded
    """

    def __init__(self, elements=None, descriptor=None, arch=None,
                 activation_function=None, weights_initializer=tf.random.normal,
                 bias_initializer=tf.zeros, import_dir=None):
        """
        Initialize Network object.
        """
        super(Network, self).__init__()

        if import_dir:
            imported = tf.saved_model.load(import_dir)
            self.inflate_from_file(imported)
        else:
            if arch:
                self.arch = arch
            else:
                self.arch = [10]
                print('network arch is set to [10] by default.')

            if activation_function:
                self.activation_function = activation_function
            else:
                self.activation_function = 'tanh'
                print('activation function is set to tanh by default.')
            self.tf_activation_function = tf.keras.activations.get(self.activation_function)

            if descriptor is not None:
                self.descriptor = descriptor
            else:
                raise ValueError('Network has no descriptor input.')

            if elements is not None:
                self.elements = elements
            else:
                raise ValueError('Network has no elements input.')

            self.num_fingerprints = get_num_fingerprints(descriptor, elements)
            self.data_type = atomdnn.data_type
            self.weights_initializer = weights_initializer
            self.bias_initializer = bias_initializer
            self.scaling = None
            self.validation = False
            self.built = False
            self.params = []  # layer and node parameters

            # training parameters
            self.loss_fun = None   # string
            self.optimizer = None  # string
            self.tf_optimizer = None
            self.lr = None

            # store metadata as tf.Variable so it persists in a SavedModel
            self.saved_descriptor = {}
            for key, value in descriptor.items():
                self.saved_descriptor[key] = tf.Variable(value)
            self.saved_num_fingerprints = tf.Variable(self.num_fingerprints)
            self.saved_arch = tf.Variable(self.arch)
            self.saved_activation_function = tf.Variable(self.activation_function)
            self.saved_data_type = tf.Variable(self.data_type)
            self.saved_elements = tf.Variable(self.elements)
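
    # Usage sketch (illustrative, not part of the class): constructing a network for a
    # single-element system. The descriptor values are hypothetical; the 'name' and
    # 'cutoff' keys mirror what save() writes out for LAMMPS.
    #
    #     descriptor = {'name': 'acsf', 'cutoff': 6.5}      # plus descriptor-specific keys
    #     net = Network(elements=['C'], descriptor=descriptor, arch=[20, 20],
    #                   activation_function='tanh')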
    def inflate_from_file(self, imported):
        '''
        Inflate the network object from a SavedModel.

        Args:
            imported: a saved tensorflow neural network model
        '''
        self.built = True
        self.arch = imported.saved_arch.numpy()
        self.num_fingerprints = imported.saved_num_fingerprints.numpy()

        self.descriptor = {}
        for key, value in imported.saved_descriptor.items():
            value = value.numpy()
            if isinstance(value, bytes):
                value = value.decode()
            self.descriptor[key] = value

        if hasattr(imported, 'scaling'):
            self.scaling = imported.scaling
            self.scaling_factor = imported.scaling_factor
        else:
            self.scaling = None

        self.activation_function = imported.saved_activation_function.numpy().decode()
        self.tf_activation_function = tf.keras.activations.get(self.activation_function)
        self.data_type = imported.saved_data_type.numpy().decode()
        self.elements = [imported.saved_elements.numpy()[i].decode()
                         for i in range(imported.saved_elements.shape[0])]
        self.loss_fun = imported.saved_loss_fun.numpy().decode()
        self.optimizer = imported.saved_optimizer.numpy().decode()
        self.tf_optimizer = tf.keras.optimizers.get(self.optimizer)

        if hasattr(imported, 'saved_decay'):
            self.lr_history = imported.saved_lr_history.numpy()
            self.lr = self.lr_history[-1]
        else:
            self.lr = imported.saved_lr.numpy()
        K.set_value(self.tf_optimizer.learning_rate, self.lr)

        self.train_loss = imported.train_loss
        self.val_loss = imported.val_loss
        self.loss_weights = imported.loss_weights

        try:
            self.params = []
            for param in imported.params:
                self.params.append(param)
        except AttributeError:
            raise AttributeError('imported object does not have a params attribute.')

        print('Network has been inflated! self.built:', self.built)
    @tf.function(input_signature=input_signature_dict)
    def __call__(self, x):
        """
        Called when the network is evaluated through the C API for LAMMPS.
        """
        if len(x) == 0:
            print('__call__ method - input is empty.')
            return {'pe': None, 'force': None, 'stress': None}
        else:
            return self._predict(x)

    def _build(self):
        """
        Initialize the weights and biases.
        """
        # declare layer-wise weights and biases for each element's sub-network
        for element in self.elements:
            # first dense layer
            self.W1 = tf.Variable(self.weights_initializer(shape=(self.num_fingerprints, self.arch[0]),
                                                           dtype=self.data_type))
            self.b1 = tf.Variable(self.bias_initializer(shape=(1, self.arch[0]), dtype=self.data_type))
            self.params.append(self.W1)
            self.params.append(self.b1)

            # hidden dense layers; weight shape is (fan_in, fan_out) to match
            # the matmul in compute_pe
            nlayer = 1
            for nneuron_layer in self.arch[1:]:
                self.params.append(tf.Variable(self.weights_initializer(shape=(self.arch[nlayer-1], self.arch[nlayer]),
                                                                        dtype=self.data_type)))
                self.params.append(tf.Variable(self.bias_initializer([1, self.arch[nlayer]], dtype=self.data_type)))
                nlayer += 1

            # final linear layer mapping to a per-atom energy
            self.params.append(tf.Variable(self.weights_initializer(shape=(self.arch[-1], 1), dtype=self.data_type)))
            self.params.append(tf.Variable(self.bias_initializer([1, ], dtype=self.data_type)))
    def compute_pe(self, fingerprints, atom_type):
        '''
        Forward pass of the network to compute potential energy.
        Parallel networks are used for multiple atom types.

        Args:
            fingerprints: 3D array *[batch_size, atom_num, fingerprints_num]*
            atom_type: 2D array *[batch_size, atom_num]*

        Returns:
            total potential energy and per-atom potential energy
        '''
        nimages = tf.shape(atom_type)[0]
        natoms = tf.shape(atom_type)[1]
        atom_pe = tf.zeros([nimages, natoms, 1], dtype=self.data_type)

        nlayer = len(self.arch) + 1  # include one linear layer at the end
        nelements = len(self.elements)

        for i in range(nelements):
            type_id = i + 1
            type_array = tf.ones([nimages, natoms], dtype='int32') * type_id
            type_mask = tf.cast(tf.math.equal(atom_type, type_array), dtype=self.data_type)
            type_onehot = tf.linalg.diag(type_mask)
            fingerprints_i = tf.matmul(type_onehot, fingerprints)

            # first dense layer
            Z = tf.matmul(fingerprints_i, self.params[nlayer*2*i]) + self.params[nlayer*2*i+1]
            Z = self.tf_activation_function(Z)

            # sequential dense layers
            for j in range(1, nlayer-1):
                Z = tf.matmul(Z, self.params[nlayer*2*i+j*2]) + self.params[nlayer*2*i+j*2+1]
                Z = self.tf_activation_function(Z)

            # linear layer
            Z = tf.matmul(Z, self.params[nlayer*2*(i+1)-2]) + self.params[nlayer*2*(i+1)-1]

            # apply the mask so only atoms of this type contribute
            mask = tf.reshape(type_mask, [nimages, natoms, 1])
            atom_pe += Z * mask

        total_pe = tf.reshape(tf.math.reduce_sum(atom_pe, axis=1), [nimages])
        return total_pe, atom_pe
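
    # Indexing sketch (illustrative): with arch=[10,10] there are nlayer = len(arch)+1 = 3
    # weight/bias pairs per element (2 dense + 1 linear), so element i (0-based) owns
    # params[6*i : 6*(i+1)]:
    #   params[6*i+0], params[6*i+1]  ->  first dense layer W, b
    #   params[6*i+2], params[6*i+3]  ->  second dense layer W, b
    #   params[6*i+4], params[6*i+5]  ->  final linear layer W, b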
    def compute_force_stress(self, dEdG, input_dict):
        """
        Compute force and stress.

        Args:
            dEdG: derivatives of per-atom potential energy w.r.t. descriptors
            input_dict: input dictionary data

        Returns:
            force per atom and stress tensor
        """
        # undo the fingerprint scaling in the derivatives
        if self.scaling == 'std':
            dEdG = dEdG / self.scaling_factor[1]
        elif self.scaling == 'norm':
            dEdG = dEdG / (self.scaling_factor[1] - self.scaling_factor[0])

        dGdr = input_dict['dGdr']
        if not tf.is_tensor(dGdr):
            dGdr = tf.convert_to_tensor(dGdr, dtype=self.data_type)

        center_atom_id = input_dict['center_atom_id']
        if not tf.is_tensor(center_atom_id):
            center_atom_id = tf.convert_to_tensor(center_atom_id, dtype='int32')

        neighbor_atom_id = input_dict['neighbor_atom_id']
        if not tf.is_tensor(neighbor_atom_id):
            neighbor_atom_id = tf.convert_to_tensor(neighbor_atom_id, dtype='int32')

        neighbor_atom_coord = input_dict['neighbor_atom_coord']
        if not tf.is_tensor(neighbor_atom_coord):
            neighbor_atom_coord = tf.convert_to_tensor(neighbor_atom_coord, dtype=self.data_type)

        num_image = tf.shape(center_atom_id)[0]
        max_block = tf.shape(center_atom_id)[1]
        num_atoms = len(input_dict['fingerprints'][0])

        # form the dEdG matrix corresponding to the blocks in the dGdr data file
        center_atom_id_reshape = tf.reshape(center_atom_id, shape=[num_image, -1, 1])
        dEdG_block = tf.gather_nd(dEdG, center_atom_id_reshape, batch_dims=1)
        dEdG_block = tf.reshape(dEdG_block, [num_image, max_block, 1, self.num_fingerprints])

        # compute force
        force_block = tf.matmul(dEdG_block, dGdr)
        force_block_reshape = tf.reshape(force_block, [num_image, max_block, 3])
        neighbor_onehot = tf.one_hot(neighbor_atom_id, depth=num_atoms, axis=1, dtype=self.data_type)
        force = -tf.matmul(neighbor_onehot, force_block_reshape)

        # compute stress
        stress_block = tf.reshape(tf.matmul(neighbor_atom_coord, force_block), [num_image, max_block, 9])
        stress = tf.reduce_sum(stress_block, axis=1)
        stress = tf.math.divide(stress, input_dict['volume']) * atomdnn.stress_unit_convert

        return force, stress
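
    # Shape sketch (illustrative) of the tensors assembled above:
    #   dEdG_block:          [num_image, max_block, 1, num_fingerprints]
    #   dGdr:                [num_image, max_block, num_fingerprints, 3]
    #   force_block:         [num_image, max_block, 1, 3]   (per derivative-pair contribution)
    #   neighbor_atom_coord: [num_image, max_block, 3, 1]
    #   stress_block:        [num_image, max_block, 9]      (flattened 3x3 dyad)
    # The per-atom force is the negative sum of pair contributions gathered by
    # neighbor_atom_id, and the stress is the volume-normalized sum of coordinate-force dyads.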
    def _compute_force_stress(self, dEdG, input_dict):
        """
        Only used for the __call__ method and the C API.
        The same as compute_force_stress, but returns force and stress for derivative pairs.
        """
        if self.scaling is not None:
            if tf.math.equal(self.scaling, 'std'):
                dEdG = dEdG / self.scaling_factor[1]
            elif tf.math.equal(self.scaling, 'norm'):
                dEdG = dEdG / (self.scaling_factor[1] - self.scaling_factor[0])

        fingerprints = input_dict['fingerprints']
        dGdr = input_dict['dGdr']
        center_atom_id = input_dict['center_atom_id']
        neighbor_atom_coord = input_dict['neighbor_atom_coord']

        num_image = tf.shape(center_atom_id)[0]
        max_block = tf.shape(center_atom_id)[1]
        num_atoms = tf.shape(fingerprints)[1]

        # form the dEdG matrix corresponding to the blocks in the dGdr data file
        center_atom_id_reshape = tf.reshape(center_atom_id, shape=[num_image, -1, 1])
        dEdG_block = tf.gather_nd(dEdG, center_atom_id_reshape, batch_dims=1)
        dEdG_block = tf.reshape(dEdG_block, [num_image, max_block, 1, self.num_fingerprints])

        # compute force block
        force_block = -tf.matmul(dEdG_block, dGdr)

        # compute stress block
        stress_block = tf.reshape(tf.matmul(neighbor_atom_coord, force_block), [num_image, max_block, 9])

        return force_block, stress_block
    def predict(self, input_dict, compute_force=atomdnn.compute_force, pe_peratom=False, training=False):
        """
        Predict energy, force and stress.

        Args:
            input_dict: input dictionary data
            compute_force(bool): True to compute force and stress
            pe_peratom(bool): True to output per-atom potential energy
            training(bool): True when used during training

        Returns:
            dictionary: potential energy, force and stress
        """
        if not self.built:
            self._build()
            self.built = True

        fingerprints = input_dict['fingerprints']
        if self.num_fingerprints != len(fingerprints[0][0]):
            raise ValueError('Network and input data have different numbers of fingerprints, check descriptor parameters.')
        if not tf.is_tensor(fingerprints):
            fingerprints = tf.convert_to_tensor(fingerprints)

        if not training:
            if self.scaling == 'std':
                # standardize with the mean and standard deviation
                fingerprints = (fingerprints - self.scaling_factor[0]) / self.scaling_factor[1]
            elif self.scaling == 'norm':
                # normalize with the minimum and maximum
                fingerprints = (fingerprints - self.scaling_factor[0]) / (self.scaling_factor[1] - self.scaling_factor[0])

        if not compute_force:
            total_pe, atom_pe = self.compute_pe(fingerprints, input_dict['atom_type'])
            if training:
                return {'pe': total_pe}
            else:
                return {'pe': total_pe.numpy()}
        else:
            with tf.GradientTape() as dEdG_tape:
                dEdG_tape.watch(fingerprints)
                total_pe, atom_pe = self.compute_pe(fingerprints, input_dict['atom_type'])
            dEdG = dEdG_tape.gradient(atom_pe, fingerprints)
            force, stress = self.compute_force_stress(dEdG, input_dict)
            if training:
                return {'pe': total_pe, 'force': force, 'stress': stress}
            elif not pe_peratom:
                return {'pe': total_pe.numpy(), 'force': force.numpy(), 'stress': stress.numpy()}
            else:
                return {'pe': total_pe.numpy(), 'pe_peratom': atom_pe.numpy(),
                        'force': force.numpy(), 'stress': stress.numpy()}
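
    # Usage sketch (illustrative): predicting on an input dictionary, e.g. one produced
    # by atomdnn.data; the variable names here are hypothetical.
    #
    #     results = net.predict(input_dict, compute_force=True, pe_peratom=True)
    #     print(results['pe'], results['force'].shape, results['stress'].shape)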
    def inference(self, filename, format, **kwargs):
        """
        Predict potential energy, force and stress directly from one atomic structure input file.
        This function first computes the descriptors and then calls the predict function.

        Args:
            filename: name of the atomic structure input file
            format: 'lammps-data', 'extxyz', 'vasp' etc. See the complete list at
                https://wiki.fysik.dtu.dk/ase/ase/io/io.html#ase.io.read
            kwargs: used to pass optional file styles
        """
        atomdnn_data = create_descriptors(self.elements, filename, self.descriptor, format,
                                          descriptors_path='./descriptor_temp',
                                          descriptor_filename='dump_fp.temp',
                                          der_filename='dump_der.temp', silent=True, **kwargs)
        shutil.rmtree('./descriptor_temp')
        print(self.predict(atomdnn_data.get_input_dict(), pe_peratom=True))
    def _predict(self, input_dict, compute_force=atomdnn.compute_force):
        """
        Only used for the __call__ method and the C API.
        Same as :func:`~atomdnn.network.Network.predict`, but returns per-atom potential
        energy, and force and stress for derivative pairs.
        """
        fingerprints = input_dict['fingerprints']

        if self.scaling is not None:
            if tf.math.equal(self.scaling, 'std'):
                # standardize with the mean and standard deviation
                fingerprints = (fingerprints - self.scaling_factor[0]) / self.scaling_factor[1]
            elif tf.math.equal(self.scaling, 'norm'):
                # normalize with the minimum and maximum
                fingerprints = (fingerprints - self.scaling_factor[0]) / (self.scaling_factor[1] - self.scaling_factor[0])

        if not compute_force:
            total_pe, atom_pe = self.compute_pe(fingerprints, input_dict['atom_type'])
            return {'pe': atom_pe}
        else:
            with tf.GradientTape() as dEdG_tape:
                dEdG_tape.watch(fingerprints)
                total_pe, atom_pe = self.compute_pe(fingerprints, input_dict['atom_type'])
            dEdG = dEdG_tape.gradient(atom_pe, fingerprints)
            force_block, stress_block = self._compute_force_stress(dEdG, input_dict)
            return {'atom_pe': atom_pe, 'force': force_block, 'stress': stress_block}
    def evaluate(self, dataset, batch_size=None, return_prediction=True, compute_force=atomdnn.compute_force):
        """
        Evaluate a trained model.

        Args:
            dataset: tensorflow dataset used for evaluation
            batch_size: default is the total data size
            return_prediction(bool): True for returning prediction results
            compute_force(bool): True for computing force and stress
        """
        if batch_size is None:
            batch_size = dataset.cardinality().numpy()

        # accumulate the loss over batches, then average
        for step, (input_dict, output_dict) in enumerate(dataset.batch(batch_size)):
            y_predict = self.predict(input_dict, compute_force=compute_force)
            batch_loss = self.loss(output_dict, y_predict)
            if step == 0:
                eval_loss = batch_loss
            else:
                for key in batch_loss:
                    eval_loss[key] = batch_loss[key] + eval_loss[key]
        for key in batch_loss:
            eval_loss[key] = eval_loss[key] / (step + 1)

        print('Evaluation loss is:')
        for key in eval_loss:
            print('%15s: %15.4e' % (key, eval_loss[key]))
        if 'total_loss' in eval_loss:
            print('The total loss is computed using the loss weights',
                  *['- %s: %.2f' % (key, value.numpy()) for key, value in self.loss_weights.items()])

        if return_prediction:
            print('\nThe prediction is returned.')
            return y_predict
    def loss_function(self, true, pred):
        """
        Built-in tensorflow loss functions can be used.
        Customized loss functions can also be defined here.

        Args:
            true (tensorflow tensor): true output data
            pred (tensorflow tensor): predicted results
        """
        if self.loss_fun == 'rmse':
            tf_loss_fun = tf.keras.losses.get('mse')
            # clip at 1e-9 to avoid the infinite gradient of sqrt at zero
            return tf.sqrt(tf.maximum(tf_loss_fun(true, pred), 1e-9))
        else:
            tf_loss_fun = tf.keras.losses.get(self.loss_fun)
            return tf_loss_fun(true, pred)
    def loss(self, true_dict, pred_dict, training=False, validation=False):
        """
        Compute losses for energy, force and stress.

        Args:
            true_dict(dictionary): dictionary of output data
            pred_dict(dictionary): dictionary of predicted results
            training(bool): True for loss calculation during training
            validation(bool): True for loss calculation during validation

        Returns:
            if training is True, return total_loss and loss_dict (loss dictionary),
            otherwise (for evaluation) return loss_dict
        """
        loss_dict = {}
        loss_dict['pe_loss'] = self.loss_function(true_dict['pe'], pred_dict['pe'])

        if 'force' in pred_dict and 'force' in true_dict:
            # compute force loss during evaluation, OR when force is used for
            # training/validation, OR when all losses are requested
            if ((not training and not validation) or
                    ((training or validation) and self.loss_weights['force'] != 0)) or self.compute_all_loss:
                loss_dict['force_loss'] = tf.reduce_mean(self.loss_function(true_dict['force'], pred_dict['force']))

        if 'stress' in pred_dict and 'stress' in true_dict:
            # compute stress loss during evaluation, OR when stress is used for
            # training/validation, OR when all losses are requested
            if ((not training and not validation) or
                    ((training or validation) and self.loss_weights['stress'] != 0)) or self.compute_all_loss:
                # select the diagonal and upper-triangle elements (pxx,pyy,pzz,pxy,pxz,pyz) of the stress tensor
                indices = [[[0], [4], [8], [1], [2], [5]]] * len(pred_dict['stress'])
                stress = tf.gather_nd(pred_dict['stress'], indices=indices, batch_dims=1)
                loss_dict['stress_loss'] = tf.reduce_mean(self.loss_function(true_dict['stress'], stress))

        if self.loss_weights['force'] == 0 and self.loss_weights['stress'] == 0:
            # only pe used for training
            total_loss = loss_dict['pe_loss']
        elif self.loss_weights['force'] != 0 and self.loss_weights['stress'] != 0:
            # pe, force and stress used for training
            total_loss = loss_dict['pe_loss'] * self.loss_weights['pe'] \
                         + loss_dict['force_loss'] * self.loss_weights['force'] \
                         + loss_dict['stress_loss'] * self.loss_weights['stress']
            loss_dict['total_loss'] = total_loss
        elif self.loss_weights['force'] != 0:
            # pe and force used for training
            total_loss = loss_dict['pe_loss'] * self.loss_weights['pe'] \
                         + loss_dict['force_loss'] * self.loss_weights['force']
            loss_dict['total_loss'] = total_loss
        elif self.loss_weights['stress'] != 0:
            # pe and stress used for training
            total_loss = loss_dict['pe_loss'] * self.loss_weights['pe'] \
                         + loss_dict['stress_loss'] * self.loss_weights['stress']
            loss_dict['total_loss'] = total_loss
        else:
            raise ValueError('loss_weights is not set correctly.')

        if training:
            return total_loss, loss_dict
        else:
            return loss_dict
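
    # Worked example (illustrative): with loss_weights = {'pe':1, 'force':1, 'stress':0.1},
    # the objective assembled above is
    #     total_loss = 1*pe_loss + 1*force_loss + 0.1*stress_loss
    # while loss_weights = {'pe':1, 'force':0, 'stress':0} reduces it to pe_loss alone.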
    def decay_lr(self, epoch):
        """
        Apply stepwise exponential decay to the learning rate:
        lr = initial_lr * decay_rate ** floor((1+epoch)/decay_steps)
        """
        self.lr = self.decay['initial_lr'] * math.pow(self.decay['decay_rate'],
                                                      np.floor((1 + epoch) / self.decay['decay_steps']))
        K.set_value(self.tf_optimizer.learning_rate, self.lr)
        self.lr_history.append(self.lr)
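
    # Worked example (illustrative): with decay = {'initial_lr': 0.01, 'decay_rate': 0.5,
    # 'decay_steps': 100}, calling decay_lr at epoch 99 gives
    #     lr = 0.01 * 0.5**floor(100/100) = 0.005
    # and at epoch 199 gives 0.01 * 0.5**2 = 0.0025.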
    def train_step(self, input_dict, output_dict):
        """
        A single training step.

        Args:
            input_dict: input dictionary data
            output_dict: output dictionary data

        Returns:
            loss dictionary
        """
        with tf.GradientTape() as tape:  # automatically watches all trainable variables
            if self.loss_weights['force'] == 0 and self.loss_weights['stress'] == 0 and not self.compute_all_loss:
                pred_dict = self.predict(input_dict, training=True, compute_force=False)
            else:
                pred_dict = self.predict(input_dict, training=True, compute_force=True)
            total_loss, loss_dict = self.loss(output_dict, pred_dict, training=True)
        grads = tape.gradient(total_loss, self.params)
        self.tf_optimizer.apply_gradients(zip(grads, self.params))
        return loss_dict
    def validation_step(self, input_dict, output_dict):
        """
        A single validation step.

        Args:
            input_dict: input dictionary data
            output_dict: output dictionary data

        Returns:
            loss dictionary
        """
        if self.loss_weights['force'] == 0 and self.loss_weights['stress'] == 0 and not self.compute_all_loss:
            pred_dict = self.predict(input_dict, training=True, compute_force=False)
        else:
            pred_dict = self.predict(input_dict, training=True, compute_force=True)
        loss_dict = self.loss(output_dict, pred_dict, validation=True)
        return loss_dict
    def compute_scaling_factors(self, train_dataset):
        """
        Compute scaling factors using the training dataset.
        """
        self.scaling_factor = []
        fingerprints = get_input_dict(train_dataset)['fingerprints']
        if self.scaling == 'std':
            self.scaling_factor.append(tf.math.reduce_mean(fingerprints, axis=[0, 1]))
            self.scaling_factor.append(tf.math.reduce_std(fingerprints, axis=[0, 1]))
        if self.scaling == 'norm':
            self.scaling_factor.append(tf.math.reduce_min(fingerprints, axis=[0, 1]))
            self.scaling_factor.append(tf.math.reduce_max(fingerprints, axis=[0, 1]))
        self.scaling_factor = tf.Variable(self.scaling_factor)  # convert to tf.Variable for saving
    def scaling_dataset(self, dataset):
        """
        Scale a dataset with the scaling factors calculated by
        :func:`~atomdnn.network.Network.compute_scaling_factors`.
        """
        def map_fun(input_dict, output_dict):
            if self.scaling == 'std':
                input_dict['fingerprints'] = (input_dict['fingerprints'] - self.scaling_factor[0]) \
                                             / self.scaling_factor[1]
            if self.scaling == 'norm':
                input_dict['fingerprints'] = (input_dict['fingerprints'] - self.scaling_factor[0]) \
                                             / (self.scaling_factor[1] - self.scaling_factor[0])
            return input_dict, output_dict
        return dataset.map(map_fun)
    def train(self, train_dataset, validation_dataset=None, early_stop=None, nepochs_checkpoint=None,
              scaling=None, batch_size=None, epochs=None, loss_fun=None,
              optimizer=None, lr=None, decay=None, loss_weights=None, compute_all_loss=False,
              shuffle=True, append_loss=False):
        """
        Train the neural network.

        Args:
            train_dataset(tfdataset): dataset used for training
            validation_dataset(tfdataset): dataset used for validation
            early_stop(dictionary): condition for stopping training,
                e.g. {'train_loss':[0.01,3]} means stop when train loss is less than 0.01 for 3 consecutive epochs
            scaling(string): None, 'std' or 'norm'
            batch_size: the training batch size
            epochs: training epochs
            loss_fun(string): loss function, 'mae', 'mse', 'rmse' and others
            optimizer(string): any TensorFlow optimizer such as 'Adam'
            lr(float): learning rate, ignored if decay is provided
            decay(dictionary): parameters for exponential decay, keys are 'initial_lr', 'decay_steps' and 'decay_rate'
            loss_weights(dictionary): weights assigned to the loss terms, e.g. {'pe':1,'force':1,'stress':0.1}
            compute_all_loss(bool): compute loss for force and stress even when they are not used for training
            shuffle(bool): shuffle the training dataset during training
            append_loss(bool): append to the loss history
        """
        if not self.built:
            self._build()
            self.built = True

        if scaling:
            if scaling != 'std' and scaling != 'norm':
                raise ValueError('Scaling needs to be \'std\' (standardization) or \'norm\' (normalization).')
            else:
                self.scaling = tf.Variable(scaling)
        else:
            self.scaling = None

        if decay:
            for key in {'initial_lr', 'decay_steps', 'decay_rate'}:
                if decay.get(key) is None:
                    raise ValueError(key + ' is not in decay.')
            self.decay = decay
            if not hasattr(self, 'lr_history'):
                self.lr_history = []
            self.lr = decay['initial_lr']
            print("Initial learning rate is set to %.3f." % self.lr)
        else:
            if lr:
                self.lr = lr
                print("Learning rate is set to %.3f." % self.lr)
            elif not hasattr(self, 'lr'):
                self.lr = 0.01
                print("Learning rate is set to 0.01 by default.")

        if optimizer:
            self.optimizer = optimizer
            self.tf_optimizer = tf.keras.optimizers.get(optimizer)
            K.set_value(self.tf_optimizer.learning_rate, self.lr)
        elif not hasattr(self, 'optimizer') and not hasattr(self, 'tf_optimizer'):
            self.optimizer = 'Adam'
            self.tf_optimizer = tf.keras.optimizers.Adam(learning_rate=self.lr)
            print("optimizer is set to Adam by default.")

        if loss_fun:
            self.loss_fun = loss_fun
        elif self.loss_fun is None:
            self.loss_fun = 'rmse'
            print("loss_fun is set to rmse by default.")

        if not epochs:
            epochs = 1
            print("epochs is set to 1 by default.")

        if batch_size is None:
            batch_size = 30
            print("batch_size is set to 30 by default.")

        if early_stop:
            if 'val_loss' in early_stop and not validation_dataset:
                raise ValueError('No validation data for early stopping.')

        if not hasattr(self, 'train_loss') or not append_loss:
            self.train_loss = {}
        if validation_dataset and (not hasattr(self, 'val_loss') or not append_loss):
            self.val_loss = {}

        if loss_weights is None:
            self.loss_weights = {'pe': 1, 'force': 0, 'stress': 0}
            print('loss_weights is set to default value:', self.loss_weights)
        else:
            self.loss_weights = loss_weights
        for key in self.loss_weights:
            self.loss_weights[key] = tf.Variable(self.loss_weights[key], dtype=self.data_type)

        if compute_all_loss:
            self.compute_all_loss = True
        else:
            self.compute_all_loss = False

        if self.loss_weights['pe'] != 0 or self.compute_all_loss:
            if 'pe_loss' not in self.train_loss or not append_loss:
                self.train_loss['pe_loss'] = []
            if validation_dataset and ('pe_loss' not in self.val_loss or not append_loss):
                self.val_loss['pe_loss'] = []

        if self.loss_weights['force'] != 0 or self.compute_all_loss:
            # force is used for training/validation OR force_loss is requested
            if 'force_loss' not in self.train_loss or not append_loss:
                self.train_loss['force_loss'] = []
            if validation_dataset and ('force_loss' not in self.val_loss or not append_loss):
                self.val_loss['force_loss'] = []
            if self.loss_weights['force'] != 0:
                print("Forces are used for training.")
            else:
                print("Forces are not used for training.")

        if self.loss_weights['stress'] != 0 or self.compute_all_loss:
            # stress is used for training/validation OR stress_loss is requested
            if 'stress_loss' not in self.train_loss or not append_loss:
                self.train_loss['stress_loss'] = []
            if validation_dataset and ('stress_loss' not in self.val_loss or not append_loss):
                self.val_loss['stress_loss'] = []
            if self.loss_weights['stress'] != 0:
                print("Stresses are used for training.")
            else:
                print("Stresses are not used for training.")

        if self.loss_weights['force'] != 0 or self.loss_weights['stress'] != 0 or self.compute_all_loss:
            if 'total_loss' not in self.train_loss or not append_loss:
                self.train_loss['total_loss'] = []
            if validation_dataset and ('total_loss' not in self.val_loss or not append_loss):
                self.val_loss['total_loss'] = []

        # convert to tf.Variable so that they can be saved with the network
        self.saved_optimizer = tf.Variable(self.optimizer)
        self.saved_loss_fun = tf.Variable(self.loss_fun)
        if decay:
            self.saved_decay = {}
            for key, value in decay.items():
                self.saved_decay[key] = tf.Variable(value)
            self.saved_lr_history = tf.Variable(self.lr_history)
        else:
            self.saved_lr = tf.Variable(self.lr)

        # normalize or standardize the input data
        if self.scaling:
            self.compute_scaling_factors(train_dataset)
            print('Scaling factors are computed using the training dataset.')
            train_dataset = self.scaling_dataset(train_dataset)
            print('Training dataset is %s.' % ('standardized' if self.scaling == 'std' else 'normalized'))
            if validation_dataset:
                validation_dataset = self.scaling_dataset(validation_dataset)
                print('Validation dataset is %s.' % ('standardized' if self.scaling == 'std' else 'normalized'))

        if shuffle:
            train_dataset = train_dataset.shuffle(buffer_size=train_dataset.cardinality().numpy())
            print('Training dataset will be shuffled during training.')

        train_start_time = time.time()
        early_stop_repeats = 0

        # start training
        for epoch in range(epochs):
            epoch_start_time = time.time()

            # iterate over the batches of the training dataset
            for step, (input_dict, output_dict) in enumerate(train_dataset.batch(batch_size)):
                batch_loss = self.train_step(input_dict, output_dict)
                if step == 0:
                    train_epoch_loss = batch_loss
                else:
                    for key in batch_loss:
                        train_epoch_loss[key] = batch_loss[key] + train_epoch_loss[key]
            for key in batch_loss:
                train_epoch_loss[key] = train_epoch_loss[key] / (step + 1)
                self.train_loss[key].append(tf.Variable(train_epoch_loss[key]))

            # iterate over the batches of the validation dataset
            if validation_dataset is not None:
                for step, (input_dict, output_dict) in enumerate(validation_dataset.batch(batch_size)):
                    batch_loss = self.validation_step(input_dict, output_dict)
                    if step == 0:
                        val_epoch_loss = batch_loss
                    else:
                        for key in batch_loss:
                            val_epoch_loss[key] = batch_loss[key] + val_epoch_loss[key]
                for key in batch_loss:
                    val_epoch_loss[key] = val_epoch_loss[key] / (step + 1)
                    self.val_loss[key].append(tf.Variable(val_epoch_loss[key]))

            epoch_end_time = time.time()
            time_per_epoch = (epoch_end_time - epoch_start_time)

            print('\n===> Epoch %i/%i - %.3fs/epoch' % (epoch + 1, epochs, time_per_epoch))
            print('     training_loss  ', *["- %s: %5.3f" % (key, value) for key, value in train_epoch_loss.items()])
            if validation_dataset:
                print('     validation_loss', *["- %s: %5.3f" % (key, value) for key, value in val_epoch_loss.items()])

            if early_stop:
                if 'train_loss' in early_stop:
                    if 'total_loss' in train_epoch_loss:
                        if train_epoch_loss['total_loss'] <= early_stop['train_loss'][0]:
                            early_stop_repeats += 1
                        else:
                            early_stop_repeats = 0
                    elif 'pe_loss' in train_epoch_loss:
                        if train_epoch_loss['pe_loss'] <= early_stop['train_loss'][0]:
                            early_stop_repeats += 1
                        else:
                            early_stop_repeats = 0
                    if early_stop_repeats == early_stop['train_loss'][1]:
                        print('\nTraining is stopped when train_loss <= %.3f for %i times.'
                              % (early_stop['train_loss'][0], early_stop['train_loss'][1]))
                        break
                elif 'val_loss' in early_stop:
                    if 'total_loss' in val_epoch_loss:
                        if val_epoch_loss['total_loss'] <= early_stop['val_loss'][0]:
                            early_stop_repeats += 1
                        else:
                            early_stop_repeats = 0
                    elif 'pe_loss' in val_epoch_loss:
                        if val_epoch_loss['pe_loss'] <= early_stop['val_loss'][0]:
                            early_stop_repeats += 1
                        else:
                            early_stop_repeats = 0
                    if early_stop_repeats == early_stop['val_loss'][1]:
                        print('\nTraining is stopped when val_loss <= %.3f for %i times.'
                              % (early_stop['val_loss'][0], early_stop['val_loss'][1]))
                        break

            if nepochs_checkpoint:
                if (epoch + 1) % nepochs_checkpoint == 0:
                    save(self, 'saved_model_at_epoch' + str(epoch + 1))
                    self.plot_loss(saveplot=True)

            if decay:
                if (epoch + 1) % decay['decay_steps'] == 0 and (epoch + 1) < epochs:
                    self.decay_lr(epoch)
                    print('\nLearning rate is decreased to %.3f' % self.lr)

        elapsed_time = (epoch_end_time - train_start_time)
        print('\nEnd of training, elapsed time: ', time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))
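
    # Usage sketch (illustrative): a typical training call, assuming tf.data datasets
    # built with atomdnn.data; the dataset names and parameter values are hypothetical.
    #
    #     net.train(train_dataset, validation_dataset,
    #               scaling='std', batch_size=32, epochs=50, loss_fun='rmse',
    #               optimizer='Adam', lr=0.001,
    #               loss_weights={'pe': 1, 'force': 1, 'stress': 0.1})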
    def plot_loss(self, start_epoch=1, saveplot=False, **kwargs):
        """
        Plot the losses.

        Args:
            start_epoch: plotting starts from start_epoch
            saveplot(bool): if True, save the plots to the figure folder
            kwargs: optional figure parameters, with default values:
                figfolder = './loss_figures', the folder name for saving figures
                figsize = (8,4)
                linewidth = [1,1] for train and validation loss
                color = ['blue','darkgreen']
                label = ['train loss','validation loss']
                linestyle = ['-','-']
                markersize = [5,5]
                xlabel = 'epoch'
                ylabel = {'pe_loss':'pe loss (eV)', 'force_loss':'force loss (eV/A)',
                          'stress_loss':'stress loss (GPa)', 'total_loss':'total loss'}
                format = 'pdf'
        """
        figfolder = kwargs.get('figfolder', './loss_figures')
        figsize = kwargs.get('figsize', (8, 4))
        linewidth = kwargs.get('linewidth', [1, 1])
        color = kwargs.get('color', ['blue', 'darkgreen'])
        label = kwargs.get('label', ['train loss', 'validation loss'])
        linestyle = kwargs.get('linestyle', ['-', '-'])
        markersize = kwargs.get('markersize', [5, 5])
        xlabel = kwargs.get('xlabel', 'epoch')
        ylabel = kwargs.get('ylabel', {'pe_loss': 'pe loss (eV)',
                                       'force_loss': 'force loss (eV/' + r'$\AA$' + ')',
                                       'stress_loss': 'stress loss (GPa)',
                                       'total_loss': 'total loss'})
        format = kwargs.get('format', 'pdf')

        matplotlib.rc('legend', fontsize=15)
        matplotlib.rc('xtick', labelsize=15)
        matplotlib.rc('ytick', labelsize=15)
        matplotlib.rc('axes', labelsize=15)
        matplotlib.rc('figure', titlesize=15)

        for key in self.train_loss:
            fig, axs = plt.subplots(1, 1, figsize=figsize)
            end_epoch = len(self.train_loss[key])
            epoch = np.arange(start_epoch, end_epoch + 1)
            axs.plot(epoch, self.train_loss[key][start_epoch - 1:], linestyle[0], markersize=markersize[0],
                     fillstyle='none', linewidth=linewidth[0], color=color[0], label=label[0])
            if hasattr(self, 'val_loss'):
                axs.plot(epoch, self.val_loss[key][start_epoch - 1:], linestyle[1], markersize=markersize[1],
                         fillstyle='none', linewidth=linewidth[1], color=color[1], label=label[1])
            axs.set_xlabel(xlabel)
            axs.set_ylabel(ylabel[key])
            plt.legend(loc='upper right', frameon=True, borderaxespad=1)
            fig.suptitle(key)
            if saveplot:
                if not os.path.isdir(figfolder):
                    os.mkdir(figfolder)
                figname = key + '_at_epoch_' + str(end_epoch + 1) + '.' + format
                fig.savefig(os.path.join(figfolder, figname), bbox_inches='tight', format=format, dpi=500)
            plt.show()
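

# End-to-end sketch (illustrative, not part of the library): train, save and reload a
# network. The dataset arguments, descriptor contents and directory name are hypothetical;
# only the Network/train/save/load calls mirror the API defined in this module.
def _example_workflow(train_dataset, validation_dataset, descriptor):
    net = Network(elements=['C'], descriptor=descriptor, arch=[20, 20],
                  activation_function='tanh')
    net.train(train_dataset, validation_dataset, batch_size=32, epochs=10,
              loss_fun='rmse', loss_weights={'pe': 1, 'force': 1, 'stress': 0})
    save(net, 'my_model')    # writes the SavedModel plus a 'parameters' file for LAMMPS
    return load('my_model')  # reload through Network(import_dir=...)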
def get_signature(model_dir):
    """
    Run the shell command 'saved_model_cli show --dir model_dir --tag_set serve
    --signature_def serving_default' and parse the signature of the network from the
    shell output. The SavedModel Command Line Interface (CLI) is a TensorFlow tool
    to inspect a SavedModel.

    Args:
        model_dir: directory of a saved neural network model
    """
    stream = os.popen('saved_model_cli show --dir ' + model_dir
                      + ' --tag_set serve --signature_def serving_default')
    output = stream.read()
    lines = output.split('\n')

    input_tag = []
    output_tag = []
    input_name = []
    output_name = []
    check_in = 0
    check_out = 0

    for line in lines[0:-2]:
        if 'inputs' in line:
            input_tag.append(line.split('[')[1].split(']')[0].split("'")[1])
            check_in = 1
            check_out = 0
        if 'outputs' in line:
            output_tag.append(line.split('[')[1].split(']')[0].split("'")[1])
            check_out = 1
            check_in = 0
        if 'name' in line:
            if check_in:
                input_name.append(line.split()[1])
            if check_out:
                output_name.append(line.split()[1])

    return input_tag, input_name, output_tag, output_name
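
# Usage sketch (illustrative): inspecting a saved model's signature. The tag and name
# values shown are hypothetical and depend on the exported model.
#
#     input_tag, input_name, output_tag, output_name = get_signature('my_model')
#     # e.g. input_tag might contain entries like 'fingerprints' and 'atom_type',
#     # with input_name holding the corresponding graph tensor names.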
def save(obj, model_dir):
    """
    Save a trained model.

    Args:
        obj: the Network object to save
        model_dir: directory for the saved neural network model
    """
    tf.saved_model.save(obj, model_dir)
    input_tag, input_name, output_tag, output_name = get_signature(model_dir)

    # write the network signature and descriptor parameters for LAMMPS
    file = open(model_dir + '/parameters', 'w')
    file.write('%-20s ' % 'element')
    file.write(' '.join(j for j in obj.elements))
    file.write('\n\n')
    file.write('%-20s %d\n' % ('input', len(input_tag)))
    for i in range(len(input_tag)):
        file.write('%-20s %-20s\n' % (input_tag[i], input_name[i]))
    file.write('\n')
    file.write('%-20s %d\n' % ('output', len(output_tag)))
    for i in range(len(output_tag)):
        file.write('%-20s %-20s\n' % (output_tag[i], output_name[i]))
    file.write('\n')
    file.write('%-20s %f\n' % ('cutoff', obj.descriptor['cutoff']))
    file.write('\n')
    file.write('%-20s %s %d\n' % ('descriptor', obj.descriptor['name'], len(obj.descriptor) - 2))
    for i in range(2, len(obj.descriptor)):
        key = list(obj.descriptor.keys())[i]
        file.write('%-20s ' % key)
        file.write(' '.join(str(j) for j in obj.descriptor[key]))
        file.write('\n')
    file.close()
    # print('Network signatures and descriptor are written to %s for LAMMPS simulation.' % (model_dir + '/parameters'))
def load(model_dir):
    """
    Load a saved model.

    Args:
        model_dir: directory of a saved neural network model
    """
    if model_dir:
        return Network(import_dir=model_dir)
    else:
        raise ValueError('Load function has no model directory.')
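
# Usage sketch (illustrative): reloading a trained model and running inference on a
# single structure file; the directory name and file path are hypothetical.
#
#     net = load('my_model')
#     net.inference('POSCAR', 'vasp')   # computes descriptors, then calls predict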