Source code for ipsuite.configuration_generation.gmx

import contextlib
import json
import os
import pathlib
import re
import shutil
import subprocess
import typing as t

import h5py
import MDAnalysis as mda
import yaml
import znh5md
import zntrack
from ase import Atoms
from ase.calculators.singlepoint import SinglePointCalculator
from ase.data import atomic_numbers
from MDAnalysis.coordinates.timestep import Timestep
from pint import UnitRegistry
from rdkit import Chem
from rdkit.Chem import AllChem, rdMolDescriptors

from ipsuite import base

# Initialize pint unit registry
ureg = UnitRegistry()


def dict_to_mdp(data: dict) -> str:
    """Convert a dictionary to a Gromacs .mdp file."""
    # convert all values that are lists to strings
    # e.g. for `annealing-time`
    for key, value in data.items():
        if isinstance(value, list):
            data[key] = " ".join([str(v) for v in value])

    return "\n".join([f"{key} = {value}" for key, value in data.items()])


def params_to_mdp(file: pathlib.Path, target: pathlib.Path):
    """Convert yaml/json files to mdp files."""
    if file.suffix in [".yaml", ".yml"]:
        with file.open("r") as f:
            data = yaml.safe_load(f)
            data = dict_to_mdp(data)
    elif file.suffix == ".json":
        with file.open("r") as f:
            data = json.load(f)
            data = dict_to_mdp(data)
    else:
        data = file.read_text()

    target.write_text(data)


def timestep_to_atoms(u: mda.Universe, ts: Timestep) -> t.Tuple[Atoms, float]:
    """Convert an MDAnalysis timestep to an ASE atoms object.

    Parameters
    ----------
    u : MDAnalysis.Universe
        The MDAnalysis Universe object containing topology information.
    ts : MDAnalysis.coordinates.Timestep
        The MDAnalysis Timestep object to convert.

    Returns
    -------
    ase.Atoms
        The converted ASE Atoms object.
    float
        The time of the timestep in femtoseconds.
    """
    # Set the universe's trajectory to the provided timestep
    u.trajectory.ts = ts

    # Extract positions from timestep
    positions = ts.positions

    # Extract masses and elements from the universe
    names = u.atoms.names
    cell = u.dimensions

    # Adapted from ASE gro reader
    symbols = []
    for name in names:
        if name in atomic_numbers:
            symbols.append(name)
        elif name[0] in atomic_numbers:
            symbols.append(name[0])
        elif name[-1] in atomic_numbers:
            symbols.append(name[-1])
        else:
            # not an atomic symbol
            # if we can not determine the symbol, we use
            # the dummy symbol X
            symbols.append("X")

    forces = ts.forces * ureg.kilocalories / ureg.mol / ureg.angstrom
    # convert to eV/Å
    forces.ito(ureg.eV / ureg.angstrom / ureg.particle)
    forces = forces.magnitude
    energy = 0
    with contextlib.suppress(KeyError):
        energy = ts.aux["Potential"]
        energy = energy * ureg.kilocalories / ureg.mol
        energy.ito(ureg.eV / ureg.particle)
        energy = energy.magnitude

    atoms = Atoms(symbols, positions=positions, cell=cell, pbc=True)
    atoms.calc = SinglePointCalculator(atoms, energy=energy, forces=forces)

    with contextlib.suppress(KeyError):
        atoms.info["temperature"] = ts.aux["Temperature"].item()
    with contextlib.suppress(KeyError):
        atoms.info["pressure"] = ts.aux["Pressure"].item()
    with contextlib.suppress(KeyError):
        atoms.info["density"] = ts.aux["Density"].item()

    return atoms, (ts.time * ureg.picosecond).to(ureg.femtosecond).magnitude


def smiles_to_pdb(
    smiles: str,
    file: str,
    identifier: str | None = None,
    cwd: pathlib.Path = pathlib.Path(),
) -> Chem.Mol:
    """Convert a SMILES string to a PDB file and return the RDKit molecule object."""
    m = Chem.MolFromSmiles(smiles)
    m = Chem.AddHs(m)
    AllChem.EmbedMolecule(m)
    AllChem.UFFOptimizeMolecule(m)
    pdb_block = Chem.MolToPDBBlock(m)

    if identifier is not None:
        if identifier in pdb_block:
            raise ValueError(
                f"Can not use '{identifier}' because it is present in the file."
            )
        if len(identifier) != 3:
            raise ValueError("Identifier must be 3 characters long")
        pdb_block = pdb_block.replace("UNL", identifier)

    working_directory = pathlib.Path(cwd)

    with (working_directory / file).open("w") as f:
        f.write(pdb_block)
    return m


