Source code for pyrid.observables.observables_util

# -*- coding: utf-8 -*-
"""
@author: Moritz F P Becker
"""

import numpy as np
import h5py
import os
import math
from pathlib import Path

from ..evaluation.rdf_util import create_rb_hgrid, radial_distr_function

#%%


[docs]class Observables(object): """ The Observables class keeps track of all the observables. It contains methods to sample specific quantities, e.g. pressure, and to write these to an hdf5 file. Sampling can be done in what is here called a stepwise manner. Thereby, the value that the quantity/property has at the current simulation step (current point in time) is saved. However, sometimes it is more convenient to save not the current value but the average over some time window/bin. This is true in particular for event based properties like the number of reactions. Therefore, in PyRID, observables can also be binned and only then their value will be saved/written to the file. Attributes ---------- binned : `dict` Dictionary containing for each quantity the binned values. Observing : `array like` Structured array with boolean fields for each observable. If a field's value is True, the corresponding quantity is observed. observables_setup : `dict` Dictionary containing for each observed quantity the information needed for the sampling process. This information includes the stride, the items (molecules, particles, reaction type, ...) to be included, the total number of sampling steps (int((Simulation.nsteps)/stride)), whether the quantity is sampled in a stepwise manner or by binning or both, the current sampling step. bond_pairs: `list` List of particle type pairs for which the number of bonds is observed. observing_rdf: `boolean` If True, the radial distribution function is observed. rdf_nsteps: `int` The total number of sampling steps (int((Simulation.nsteps)/stride)) for the RDF. rdf_current_step: `int` Current sampling step of the RDF. rdf_cutoff: `float64[N]` The cutoff distances for each of the N molecule pairs for which the RDF is calculated. rdf_bins: `int64[N]` The number of bins (spatial resolution of the RDF) for each of the N molecule pairs for which the RDF is calculated. rdf_stride : `int` Stride with which the RDF is sampled. rdf_pairs: `list` List of molecule type pairs for which the RDF is calculated. rdf_molecules: `set` Set of molecules types for which the RDF is calculated. rdf_hgrid: `array like` Hierarchical grid data structure for the different molecule types. The hierarchical grid allows for an efficient calculation of the pairwise distances between the molecules. rdf_measure_pair: `int64[N,N]` Matrix of molecule type indices indicating which molecule pairs are observed. rdf_hist: `dict` Dictionary containing for each observed molecule pair the rdf histogram. Methods ------- update_bins(self, Simulation) Updates the values of all quantities which are sampled using data binning. observe(self, Property, stride, Simulation, types = None, reactions = None, stepwise = True, binned= False) Registers a quantity/property for observation. observe_rdf(self, rdf_pairs, rdf_bins, rdf_cutoff, stride, Simulation) Registers a molecule pair for observation of the corresponding radial distribution function. update(self, Simulation, hdf, RBs) Updates the values of all observed quantities by writing these to an hdf5 file. update_rdf(self, Simulation, hdf): Calculates the radial distribution function (rdf histogram) for a range of molecule pairs and writes these to the hdf5 file. """
[docs] def __init__(self, Simulation): try: os.makedirs(Simulation.file_path / 'hdf5') except FileExistsError: # directory already exists pass try: os.makedirs(Simulation.fig_path) except FileExistsError: # directory already exists pass # ------------- # Binning Data # ------------- self.binned = {} self.binned['Volume'] = 0.0 self.binned['Energy'] = np.zeros(Simulation.System.Epot.shape) self.binned['Bonds'] = np.zeros(Simulation.System.N_bonds.shape) self.binned['Number'] = np.zeros(Simulation.System.Nmol.shape) self.binned['Pressure'] = np.zeros(Simulation.System.Pressure.shape) self.binned['Virial'] = np.zeros(Simulation.System.virial_scalar.shape) self.binned['Virial Tensor'] = np.zeros(Simulation.System.virial_tensor.shape) observables_types = np.dtype([('Volume', bool),('Energy', bool),('Number', bool),('Pressure', bool),('Virial', bool),('Virial Tensor', bool),('Reactions', bool),('Bonds', bool),('Force', bool),('Torque', bool),('Position', bool),('Orientation', bool),], align=True) self.Observing = np.zeros(2, dtype = observables_types) # ------------- self.Time_passed = [] self.observables_setup = dict() self.observing_rdf = False hdf = h5py.File(Simulation.file_path / 'hdf5' / (Simulation.file_name+'.h5'), 'w') #------------------ # System Setup #------------------ hdf.create_group('Setup/') # hdf['Setup'].attrs['box_lengths'] = Simulation.System.box_lengths # hdf['Setup'].attrs['Temp'] = Simulation.System.Temp # hdf['Setup'].attrs['eta'] = Simulation.System.eta # hdf['Setup'].attrs['dt'] = Simulation.System.dt hdf['Setup'].create_dataset('box_lengths', data=Simulation.System.box_lengths) hdf['Setup'].create_dataset('Temp', data=Simulation.System.Temp) hdf['Setup'].create_dataset('kbt', data=Simulation.System.kbt) hdf['Setup'].create_dataset('eta', data=Simulation.System.eta) hdf['Setup'].create_dataset('dt', data=Simulation.System.dt) hdf['Setup'].create_dataset('nsteps', data=Simulation.nsteps) if Simulation.System.barostat[0]['active']==True: hdf.create_group('Setup/barostat') hdf['Setup/barostat'].create_dataset('Tau_P', data=Simulation.System.barostat[0]['Tau_P']) hdf['Setup/barostat'].create_dataset('P0', data=Simulation.System.barostat[0]['P0']) hdf.create_group('Setup/units') for measure in Simulation.units: hdf['Setup/units'].create_dataset(measure, data=Simulation.units[measure]) #------------------ # Molecule data #------------------ # hdf.create_group('Molecules/') if len(Simulation.System.molecule_types)==0: raise AttributeError('No molecules defined!') else: for type1 in Simulation.System.molecule_types: hdf.create_group('Molecules/'+str(type1)) hdf['Molecules/'+str(type1)].create_dataset('types', data=np.array(Simulation.System.molecule_types[str(type1)].types, dtype = 'S')) hdf['Molecules/'+str(type1)].create_dataset('radii', data=Simulation.System.molecule_types[type1].radii) hdf['Molecules/'+str(type1)].create_dataset('pos', data=Simulation.System.molecule_types[type1].pos) hdf['Molecules/'+str(type1)].create_dataset('volume', data=Simulation.System.molecule_types[type1].volume) hdf['Molecules/'+str(type1)].create_dataset('mu_tb', data=Simulation.System.molecule_types[type1].mu_tb) hdf['Molecules/'+str(type1)].create_dataset('mu_rb', data=Simulation.System.molecule_types[type1].mu_rb) #------------------ hdf.close()
[docs] def update_bins(self, Simulation): """Updates the values of all quantities which are sampled using data binning. Parameters ---------- Simulation : `object` Instance of the SImulation class. """ if self.Observing[1]['Volume']: self.binned['Volume'] += Simulation.System.volume if self.Observing[1]['Number']: for mol_name in self.observables_setup['Number']['items']: mol_id = Simulation.System.molecule_types[mol_name].type_id self.binned['Number'][mol_id] += Simulation.System.Nmol[mol_id] if self.Observing[1]['Energy']: for mol_name in self.observables_setup['Energy']['items']: mol_id = Simulation.System.molecule_types[mol_name].type_id self.binned['Energy'][mol_id] += Simulation.System.Epot[mol_id] if self.Observing[1]['Bonds']: for type1_id, type2_id in self.bond_pairs: self.binned['Bonds'][type1_id][type2_id] += Simulation.System.N_bonds[type1_id][type2_id] if self.Observing[1]['Pressure']: for mol_name in self.observables_setup['Pressure']['items']: mol_id = Simulation.System.molecule_types[mol_name].type_id self.binned['Pressure'][mol_id] += Simulation.System.Pressure[mol_id] if self.Observing[1]['Virial']: for mol_name in self.observables_setup['Virial']['items']: mol_id = Simulation.System.molecule_types[mol_name].type_id self.binned['Virial'][mol_id] += Simulation.System.virial_scalar[mol_id]+Simulation.System.virial_scalar_Wall[mol_id] if self.Observing[1]['Virial Tensor']: for mol_name in self.observables_setup['Virial Tensor']['items']: mol_id = Simulation.System.molecule_types[mol_name].type_id self.binned['Virial Tensor'][mol_id] += Simulation.System.virial_tensor[mol_id]#+Simulation.System.virial_tensor_Wall[mol_id]
#%%
[docs] def observe(self, Property, stride, Simulation, types = None, reactions = None, stepwise = True, binned= False):#, save = True, keep_list = False): """Registers a quantity/property for observation. Parameters ---------- Property : `string` Name of the quantity/property to observe. Supported are 'Force', 'Torque', 'Orientation', 'Position', 'Volume', 'Energy','Pressure', 'Virial', 'Number', 'Virial Tensor', 'Reactions', 'Bonds'. stride : `ìnt` Stride with which the selected quantity is sampled. Simulation : `object` Instance of the SImulation class. types : `list[string]` Names of the particle or molecule types for which to observe a certain property/quantity. Default = None reactions : `int64[:]` List of reaction type indices. Default = None stepwise : `boolean` Indicates whether the selected quantity is sampled in a stepwise manner. Default = True binned : `boolean` Indicates whether the selected quantity is sampled using data binning. Default = False Raises ------ KeyError ('Molecule type not found') A molecule type name has been passed that has not yet been defined. """ self.Observing[0][Property] = True if binned: self.Observing[1][Property] = True self.observables_setup[Property] = dict() self.observables_setup[Property]['stride'] = stride # self.observables_setup[Property]['save'] = save # self.observables_setup[Property]['keep_list'] = keep_list self.observables_setup[Property]['nsteps'] = int((Simulation.nsteps)/stride) self.observables_setup[Property]['items'] = [] # self.observables_setup[Property]['trace'] = dict() self.observables_setup[Property]['binned'] = binned self.observables_setup[Property]['stepwise'] = stepwise self.observables_setup[Property]['current_step'] = 0 # if save==True: hdf = h5py.File(Simulation.file_path / 'hdf5' / (Simulation.file_name+'.h5'), 'a') if stepwise == True: group = hdf.create_group('stepwise/'+Property, track_order=False) group.attrs['stride'] = stride if binned == True: group = hdf.create_group('binned/'+Property, track_order=False) group.attrs['stride'] = stride if Property in ['Force', 'Torque', 'Orientation', 'Position']: for type1 in types: if str(type1) not in Simulation.System.molecule_types: raise KeyError('Molecule type not found') self.observables_setup[Property]['items'].append(str(type1)) # if keep_list == True: # self.observables_setup[Property]['trace'][str(type1)] = [] # if save==True: if stepwise == True: group = hdf.create_group('stepwise/'+Property+'/'+str(type1), track_order=False) if binned == True: group = hdf.create_group('binned/'+Property+'/'+str(type1), track_order=False) elif Property in ['Volume']: # if keep_list == True: # self.observables_setup[Property]['trace'] = [] # if save== True: if stepwise == True: dataset = hdf.create_dataset('stepwise/Volume/'+Property, shape=(0,), dtype='f8', maxshape=(None,)) if binned == True: dataset = hdf.create_dataset('binned/Volume/'+Property, shape=(0,), dtype='f8', maxshape=(None,)) elif Property in ['Energy','Pressure', 'Virial', 'Number', 'Virial Tensor']: if Property in ['Pressure', 'Virial', 'Energy']: shape = (0,) max_shape = (None,) dtype = 'f8' elif Property in ['Virial Tensor']: shape = (0,3,3) max_shape = (None,3,3) dtype = 'f8' elif Property in ['Number']: shape = (0,) max_shape = (None,) dtype = 'i8' for type1 in types: if str(type1) not in Simulation.System.molecule_types: raise KeyError('Molecule type not found') self.observables_setup[Property]['items'].append(str(type1)) # if keep_list == True: # self.observables_setup[Property]['trace'][str(type1)] = [] # if save== True: if stepwise == True: dataset = hdf.create_dataset('stepwise/'+Property+'/'+str(type1), shape=shape, dtype=dtype, maxshape=max_shape) if binned == True: dataset = hdf.create_dataset('binned/'+Property+'/'+str(type1), shape=shape, dtype='f8', maxshape=max_shape) elif Property in ['Reactions']: for reaction_type_index in reactions: # reaction_type_id = Simulation.System.Reactions_Dict[reaction_type_index].reaction_type_id bimol = Simulation.System.Reactions_Dict[reaction_type_index].bimol if bimol: educts_names = [] for educt_id in Simulation.System.Reactions_Dict[reaction_type_index].educts: if Simulation.System.Reactions_Dict[reaction_type_index].reaction_educt_type == 'Molecule': educts_names.append(Simulation.System.molecule_id_to_name[educt_id]) else: educts_names.append(Simulation.System.particle_id_to_name[educt_id]) educts = educts_names[0]+'+'+educts_names[1] else: educt_id = Simulation.System.Reactions_Dict[reaction_type_index].educts[0] if Simulation.System.Reactions_Dict[reaction_type_index].reaction_educt_type == 'Molecule': educts = Simulation.System.molecule_id_to_name[educt_id] else: educts = Simulation.System.particle_id_to_name[educt_id] self.observables_setup[Property]['items'].append([reaction_type_index,educts]) if Simulation.System.Reactions_Dict[reaction_type_index].reaction_educt_type == 'Molecule' and bimol: particle_educts = Simulation.System.Reactions_Dict[reaction_type_index].particle_educts particle_educts = Simulation.System.particle_id_to_name[particle_educts[0]]+'+'+Simulation.System.particle_id_to_name[particle_educts[1]] if stepwise == True: hdf.create_dataset('stepwise/'+Property+'/'+particle_educts+'/n_total', shape=(0,), dtype='i8', maxshape=(None,)) if binned == True: hdf.create_dataset('binned/'+Property+'/'+particle_educts+'/n_total', shape=(0,), dtype='i8', maxshape=(None,)) else: if stepwise == True: hdf.create_dataset('stepwise/'+Property+'/'+educts+'/n_total', shape=(0,), dtype='i8', maxshape=(None,)) if binned == True: hdf.create_dataset('binned/'+Property+'/'+educts+'/n_total', shape=(0,), dtype='f8', maxshape=(None,)) for i in range(len(Simulation.System.Reactions_Dict[reaction_type_index].paths)): reaction_type = Simulation.System.Reactions_Dict[reaction_type_index].paths[i]['type'] path = 'path_'+str(i) if Simulation.System.Reactions_Dict[reaction_type_index].reaction_educt_type == 'Molecule' and bimol: if stepwise == True: dataset = hdf.create_dataset('stepwise/'+Property+'/'+particle_educts+'/'+reaction_type+'/'+educts+'/'+path, shape=(0,), dtype='i8', maxshape=(None,)) if binned == True: dataset = hdf.create_dataset('binned/'+Property+'/'+particle_educts+'/'+reaction_type+'/'+educts+'/'+path, shape=(0,), dtype='f8', maxshape=(None,)) else: if stepwise == True: dataset = hdf.create_dataset('stepwise/'+Property+'/'+educts+'/'+reaction_type+'/'+path, shape=(0,), dtype='i8', maxshape=(None,)) if binned == True: dataset = hdf.create_dataset('binned/'+Property+'/'+educts+'/'+reaction_type+'/'+path, shape=(0,), dtype='f8', maxshape=(None,)) elif Property in ['Bonds']: ii = np.where(Simulation.System.reaction_args['bond']) self.bond_pairs = list(zip(ii[0],ii[1])) for type1_id, type2_id in self.bond_pairs: type1_name = Simulation.System.particle_id_to_name[type1_id] type2_name = Simulation.System.particle_id_to_name[type2_id] bond_pair = type1_name+'.'+type2_name if stepwise == True: dataset = hdf.create_dataset('stepwise/'+Property+'/'+bond_pair, shape=(0,), dtype='i8', maxshape=(None,)) if binned == True: dataset = hdf.create_dataset('binned/'+Property+'/'+bond_pair, shape=(0,), dtype='i8', maxshape=(None,)) # if save==True: hdf.close() print('Observing {}.'.format(Property))
#%%
[docs] def observe_rdf(self, rdf_pairs, rdf_bins, rdf_cutoff, stride, Simulation): #, save = True, keep_list = False): """Registers a molecule pair for observation of the corresponding radial distribution function. Parameters ---------- rdf_pairs : `list` List of molecule type pairs for which the RDF is calculated. rdf_bins: `int64[N]` The number of bins (spatial resolution of the RDF) for each of the N molecule pairs for which the RDF is calculated. rdf_cutoff: `float64[N]` The cutoff distances for each of the N molecule pairs for which the RDF is calculated. stride : `int` Stride with which the RDF is sampled. Simulation : `object` Instance of the SImulation class. Raises ------ KeyError ('Molecule type not found') A molecule type name has been passed that has not yet been defined. """ hdf = h5py.File(Simulation.file_path / 'hdf5' / (Simulation.file_name+'.h5'), 'a') self.observing_rdf =True self.rdf_nsteps = int((Simulation.nsteps)/stride) self.rdf_current_step = 0 self.rdf_cutoff = np.array(rdf_cutoff) self.rdf_bins = np.array(rdf_bins) self.rdf_stride = stride self.rdf_pairs = rdf_pairs self.rdf_molecules = set(sum(rdf_pairs,[])) # [[mol1, mol2], [mol2, mol3]] -> {mol1, mol2, mol3} self.rdf_hgrid = create_rb_hgrid(Simulation, Simulation.RBs, self.rdf_molecules, Simulation.System.box_lengths, Simulation.System.N, Simulation.System) self.rdf_measure_pair = np.zeros((Simulation.System.ntypes, Simulation.System.ntypes), dtype = np.int64) self.rdf_hist = dict() for i in range(len(rdf_pairs)): type1, type2 = rdf_pairs[i] type1_id = Simulation.System.molecule_types[str(type1)].type_id type2_id = Simulation.System.molecule_types[str(type2)].type_id if (str(type1) not in Simulation.System.molecule_types) or (str(type2) not in Simulation.System.molecule_types): raise KeyError('Molecule type not found') self.rdf_hist[type1+'.'+type2] = np.zeros(rdf_bins[i]) self.rdf_measure_pair[type1_id][type2_id] = True dataset = hdf.create_group('RDF/'+str(type1)+'.'+str(type2), track_order=False) dataset.attrs['stride'] = stride dataset.attrs['rdf_cutoff'] = rdf_cutoff[i] dataset.attrs['rdf_bins'] = rdf_bins[i] hdf.close()
#%%
[docs] def update(self, Simulation, hdf, RBs): """Updates the values of all observed quantities by writing these to an hdf5 file. Parameters ---------- Simulation : `object` Instance of the SImulation class. hdf : `object` hdf5 file. RBs : `object` Instance of the RBs class. """ for Property in self.observables_setup: observable = self.observables_setup[Property] if Simulation.current_step % observable['stride'] == 0: # hdf = h5py.File(Simulation.file_path+'hdf5/'+Simulation.file_name+'.h5', 'a') if Property in ['Force', 'Torque', 'Orientation', 'Position']: digits = int(math.log10(observable['nsteps']))+1 step = observable['current_step'] step_id = f'{step:0{digits}}' for mol_name in observable['items']: mol_id = Simulation.System.molecule_types[mol_name].type_id Mask = RBs[RBs.occupied[0:RBs.occupied.n]]['type_id']==mol_id # if observable['stepwise'] == True: if Property == 'Force': dataset = hdf.create_dataset('stepwise/'+Property+'/'+str(mol_name)+'/'+step_id, data=RBs[RBs.occupied[0:RBs.occupied.n]]['force'][Mask]) elif Property == 'Torque': dataset = hdf.create_dataset('stepwise/'+Property+'/'+str(mol_name)+'/'+step_id, data=RBs[RBs.occupied[0:RBs.occupied.n]]['torque'][Mask]) elif Property == 'Orientation': dataset = hdf.create_dataset('stepwise/'+Property+'/'+str(mol_name)+'/'+step_id, data=RBs[RBs.occupied[0:RBs.occupied.n]]['q'][Mask]) elif Property == 'Position': dataset = hdf.create_dataset('stepwise/'+Property+'/'+str(mol_name)+'/'+step_id, data=RBs[RBs.occupied[0:RBs.occupied.n]]['pos'][Mask]) dataset.attrs['time step'] = Simulation.current_step observable['current_step'] += 1 elif Property in ['Volume']: if observable['stepwise'] == True: Dset = hdf['stepwise/Volume/'+Property] Dset.resize((Dset.shape[0] + 1), axis = 0) if Property == 'Volume': Dset[-1:] = Simulation.System.volume if observable['binned'] == True: Dset = hdf['binned/Volume/'+Property] Dset.resize((Dset.shape[0] + 1), axis = 0) Dset[-1:] = self.binned[Property] self.binned[Property] = 0.0 elif Property in ['Energy', 'Pressure', 'Virial', 'Virtial Tensor', 'Number']: for mol_name in observable['items']: mol_id = Simulation.System.molecule_types[mol_name].type_id if observable['stepwise'] == True: Dset = hdf['stepwise/'+Property+'/'+str(mol_name)] Dset.resize((Dset.shape[0] + 1), axis = 0) if Property == 'Energy': Dset[-1:] = Simulation.System.Epot[mol_id] elif Property == 'Pressure': Dset[-1:] = Simulation.System.Pressure[mol_id] elif Property == 'Virial': Dset[-1:] = Simulation.System.virial_scalar[mol_id]+Simulation.System.virial_scalar_Wall[mol_id] elif Property == 'Virial Tensor': Dset[-1:] = Simulation.System.virial_tensor[mol_id] elif Property == 'Number': Dset[-1:] = Simulation.System.Nmol[mol_id] if observable['binned'] == True: Dset = hdf['binned/'+Property+'/'+str(mol_name)] Dset.resize((Dset.shape[0] + 1), axis = 0) Dset[-1:] = self.binned[Property][mol_id] if Property == 'Virial Tensor': self.binned[Property][:,:,:] = 0.0 else: self.binned[Property][:] = 0.0 elif Property in ['Reactions']: for reaction_type_index,educts in observable['items']: bimol = Simulation.System.Reactions_Dict[reaction_type_index].bimol if Simulation.System.Reactions_Dict[reaction_type_index].reaction_educt_type == 'Molecule' and bimol: particle_educts = Simulation.System.Reactions_Dict[reaction_type_index].particle_educts particle_educts = Simulation.System.particle_id_to_name[particle_educts[0]]+'+'+Simulation.System.particle_id_to_name[particle_educts[1]] if observable['stepwise'] == True: Dset = hdf['stepwise/'+Property+'/'+particle_educts+'/n_total'] Dset.resize((Dset.shape[0] + 1), axis = 0) Dset[-1:] = Simulation.System.Reactions_Dict[reaction_type_index].n_total if observable['binned'] == True: Dset = hdf['binned/'+Property+'/'+particle_educts+'/n_total'] Dset.resize((Dset.shape[0] + 1), axis = 0) Dset[-1:] = Simulation.System.Reactions_Dict[reaction_type_index].n_total_binned#/observable['stride'] Simulation.System.Reactions_Dict[reaction_type_index].n_total_binned = 0 else: if observable['stepwise'] == True: Dset = hdf['stepwise/'+Property+'/'+educts+'/n_total'] Dset.resize((Dset.shape[0] + 1), axis = 0) Dset[-1:] = Simulation.System.Reactions_Dict[reaction_type_index].n_total if observable['binned'] == True: Dset = hdf['binned/'+Property+'/'+educts+'/n_total'] Dset.resize((Dset.shape[0] + 1), axis = 0) Dset[-1:] = Simulation.System.Reactions_Dict[reaction_type_index].n_total_binned#/observable['stride'] Simulation.System.Reactions_Dict[reaction_type_index].n_total_binned = 0 for i in range(len(Simulation.System.Reactions_Dict[reaction_type_index].paths)): reaction_type = Simulation.System.Reactions_Dict[reaction_type_index].paths[i]['type'] path = 'path_'+str(i) if Simulation.System.Reactions_Dict[reaction_type_index].reaction_educt_type == 'Molecule' and bimol: if observable['stepwise'] == True: Dset = hdf['stepwise/'+Property+'/'+particle_educts+'/'+reaction_type+'/'+educts+'/'+path] Dset.resize((Dset.shape[0] + 1), axis = 0) Dset[-1:] = Simulation.System.Reactions_Dict[reaction_type_index].paths[i]['n_success'] if observable['binned'] == True: Dset = hdf['binned/'+Property+'/'+particle_educts+'/'+reaction_type+'/'+educts+'/'+path] Dset.resize((Dset.shape[0] + 1), axis = 0) Dset[-1:] = Simulation.System.Reactions_Dict[reaction_type_index].paths[i]['n_success_binned']#/observable['stride'] Simulation.System.Reactions_Dict[reaction_type_index].paths[i]['n_success_binned'] = 0 else: if observable['stepwise'] == True: Dset = hdf['stepwise/'+Property+'/'+educts+'/'+reaction_type+'/'+path] Dset.resize((Dset.shape[0] + 1), axis = 0) Dset[-1:] = Simulation.System.Reactions_Dict[reaction_type_index].paths[i]['n_success'] if observable['binned'] == True: Dset = hdf['binned/'+Property+'/'+educts+'/'+reaction_type+'/'+path] Dset.resize((Dset.shape[0] + 1), axis = 0) Dset[-1:] = Simulation.System.Reactions_Dict[reaction_type_index].paths[i]['n_success_binned']#/observable['stride'] Simulation.System.Reactions_Dict[reaction_type_index].paths[i]['n_success_binned'] = 0 elif Property in ['Bonds']: for type1_id, type2_id in self.bond_pairs: type1_name = Simulation.System.particle_id_to_name[type1_id] type2_name = Simulation.System.particle_id_to_name[type2_id] bond_pair = type1_name+'.'+type2_name if observable['stepwise'] == True: Dset = hdf['stepwise/'+Property+'/'+bond_pair] Dset.resize((Dset.shape[0] + 1), axis = 0) Dset[-1:] = Simulation.System.N_bonds[type1_id][type2_id] if observable['binned'] == True: Dset = hdf['binned/'+Property+'/'+bond_pair] Dset.resize((Dset.shape[0] + 1), axis = 0) Dset[-1:] = self.binned[Property][type1_id][type2_id] self.binned[Property][:,:] = 0.0
#%% # hdf.close() #%%
[docs] def update_rdf(self, Simulation, hdf): """Calculates the radial distribution function (rdf histogram) for a range of molecule pairs and writes these to the hdf5 file. Parameters ---------- Simulation : `object` Instance of the SImulation class. hdf : `object` hdf5 file. """ if Simulation.current_step % self.rdf_stride == 0: for i in range(len(self.rdf_pairs)): mol1, mol2 =self.rdf_pairs[i] typeI_id = Simulation.System.molecule_types[str(mol1)].type_id typeJ_id = Simulation.System.molecule_types[str(mol2)].type_id rdf_hist = self.rdf_hist[mol1+'.'+mol2] if Simulation.System.Nmol[typeI_id]>0 and Simulation.System.Nmol[typeJ_id]>0: rdf_hgrid = self.rdf_hgrid rdf_cutoff = self.rdf_cutoff[i] rdf_bins = self.rdf_bins[i] radial_distr_function(Simulation.System, np.array([typeI_id,typeJ_id]), Simulation.RBs, rdf_hgrid, rdf_cutoff, rdf_bins, rdf_hist) dr_rdf = rdf_cutoff / float(rdf_bins) r = np.linspace(0, rdf_cutoff, rdf_bins) rdf_hist = rdf_hist/(4/3*np.pi*(r**3-(r-dr_rdf)**3))/(Simulation.System.Nmol[typeJ_id]/Simulation.System.box_lengths.prod())/(Simulation.System.Nmol[typeI_id]) else: rdf_hist[:] = 0.0 digits = int(math.log10(self.rdf_nsteps))+1 step = self.rdf_current_step step_id = f'{step:0{digits}}' dataset = hdf.create_dataset('RDF/'+str(mol1)+'.'+str(mol2)+'/'+step_id, data=rdf_hist) dataset.attrs['time step'] = Simulation.current_step self.rdf_current_step += 1