'''
The UnbiasedSimulation class for running MD simulations.
'''
import os
from sys import stdout
from pathlib import Path
from mdtraj.reporters import XTCReporter
from .reporter import CVReporter, MinimizationReporter
from . import Charmm36mFF
import openmm
import openmm.app as app
from openmm.unit import angstroms, picoseconds, kelvin, bar
from openmm.unit import Quantity, is_quantity
[docs]
class UnbiasedSimulation():
'''
UnbiasedSimulation class for running MD. An example use is:
.. code-block:: python
import af2rave.simulation as af2sim
sim = af2sim.UnbiasedSimulation(<arguments>)
sim.run(steps)
These are the parameters the initialization function accepts:
:param pdb_file: Path to OpenMM.app.pdbfile.PDBFile object
:type pdb_file: str
:param list_of_index: List of indexes to calculate the CVs
:type list_of_index: list[tuple[int, int]]
:param forcefield: OpenMM.app.ForceField object. Default: Charmm36mFF
:type forcefield: OpenMM.app.ForceField
:param temp: Temperature of the system. Default: 310 K
:type temp: float | openmm.unit.Quantity[unit=kelvin]
:param pressure: Pressure of the system. Default: 1 Bar.
Can be set to none to disable pressure coupling (NVT).
:type pressure: float | openmm.unit.Quantity[unit=bar]
:param dt: Time step of the integrator. Default: 0.002 ps
:type dt: float | openmm.unit.Quantity[unit=picoseconds]
:param cutoff: Nonbonded cutoff. Default: 10.0 Angstroms
:type cutoff: float | openmm.unit.Quantity[unit=angstroms]
:param steps: Simulation steps. Default: 50 million steps (100 ns)
:type steps: int
:param cv_file: File to write CVs to. Default: COLVAR.dat
:type cv_file: str
:param cv_freq: Frequency of CVs written to the file. Default: 50
:type cv_freq: int
:param xtc_file: Trajectory file name. Default: traj.xtc
:type xtc_file: str
:param xtc_freq: Frequency writing trajectory in steps. Default: 1000
:type xtc_freq: int
:param xtc_reporter: XTCReporter object.
This overrides traj_filename and traj_freq. Default: None
:type xtc_reporter: mdtraj.reporters.XTCReporter
:param custom_forces: List of custom forces to be added to the system.
Default: None
:param append: Appends to existing file. Default: False
:type append: bool
:param progress_every: Progress report to stdout frequency. Default: 1000
Can be set to None to suppress this.
:type progress_every: int
'''
def __init__(self, pdb_file, **kwargs):
self._prefix = f"{Path(pdb_file).stem}"
pdb_file = app.pdbfile.PDBFile(pdb_file)
self._pos = pdb_file.positions
self._top = pdb_file.topology
self._pressure = self._get_pressure(**kwargs)
self._temp = self._get_temperature(**kwargs)
self._forcefield = kwargs.get('forcefield', Charmm36mFF)
self._platform = self._choose_platform()
self.simulation = self._initialize_simulation(**kwargs)
self._append = kwargs.get('append', False)
# Reporters
self._progress_every = kwargs.get('progress_every', 1000)
self._add_reporter(self._get_cv_reporter(**kwargs))
self._add_reporter(self._get_xtc_reporter(**kwargs))
# pressure coupling if needed
if self._pressure is not None:
self.simulation.context.getSystem().addForce(
openmm.MonteCarloBarostat(self._pressure, self._temp)
)
self.simulation.context.reinitialize()
@property
def pos(self) -> list[openmm.Vec3]:
'''
Get the positions of the simulation.
:return: List of positions
:rtype: list[openmm.Vec3]
'''
self._pos = self.simulation.context.getState(getPositions=True).getPositions()
return self._pos
[docs]
def run(self, steps: int = 50000000) -> app.Simulation:
'''
Run the simulation from given pdb file. Default: 50 million steps (100 ns).
:param steps: Number of steps to run the simulation.
Default: 50 million steps (100 ns)
:type steps: int
'''
# we have to do this at this later stage, because by design we
# do not know the number of steps at the time of initialization
self._add_reporter(self._get_thermo_reporter(steps))
self.simulation.context.setPositions(self._pos)
self.simulation.minimizeEnergy(
maxIterations=500, reporter=MinimizationReporter(500)
)
self.save_pdb(self._prefix + "_minimized.pdb")
self.simulation.step(steps)
return self.simulation
[docs]
def restart(self, steps: int = 50000000, restart_file: str = None) -> app.Simulation:
'''
Run the simulation from given pdb file. Default: 50 million steps (100 ns).
:param steps: Number of steps to run the simulation.
Default: 50 million steps (100 ns)
:type steps: int
:param restart_file: Name of the checkpoint file. Default: {prefix}.chk
:type restart_file: str
'''
self._load_checkpoint(restart_file)
self._add_reporter(self._get_thermo_reporter(steps))
self.simulation.step(steps)
return self.simulation
[docs]
def save_checkpoint(self, filename: str = None) -> None:
'''
Save the checkpoint of the simulation.
:param filename: Name of the checkpoint file. Default: `{prefix}_out.chk`
:type filename: str
'''
if filename is None:
filename = f"{self._prefix}_out.chk"
self.simulation.saveCheckpoint(filename)
[docs]
def save_pdb(self, filename: str = None) -> None:
'''
Save the final state PDB of the simulation.
:param filename: Name of the pdb file. Default: `{prefix}_out.pdb`
:type filename: str
'''
if filename is None:
filename = f"{self._prefix}_out.pdb"
with open(filename, 'w') as ofs:
# Note that this pos has no underscore, so it will call the property
# and get the latest positions from the simulation context
app.PDBFile.writeFile(self._top, self.pos, ofs, keepIds=True)
def _load_checkpoint(self, restart_file: str) -> None:
'''
Load the checkpoint of the simulation.
:param filename: Name of the checkpoint file.
:type filename: str
'''
if restart_file is None:
restart_file = self._prefix + ".chk"
print(f"No restart file provided. Attempting from default {restart_file}.")
if not os.path.exists(restart_file):
raise FileNotFoundError("Checkpoint file does not exist")
if restart_file.endswith(".chk"):
self.simulation.loadCheckpoint(restart_file)
elif restart_file.endswith(".pdb"):
pdb = app.PDBFile(restart_file)
self.simulation.context.setPositions(pdb.positions)
def _initialize_simulation(self, **kwargs) -> app.Simulation:
'''
Create the integrator for the system using LangevinMiddleIntegrator.
Returns the OpenMM simulation object.
:param dt: Time step of the simulation. Default: 0.002, in ps
:type dt: openmm.unit.
:param cutoff: Nonbonded cutoff. Default: 10.0, in Angstroms
:type cutoff: float
'''
dt = kwargs.get('dt', 0.002 * picoseconds)
if not is_quantity(dt):
dt = dt * picoseconds
cutoff = kwargs.get('cutoff', 10.0 * angstroms)
if not is_quantity(cutoff):
cutoff = cutoff * angstroms
system = self._forcefield.createSystem(
self._top,
nonbondedMethod=app.PME,
nonbondedCutoff=cutoff,
constraints=app.HBonds
)
custom_forces = kwargs.get('custom_forces', None)
if custom_forces is not None:
if not isinstance(custom_forces, list):
custom_forces = [custom_forces]
for force in custom_forces:
system.addForce(force)
integrator = openmm.LangevinMiddleIntegrator(self._temp, 1 / picoseconds, dt)
simulation = app.Simulation(self._top, system, integrator, self._platform)
return simulation
def _choose_platform(self) -> openmm.Platform:
'''
Choose the platform for the simulation.
1. CUDA 2. OpenCL 3. CPU
:return: OpenMM platform
:rtype: OpenMM.Platform
:raises: RuntimeError if no suitable platform is found.
'''
# enumerate all platforms with openmm
n_platforms = openmm.Platform.getNumPlatforms()
platforms = [openmm.Platform.getPlatform(i) for i in range(n_platforms)]
platform_names = [platform.getName() for platform in platforms]
# We will use platforms in the following order
selection = ["CUDA", "OpenCL", "CPU"]
for plt in selection:
if plt in platform_names:
print(f"Using {plt} platform.")
return platforms[platform_names.index(plt)]
# if the code reaches here something is wrong
raise RuntimeError("No suitable platform found. Attempted: CUDA, OpenCL, CPU")
def _get_thermo_reporter(self, steps: int) -> None | app.StateDataReporter:
'''
Initialize the state reporters for the simulation.
'''
if self._progress_every is None:
return None
rep = app.StateDataReporter(stdout,
self._progress_every, step=True,
potentialEnergy=True, temperature=True, volume=True,
progress=True, remainingTime=True,
totalSteps=steps, elapsedTime=True,
speed=True, separator="\t", append=self._append
)
return rep
def _get_cv_reporter(self, **kwargs) -> CVReporter | None:
'''
Get the CV reporter for the simulation.
:return: CVReporter object or None
:rtype: CVReporter | None
'''
if "cv_reporter" in kwargs:
return kwargs["cv_reporter"]
list_of_index = kwargs.get('list_of_index', None)
if list_of_index is not None:
cv_file = kwargs.get('cv_file', self._prefix + "_colvar.dat")
cv_freq = kwargs.get('cv_freq', 50)
append = kwargs.get('append', False)
return CVReporter(cv_file, cv_freq, list_of_index, append)
print("No atom indices provided. Will not output CV timeseries.")
return None
def _get_xtc_reporter(self, **kwargs) -> XTCReporter | None:
'''
Get the CV reporter for the simulation.
:return: CVReporter object or None
:rtype: CVReporter | None
'''
if "xtc_reporter" in kwargs:
return kwargs["xtc_reporter"]
else:
xtc_file = kwargs.get('xtc_file', self._prefix + ".xtc")
xtc_freq = kwargs.get('xtc_freq', 1000)
# We will allow the user to suppress trajectory writing by setting
# either traj_filename or traj_freq to None
if (xtc_file is None) or (xtc_freq is None):
return None
return XTCReporter(xtc_file, xtc_freq, append=self._append)
def _get_pressure(self, **kwargs) -> Quantity | None:
'''
Get the pressure for the simulation.
:return: Pressure of the system
:rtype: openmm.unit.Quantity[unit=bar]
'''
pressure = kwargs.get('pressure', 1.0 * bar)
if pressure is None:
return None
if not is_quantity(pressure):
return pressure * bar
return pressure
def _get_temperature(self, **kwargs) -> Quantity:
'''
Get the temperature for the simulation.
:return: Temperature of the system
:rtype: openmm.unit.Quantity[unit=kelvin]
'''
temp = kwargs.get('temp', 310.0 * kelvin)
if temp is None:
raise ValueError("Temperature cannot be set to None.")
if not is_quantity(temp):
return temp * kelvin
return temp
def _add_reporter(self, reporter) -> None:
'''
Add a reporter to the simulation. Allows a NoneType reporter which is ignored.
:param reporter: Reporter object
:type reporter: openmm.app.StateDataReporter | CVReporter | XTCReporter
'''
if reporter is None:
pass
else:
self.simulation.reporters.append(reporter)