def get_box(density: float, molecules: list[Chem.Mol], counts: list[int]) -> float:
    """
    Compute the box size for a cubic box with the desired density and given molecules.

    Args:
        density (float): Desired density in g/cm^3.
        molecules (Chem.Mol): Variable number of RDKit molecule objects.

    Returns:
        float: The side length of the cubic box in angstroms.
    """
    total_mass = (
        sum(
            rdMolDescriptors.CalcExactMolWt(mol) * count
            for mol, count in zip(molecules, counts)
        )
        * ureg.dalton
    )  # in atomic mass units (amu)
    total_mass_g = total_mass.to(ureg.gram)  # convert amu to grams
    density_g_per_cm3 = density * (ureg.gram / ureg.centimeter**3)

    volume_cm3 = total_mass_g / density_g_per_cm3  # volume in cm^3
    volume_angstrom3 = volume_cm3.to(ureg.angstrom**3)  # convert cm^3 to angstrom^3
    side_length = volume_angstrom3 ** (1 / 3)  # side length of the cubic box in angstroms

    return side_length.magnitude


def create_pack_script(
    files: list[str],
    counts: list[int],
    box_size: float,
    tolerance: float,
    cwd: pathlib.Path = pathlib.Path(),
):
    """
    Create a PACKMOL input script to pack molecules into a box.

    Attributes
    ----------
    files: list[str]
        List of file names for the molecules to pack.
    counts: list[int]
        Number of each molecule to pack.
    box_size: float
        Side length of the cubic box in angstroms.
    """
    if len(files) != len(counts):
        raise ValueError("The number of files must match the number of counts")

    box_size -= tolerance

    script = f"""tolerance {tolerance}
output box.pdb
filetype pdb
"""

    for file, count in zip(files, counts):
        structure_block = f"""structure {file}
  number {count}
  inside box 0. 0. 0. {box_size} {box_size} {box_size}
end structure
"""
        script += structure_block

    with (cwd / "packmol.inp").open("w") as f:
        f.write(script)


def extract_atomtypes(input_file: pathlib.Path, output_file: pathlib.Path):
    content = input_file.read_text()

    # Regular expression to match the [ atomtypes ] section
    atomtypes_regex = re.compile(r"(\[ atomtypes \].*?)(?=\n\[|\Z)", re.DOTALL)

    # Find the atomtypes section
    atomtypes_section = atomtypes_regex.search(content)

    if atomtypes_section:
        atomtypes_section_text = atomtypes_section.group(0)

        # Write the atomtypes section to the output file
        output_file.write_text(atomtypes_section_text)

        # Remove the atomtypes section from the original content
        modified_content = atomtypes_regex.sub("", content)

        # Write the modified content back to the input file
        input_file.write_text(modified_content)


def combine_atomtype_files(files: list[pathlib.Path], output_file: pathlib.Path):
    """Read all the files and write a single output file. Removes duplicates."""
    header = []
    atomtypes = []
    for file in files:
        with file.open("r") as f:
            lines = f.readlines()
            for idx, line in enumerate(lines):
                if idx in [0, 1]:
                    if len(header) < 2:
                        header.append(line)
                else:
                    atomtypes.append(line)

    atomtypes = list(set(atomtypes))
    with output_file.open("w") as f:
        f.writelines(header)
        f.writelines(atomtypes)


def validate_mdp(path):
    necessary_keys = ["nstxout", "nstfout"]
    path = pathlib.Path(path)
    with path.open("r") as f:
        content = f.read()
        for key in necessary_keys:
            if key not in content:
                raise ValueError(
                    f"Key '{key}' is required in {path.name} for writing a trajectory"
                )


