import logging
import pathlib
import typing
from collections import deque
import ase
import h5py
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tqdm
import znh5md
import zntrack
from ase import units
from ase.md.langevin import Langevin
from ase.md.velocitydistribution import MaxwellBoltzmannDistribution
from ase.md.verlet import VelocityVerlet
from numpy.random import default_rng
from tqdm import trange
from ipsuite import base, models, utils
from ipsuite.analysis.ensemble import plot_with_uncertainty
from ipsuite.analysis.model.plots import get_histogram_figure
from ipsuite.utils.ase_sim import freeze_copy_atoms
from ipsuite.utils.md import get_energy_terms
log = logging.getLogger(__name__)
[docs]
class RattleAnalysis(base.ProcessSingleAtom):
"""Move particles with a given stdev from a starting configuration and predict.
Attributes
----------
model: The MLModel node that implements the 'predict' method
atoms: list[Atoms] to predict properties for
logspace: bool, default=True
Increase the stdev of rattle with 'np.logspace' instead of 'np.linspace'
stop: float, default = 1.0
The stop value for the generated space of stdev points
num: int, default = 100
The size of the generated space of stdev points
factor: float, default = 0.001
The 'np.linspace(0.0, stop, num) * factor'
atom_id: int, default = 0
The atom to pick from self.atoms as a starting point
"""
model: models.MLModel = zntrack.deps()
model_outs: pathlib.Path = zntrack.outs_path(zntrack.nwd / "model/")
logspace: bool = zntrack.params(True)
stop: float = zntrack.params(3.0)
factor: float = zntrack.params(0.001)
num: int = zntrack.params(100)
seed: int = zntrack.params(1234)
energies: pd.DataFrame = zntrack.plots(
# x="x",
# y="y",
# x_label="stdev of randomly displaced atoms",
# y_label="predicted energy",
)
[docs]
def run(self):
self.model_outs.mkdir(parents=True, exist_ok=True)
(self.model_outs / "outs.txt").write_text("Lorem Ipsum")
if self.logspace:
stdev_space = (
np.logspace(start=0.0, stop=self.stop, num=self.num) * self.factor
)
else:
stdev_space = (
np.linspace(start=0.0, stop=self.stop, num=self.num) * self.factor
)
atoms = self.get_data()
reference = atoms.copy()
atoms.calc = self.model.get_calculator(directory=self.model_outs)
energies = []
self.frames = []
for stdev in tqdm.tqdm(stdev_space, ncols=70):
atoms.positions = reference.positions
atoms.rattle(stdev=stdev, seed=self.seed)
energies.append(atoms.get_potential_energy())
self.frames.append(freeze_copy_atoms(atoms))
self.energies = pd.DataFrame({"y": energies, "x": stdev_space})
[docs]
class BoxScale(base.ProcessSingleAtom):
"""Scale all particles and predict energies.
Attributes
----------
model: The MLModel node that implements the 'predict' method
atoms: list[Atoms] to predict properties for
start: int, default = None
The initial box scale, default value is the original box size.
stop: float, default = 1.0
The stop value for the generated space of stdev points
num: int, default = 100
The size of the generated space of stdev points
"""
model: models.MLModel = zntrack.deps()
model_outs: pathlib.Path = zntrack.outs_path(zntrack.nwd / "model")
mapping: typing.Any | None = zntrack.deps(None)
stop: float = zntrack.params(2.0)
num: int = zntrack.params(100)
start: float = zntrack.params(1)
plot: pathlib.Path = zntrack.outs_path(zntrack.nwd / "energy.png")
energies: pd.DataFrame = zntrack.plots(
x="x",
y="y",
x_label="Scale factor of the initial cell",
y_label="predicted energy",
)
[docs]
def run(self):
self.model_outs.mkdir(parents=True, exist_ok=True)
(self.model_outs / "outs.txt").write_text("Lorem Ipsum")
scale_space = np.linspace(start=self.start, stop=self.stop, num=self.num)
original_atoms = self.get_data()
cell = original_atoms.copy().cell
original_atoms.calc = self.model.get_calculator(directory=self.model_outs)
energies = []
self.frames = []
if self.mapping is None:
scaling_atoms = original_atoms
else:
scaling_atoms, molecules = self.mapping.forward_mapping(original_atoms)
for scale in tqdm.tqdm(scale_space, ncols=70):
scaling_atoms.set_cell(cell=cell * scale, scale_atoms=True)
if self.mapping is None:
eval_atoms = scaling_atoms
else:
eval_atoms = self.mapping.backward_mapping(scaling_atoms, molecules)
# New atoms object, does not have the calculator.
eval_atoms.calc = original_atoms.calc
energies.append(eval_atoms.get_potential_energy())
self.frames.append(freeze_copy_atoms(eval_atoms))
self.energies = pd.DataFrame({"y": energies, "x": scale_space})
if "energy_uncertainty" in self.frames[0].calc.results:
fig, ax, _ = plot_with_uncertainty(
{
"std": np.std(
[a.calc.results["energy_uncertainty"] for a in self.frames]
),
"mean": self.energies["y"],
},
x=self.energies["x"],
ylabel="predicted energy",
xlabel="Scale factor of the initial cell",
)
else:
fig, ax = plt.subplots()
ax.plot(self.energies["x"], self.energies["y"])
ax.set_xlabel("Scale factor of the initial cell")
ax.set_ylabel("predicted energy")
fig.savefig(self.plot, bbox_inches="tight")
[docs]
class BoxHeatUp(base.ProcessSingleAtom):
"""
Perform a heat-up analysis by gradually increasing the temperature of the system
and monitoring the measured temperature and energy.
Parameters
----------
start_temperature : float
The initial temperature (in Kelvin) to start the analysis.
stop_temperature : float
The upper bound of the temperature range (in Kelvin).
steps : int
Number of temperature increments between start and stop temperature.
time_step : float, optional
Time step of the simulation in femtoseconds. Default is 0.5 fs.
friction : float
Langevin friction coefficient.
repeat : tuple of int, optional
Number of repetitions of the unit cell in each direction. Default is (1, 1, 1).
max_temperature : float or None, optional
Maximum allowed temperature before stopping the simulation.
If None, set to 1.5 * stop_temperature.
model : typing.Any
The MLModel node that implements the 'get_calculator' method.
model_outs : pathlib.Path
Output directory for model results.
plots : pathlib.Path
Path to save the temperature plot.
Attributes
----------
flux_data : pd.DataFrame
DataFrame containing measured temperature, total energy,
and set temperature at each step.
steps_before_explosion : int
Number of steps completed before exceeding max_temperature, or -1 if not exceeded.
frames : list of ase.Atoms
List of atomic configurations at each step.
"""
start_temperature: float = zntrack.params()
stop_temperature: float = zntrack.params()
steps: int = zntrack.params()
time_step: float = zntrack.params(0.5)
friction: float = zntrack.params()
repeat: tuple[int, int, int] = zntrack.params((1, 1, 1))
max_temperature: float | None = zntrack.params(None)
flux_data: pd.DataFrame = zntrack.plots()
model: typing.Any = zntrack.deps()
model_outs: pathlib.Path = zntrack.outs_path(zntrack.nwd / "model")
plots: pathlib.Path = zntrack.outs_path(zntrack.nwd / "temperature.png")
[docs]
def get_atoms(self) -> ase.Atoms:
atoms: ase.Atoms = self.get_data()
return atoms.repeat(self.repeat)
[docs]
def plot_temperature(self):
fig, ax = plt.subplots()
ax.plot(self.flux_data["set_temp"], self.flux_data["meassured_temp"])
ax.plot(
self.flux_data["set_temp"],
self.flux_data["set_temp"],
color="grey",
linestyle="--",
)
ax.set_ylim(self.start_temperature, self.stop_temperature)
ax.set_xlabel(r"Target temperature $t_\mathrm{target}$ / K")
ax.set_ylabel(r"Measured temperature $t_\mathrm{exp}$ / K")
fig.savefig(self.plots)
[docs]
def run(self):
self.model_outs.mkdir(parents=True, exist_ok=True)
(self.model_outs / "outs.txt").write_text("Lorem Ipsum")
if self.max_temperature is None:
self.max_temperature = self.stop_temperature * 1.5
atoms = self.get_atoms()
atoms.calc = self.model.get_calculator(directory=self.model_outs)
# Initialize velocities
MaxwellBoltzmannDistribution(atoms, temperature_K=self.start_temperature)
# initialize thermostat
thermostat = Langevin(
atoms,
timestep=self.time_step * units.fs,
temperature_K=self.start_temperature,
friction=self.friction,
)
# Run simulation
temperature, total_energy = utils.ase_sim.get_energy(atoms)
energy = []
self.frames = []
with tqdm.trange(
self.steps,
desc=utils.ase_sim.get_desc(temperature, total_energy),
leave=True,
ncols=120,
) as pbar:
for temp in np.linspace(
self.start_temperature, self.stop_temperature, self.steps
):
thermostat.run(1)
thermostat.set_temperature(temperature_K=temp)
temperature, total_energy = utils.ase_sim.get_energy(atoms)
pbar.set_description(utils.ase_sim.get_desc(temperature, total_energy))
pbar.update()
energy.append([temperature, total_energy, temp])
if temperature > self.max_temperature:
log.critical(
"Temperature of the simulation exceeded"
f" {self.max_temperature} K. Simulation was stopped."
)
break
self.frames.append(freeze_copy_atoms(atoms))
self.flux_data = pd.DataFrame(
energy, columns=["meassured_temp", "energy", "set_temp"]
)
self.flux_data[self.flux_data > self.max_temperature] = self.max_temperature
self.flux_data.index.name = "step"
if temperature > self.max_temperature:
self.steps_before_explosion = len(energy)
else:
self.steps_before_explosion = -1
self.plot_temperature()
[docs]
def run_stability_nve(
atoms: ase.Atoms,
time_step: float,
max_steps: int,
init_temperature: float,
checks: list[base.Check],
save_last_n: int,
rng: typing.Optional[np.random.Generator] = None,
) -> typing.Tuple[int, list[ase.Atoms]]:
"""
Runs an NVE trajectory for a single configuration until either max_steps
is reached or one of the checks fails.
"""
pbar_update = 50
stable_steps = 0
MaxwellBoltzmannDistribution(atoms, temperature_K=init_temperature, rng=rng)
etot, ekin, epot = get_energy_terms(atoms)
last_n_atoms = deque(maxlen=save_last_n)
last_n_atoms.append(freeze_copy_atoms(atoms))
for check in checks:
check.initialize(atoms)
def get_desc():
"""TQDM description."""
return f"Etot: {etot:.3f} eV \t Ekin: {ekin:.3f} eV \t Epot {epot:.3f} eV"
dyn = VelocityVerlet(atoms, timestep=time_step * units.fs)
with trange(
max_steps,
desc=get_desc(),
leave=True,
ncols=120,
position=1,
) as pbar:
for idx in range(max_steps):
dyn.run(1)
if idx % pbar_update == 0:
etot, ekin, epot = get_energy_terms(atoms)
pbar.set_description(get_desc())
pbar.update(pbar_update)
check_results = [check.check(atoms) for check in checks]
unstable = any(check_results)
last_n_atoms.append(freeze_copy_atoms(atoms))
if unstable:
break
stable_steps = idx
return stable_steps, list(last_n_atoms)
[docs]
class MDStability(base.IPSNode):
"""Perform NVE molecular dynamics for all supplied atoms using a trained model.
Several stability checks can be supplied to judge whether a particular
trajectory is stable.
If the check fails, the trajectory is terminated.
After all trajectories are done, a histogram of the duration of stability is created.
Attributes
----------
model: A node which implements the `calc` property. Typically an MLModel instance.
data: list[Atoms] to run MD for for
max_steps: Maximum number of steps for each trajectory
time_step: MD integration time step
initial_temperature: Initial velocities are drawn from a maxwell boltzman
distribution.
save_last_n: how many configurations before the instability should be saved
bins: number of bins in the histogram
seed: seed for the MaxwellBoltzmann distribution
"""
data: list[ase.Atoms] = zntrack.deps()
model: typing.Any = zntrack.deps()
model_outs: pathlib.Path = zntrack.outs_path(zntrack.nwd / "model_outs")
max_steps: int = zntrack.params()
checks: list[zntrack.Node] = zntrack.deps(None)
time_step: float = zntrack.params(0.5)
initial_temperature: float = zntrack.params(300)
save_last_n: int = zntrack.params(1)
bins: int = zntrack.params(None)
seed: int = zntrack.params(0)
traj_file: pathlib.Path = zntrack.outs_path(zntrack.nwd / "structures.h5")
plots_dir: pathlib.Path = zntrack.outs_path(zntrack.nwd / "plots")
stable_steps_df: pd.DataFrame = zntrack.plots()
@property
def frames(self) -> typing.List[ase.Atoms]:
with self.state.fs.open(self.traj_file, "rb") as f:
with h5py.File(f) as file:
return znh5md.IO(file_handle=file)[:]
[docs]
def get_plots(self, stable_steps: int) -> None:
"""Create figures for all available data."""
if self.bins is None:
self.bins = int(np.ceil(len(stable_steps) / 100))
counts, bin_edges = np.histogram(stable_steps, self.bins)
self.plots_dir.mkdir()
label_hist = get_histogram_figure(
bin_edges,
counts,
datalabel="NVE",
xlabel="Number of stable time steps",
ylabel="Occurrences",
)
label_hist.savefig(self.plots_dir / "hist.png")
[docs]
def run(self) -> None:
self.model_outs.mkdir(parents=True, exist_ok=True)
(self.model_outs / "outs.txt").write_text("Lorem Ipsum")
data_lst = self.data
calculator = self.model.get_calculator(directory=self.model_outs)
rng = default_rng(self.seed)
stable_steps = []
db = znh5md.IO(self.traj_file)
unstable_atoms = []
for ii in tqdm.trange(
0, len(data_lst), desc="Atoms", leave=True, ncols=120, position=0
):
atoms = data_lst[ii].copy()
atoms.calc = calculator
n_steps, last_n_atoms = run_stability_nve(
atoms,
self.time_step,
self.max_steps,
self.initial_temperature,
checks=self.checks,
save_last_n=self.save_last_n,
rng=rng,
)
unstable_atoms.extend(last_n_atoms)
stable_steps.append(n_steps)
db.extend(unstable_atoms)
self.get_plots(stable_steps)
self.stable_steps_df = pd.DataFrame({"stable_steps": np.array(stable_steps)})