[docs] class Smiles2Gromacs(base.IPSNode): """Gromacs Node. Attributes ---------- smiles: list[str] List of SMILES strings for the molecules to be packed. count: list[int] Number of each molecule to pack. labels: list[str] List of 3-letter labels for each molecule. density: float Density of the packed box in g/cm^3. mdp_files: list[str | pathlib.Path] List of paths to the Gromacs MDP files. config_files: list[str | pathlib.Path] Same like mdp_files but in the json or yaml format. These will run BEFORE the MDP files. itp_files: list[str | None]|None if given, for each label either the path to the ITP file or None. The order must match the order of the labels. pdb_files: list[str | pathlib.Path]|None if given, for each label either the path to the PDB file or None. The order must match the order of the labels. production_indices: list[int]|None The gromacs runs that should be stored in the trajectory file. If None, the last run is stored. The order is always the same as the order of the MDP files. Installation ------------ To install the required software, run the following commands: .. code-block:: bash conda install conda-forge::gromacs conda install conda-forge::acpype pip install MDAnalysis pyedr """ smiles: list[str] = zntrack.params() count: list[int] = zntrack.params() labels: list[str] = zntrack.params() density: float = zntrack.params() fudgeLJ: float = zntrack.params(1.0) fudgeQQ: float = zntrack.params(1.0) tolerance: float = zntrack.params(2.0) production_indices: list[int] = zntrack.params(None) cleanup: bool = zntrack.params(True) maxwarn: int = zntrack.params(0) mdp_files: t.Sequence[str | pathlib.Path] = zntrack.deps_path(default_factory=list) config_files: t.Sequence[str | pathlib.Path] = zntrack.params_path( default_factory=list ) itp_files: t.Sequence[str | pathlib.Path | None] = zntrack.deps_path( default_factory=list ) pdb_files: t.Sequence[str | pathlib.Path | None] = zntrack.deps_path( default_factory=list ) traj_file: list[Atoms] = zntrack.outs_path(zntrack.nwd / "structures.h5") output_dir: pathlib.Path = zntrack.outs_path(zntrack.nwd / "gromacs") def __post_init__(self): if len(self.smiles) != len(self.count): raise ValueError("The number of smiles must match the number of counts") if len(self.smiles) != len(self.labels): raise ValueError("The number of smiles must match the number of labels") if self.production_indices is None: self.production_indices = [len(self.mdp_files) + len(self.config_files) - 1] if isinstance(self.output_dir, str): self.output_dir = pathlib.Path(self.output_dir) self.mdp_files = [pathlib.Path(mdp_file) for mdp_file in self.mdp_files] self.config_files = [ pathlib.Path(config_file) for config_file in self.config_files ] # check that the file name without suffix is unique between all files names = [file.stem for file in self.mdp_files + self.config_files] if len(names) != len(set(names)): raise ValueError("The file names must be unique") @property def frames(self) -> list[Atoms]: with self.state.fs.open(self.traj_file, "rb") as f: with h5py.File(f) as file: return znh5md.IO(file_handle=file)[:] def _run_acpype(self): for idx, (label, charge) in enumerate(zip(self.labels, self.charges)): if len(self.itp_files) and self.itp_files[idx] is not None: path = self.output_dir / f"{label}.acpype" path.mkdir(exist_ok=True) shutil.copy(self.itp_files[idx], path / f"{label}_GMX.itp") else: cmd = ["acpype", "-i", f"{label}.pdb", "-n", str(charge), "-b", label] subprocess.run(cmd, check=True, cwd=self.output_dir) def _create_box_gro(self): cmd = [ "echo", "0", "|", "gmx", "editconf", "-f", self.box, "-o", "box.gro", "-box", str((self.box_size * ureg.angstrom).to(ureg.nanometer).magnitude), ] subprocess.run(" ".join(cmd), shell=True, check=True, cwd=self.output_dir) def _create_species_top_atomtypes(self): for idx, label in enumerate(self.labels): if len(self.itp_files) and self.itp_files[idx] is not None: file = self.itp_files[idx] else: file = self.output_dir / f"{label}.acpype/{label}_GMX.itp" shutil.copy(file, self.output_dir / f"{label}.itp") # shutil.copy(file.with_suffix(".top"), self.output_dir / f"{label}.top") extract_atomtypes( self.output_dir / f"{label}.itp", self.output_dir / f"{label}_atomtypes.itp", ) combine_atomtype_files( [self.output_dir / f"{label}_atomtypes.itp" for label in self.labels], self.output_dir / "atomtypes.itp", ) def _create_box_top(self): with (self.output_dir / "box.top").open("w") as f: f.write("[ defaults ]") f.write("\n") f.write("; nbfunc comb-rule gen-pairs fudgeLJ fudgeQQ\n") f.write( f"1 2 yes {self.fudgeLJ} " f" {self.fudgeQQ}\n" ) f.write("\n") f.write("; Include atomtypes\n") f.write('#include "atomtypes.itp"\n') f.write("\n") f.write("; Include topology\n") for label in self.labels: f.write(f'#include "{label}.itp"\n') f.write("\n") f.write("[ system ]\n") f.write(" GMX\n") f.write("\n") f.write("[ molecules ]\n") for label, count in zip(self.labels, self.count): f.write(f"{label} {count}\n") def _run_gmx(self): for file in self.config_files + self.mdp_files: params_to_mdp(file, self.output_dir / file.with_suffix(".mdp").name) for mdp_file in self.config_files + self.mdp_files: cmd = [ "gmx", "grompp", "-f", mdp_file.with_suffix(".mdp").name, "-c", "box.gro", "-p", "box.top", "-o", "box.tpr", "-v", "-maxwarn", str(self.maxwarn), ] print(f"Running {' '.join(cmd)}") subprocess.run(cmd, check=True, cwd=self.output_dir) NTMPI = os.environ.get("IPSUITE_GMX_NTMPI", "1") cmd = ["gmx", "mdrun", "-ntmpi", NTMPI, "-v", "-deffnm", "box"] subprocess.run(cmd, check=True, cwd=self.output_dir) def _pack_box(self): mols = [] charges = [] for idx, (smiles, label) in enumerate(zip(self.smiles, self.labels)): if len(self.pdb_files) and self.pdb_files[idx] is not None: shutil.copy(self.pdb_files[idx], self.output_dir / f"{label}.pdb") m = Chem.MolFromSmiles(smiles) m = Chem.AddHs(m) AllChem.EmbedMolecule(m) AllChem.UFFOptimizeMolecule(m) mols.append(m) else: mols.append( smiles_to_pdb(smiles, f"{label}.pdb", label, cwd=self.output_dir) ) # get the charge of the molecule charges.append(Chem.GetFormalCharge(mols[-1])) self.charges = charges self.box_size = get_box(self.density, mols, self.count) create_pack_script( [f"{label}.pdb" for label in self.labels], self.count, self.box_size, self.tolerance, cwd=self.output_dir, ) cmd = ["packmol < packmol.inp"] subprocess.run(cmd, check=True, shell=True, cwd=self.output_dir) self.box = "box.pdb" def _convert_trajectory(self): io = znh5md.IO(self.traj_file, store="time") for idx in sorted(self.production_indices): if idx == len(self.mdp_files) + len(self.config_files) - 1: gro = self.output_dir / "box.gro" trr = self.output_dir / "box.trr" edr = self.output_dir / "box.edr" else: gro = self.output_dir / f"#box.gro.{idx + 1}#" trr = self.output_dir / f"#box.trr.{idx + 1}#" edr = self.output_dir / f"#box.edr.{idx + 1}#" u = mda.Universe(gro, trr, topology_format="GRO", format="TRR") aux = mda.auxiliary.EDR.EDRReader(edr) u.trajectory.add_auxiliary(auxdata=aux) atoms_list = [] timesteps = [] for ts in u.trajectory: atoms, timestep = timestep_to_atoms(u, ts) atoms_list.append(atoms) timesteps.append(timestep) if len(timesteps) > 1: io.timestep = timesteps[-1] - timesteps[-2] # Assuming constant timestep else: io.timestep = 1 io.extend(atoms_list)
[docs] def run(self): self.output_dir.mkdir(exist_ok=True, parents=True) files = self.config_files + self.mdp_files validate_mdp(files[-1]) self._pack_box() self._create_box_gro() self._run_acpype() self._create_species_top_atomtypes() self._create_box_top() self._run_gmx() self._convert_trajectory() if self.cleanup: paths = list(self.output_dir.iterdir()) with (self.output_dir / "info.txt").open("w") as f: f.write("The following data has been removed:\n") f.write("\n".join([file.name for file in paths])) for path in paths: if path.is_file(): path.unlink() else: shutil.rmtree(path)