Source code for qepsilon.system.particles

"""
This file contains the particle class for the QEpsilon project.
"""

import torch as th
import numpy as np
import warnings
from qepsilon.utilities import Constants, Constants_Metal 

class OpticalTweezer:
    def __init__(self, min_waist, wavelength, max_depth, center: th.Tensor, axis: th.Tensor=th.tensor([0.0, 0.0, 1.0]), on = True):
        """
        Initialize the optical tweezer.
        Args:
            min_waist: float, the minimum waist of the tweezer
            wavelength: float, the wavelength of the tweezer
            max_depth: float, the maximum depth of the tweezer
            center: th.Tensor, shape (3), the center of the tweezer
            axis: th.Tensor, shape (3), the axis of the tweezer
        """
        self.min_waist = min_waist
        self.wavelength = wavelength
        self.max_depth = max_depth
        self.zR = np.pi * self.min_waist**2 / self.wavelength
        self.center = center
        self.axis = axis / th.norm(axis)  ## normalize the axis
        self.on = on
        self.pot_engine = None

    def get_pot(self, positions: th.Tensor):
        """
        Get the potential at a given position.
        Args:
            positions: th.Tensor, shape (nbatch, n_particles, 3), the positions of the particles
        Returns:
            the potential at the given position, shape (nbatch)
        """
        R = positions - self.center[None, None,:]
        z = (R * self.axis).sum(dim=-1) # (nbatch, n_particles)
        r = R - z[:,:, None] * self.axis[None, None,:]
        r = th.norm(r, dim=-1) # (nbatch, n_particles)
        wz = self.min_waist * th.sqrt(1 + (z / self.zR)**2)
        pot = - self.max_depth * (self.min_waist / wz)**2 * th.exp(-2 * r**2 / wz**2)
        return pot
    
    def init_pot_engine(self, example_input: th.Tensor):
        center = self.center
        axis = self.axis
        min_waist = self.min_waist
        max_depth = self.max_depth
        zR = self.zR
        def pot_engine(positions: th.Tensor):
            """
            Get the potential at a given position.
            """
            R = positions - center[None, None,:]
            z = (R * axis).sum(dim=-1) # (nbatch, n_particles)
            r = R - z[:,:, None] * axis[None, None,:]
            r = th.norm(r, dim=-1) # (nbatch, n_particles)
            wz = min_waist * th.sqrt(1 + (z / zR)**2)
            pot = - max_depth * (min_waist / wz)**2 * th.exp(-2 * r**2 / wz**2)
            pot = pot.sum()
            return pot
        self.pot_engine = th.jit.trace(pot_engine, example_input)
        return
    
    def get_force(self, positions: th.Tensor):
        """
        Get the force at a given position.
        """
        input = positions.clone()
        input.requires_grad = True
        pot = self.pot_engine(input)
        force = - th.autograd.grad(pot, input)[0]
        return force

    def get_info(self):
        return {
            "on": self.on,
            "center": self.center,
            "axis": self.axis,
            "min_waist": self.min_waist,
            "wavelength": self.wavelength,
            "max_depth": self.max_depth,
        }


[docs] class Particles(th.nn.Module): """ This class represents particles in the QEpsilon project. """
[docs] def __init__(self, n_particles: int, batchsize: int = 1, ndim: int = 3, mass: float or th.Tensor = 1.0, dt: float = 0.1, tau: float = None, unit: str = 'pm_ps'): super().__init__() self.nq = n_particles self.nb = batchsize self.ndim = ndim self.positions = th.zeros(batchsize, n_particles, ndim) self.velocities = th.zeros(batchsize, n_particles, ndim) self.forces = th.zeros(batchsize, n_particles, ndim) if isinstance(mass, float): self.masses = th.ones(n_particles) * mass elif isinstance(mass, th.Tensor): if mass.shape != (n_particles,): raise ValueError(f"Mass must have shape ({n_particles,})") else: self.masses = mass.to(dtype=self.positions.dtype, device=self.positions.device) else: raise ValueError(f"Mass must be a float or a tensor.") ## unit system if unit == 'um_us': self.unit = Constants elif unit == 'pm_ps': self.unit = Constants_Metal else: raise ValueError(f"Unit must be 'um_us' or 'pm_ps'") self.dt = dt ## parameters for Langevin dynamics if tau is None: self.tau = 100 * dt else: self.tau = tau self.gamma = 1.0 / self.tau ## this gamma is the damping coefficient in dv/dt = -gamma v + noise ## history of positions self.traj = []
########################################################################### # Methods for dealing with particles ###########################################################################
[docs] def get_positions(self): return self.positions.clone().detach()
[docs] def get_velocities(self): return self.velocities.clone().detach()
[docs] def get_temperature(self): temp = (self.get_velocities()**2) * self.masses[None,:,None] / self.unit.kb temp = temp.mean(1) return temp
[docs] def get_forces(self): return self.forces.clone().detach()
[docs] def get_trajectory(self): return th.stack(self.traj)
[docs] def set_positions(self, positions: th.Tensor): if positions.shape == (self.nb, self.nq, self.ndim): self.positions = positions elif positions.shape == (self.nq, self.ndim): self.positions = positions.repeat(self.nb,1,1) else: raise ValueError(f"Positions must have shape ({self.nb}, {self.nq}, {self.ndim}) or ({self.nq}, {self.ndim})")
[docs] def set_gaussian_velocities(self, temp: float = None): self.velocities = th.randn_like(self.positions) temp = th.ones(self.ndim, dtype=self.positions.dtype, device=self.positions.device) * temp self.velocities *= th.sqrt(self.unit.kb * temp)[None,None,:] self.velocities *= th.sqrt(1 / self.masses[None,:,None])
[docs] def set_velocities(self, velocities: th.Tensor): if velocities.shape != (self.nb, self.nq, self.ndim): raise ValueError(f"Velocities must have shape ({self.nb}, {self.nq}, {self.ndim})") self.velocities = velocities
[docs] def zero_forces(self): self.forces = th.zeros_like(self.positions).detach()
[docs] def modify_forces(self, df: th.Tensor): """ Modify the forces on the particles. """ self.forces += df
[docs] def modify_forces_by_harmonic_trap(self, omega: float or th.Tensor, x0: th.Tensor = None): """ Modify the forces on the particles by a harmonic trap. Args: omega: float or th.Tensor, the frequency of the harmonic trap. Shape: (self.ndim) or (self.nq,self.ndim) x0: th.Tensor, the position of the harmonic trap. Shape: (self.ndim) or (self.nq,self.ndim) """ if isinstance(omega, float): _omega = th.ones(self.ndim, dtype=self.masses.dtype, device=self.masses.device) * omega _omega = _omega.reshape(1,1,self.ndim) elif isinstance(omega, th.Tensor): if omega.shape == (self.ndim,): _omega = omega.reshape(1,1,self.ndim) elif omega.shape == (self.nq, self.ndim): _omega = omega.reshape(1,self.nq,self.ndim) else: raise ValueError(f"omega must have shape ({self.ndim,}) or ({self.nq}, {self.ndim})") else: raise ValueError(f"omega must be a float or a tensor of shape ({self.ndim,}) or ({self.nq}, {self.ndim})") if x0 is None: eq_pos = th.zeros(self.ndim, dtype=self.positions.dtype, device=self.positions.device).reshape(1,1,self.ndim) elif isinstance(x0, th.Tensor): if x0.shape == (self.ndim,): eq_pos = x0.reshape(1,1,self.ndim).to(dtype=self.positions.dtype, device=self.positions.device) elif x0.shape == (self.nq, self.ndim): eq_pos = x0.reshape(1,self.nq,self.ndim).to(dtype=self.positions.dtype, device=self.positions.device) else: raise ValueError(f"x0 must have shape ({self.ndim,}) or ({self.nq}, {self.ndim})") else: raise ValueError(f"The equilibrium position x0 must be a tensor") self.forces += - self.masses[None,:,None] * _omega**2 * (self.positions - eq_pos) return
[docs] def reset(self, temp: float = None): """ Reset the particles to a thermal equilibrium state. """ self.set_gaussian_velocities(temp=temp) self.traj = []
########################################################################### # Methods for simulating dynamics ###########################################################################
[docs] def get_noise(self, temp = None): noise = th.randn_like(self.positions) _temp = th.ones(self.ndim, dtype=self.positions.dtype, device=self.positions.device) * temp noise *= th.sqrt(self.unit.kb * _temp)[None,None,:] noise *= th.sqrt(1 / self.masses[None,:,None]) return noise
[docs] def step_langevin(self, record_traj=False, temp = None): """ Isothermal Langevin dynamics. Update the positions and velocities by one time step with mid-point Langevin method. """ dt = self.dt ## parameters for Langevin dynamics z1 = np.exp(-dt * self.gamma) z2 = np.sqrt(1 - np.exp(-2 * dt * self.gamma)) ## initial values v0 = self.get_velocities() x0 = self.get_positions() ## update velocities for one step with gradient force v1 = v0 + self.forces * dt / self.masses[None,:,None] ## update positions for half step x1 = x0 + 0.5 * v1 * dt ## modify velocity with damping and random force noise = self.get_noise(temp) v1 = z1 * v1 + z2 * noise ## update positions for half step x1 += 0.5 * v1 * dt ## wrap up self.set_positions(x1) self.set_velocities(v1) if record_traj: self.traj.append(x1.clone().detach()) return
[docs] def step_adiabatic(self, record_traj=False): """ Adiabatic dynamics. Update the positions and velocities by one time step with leapfrog method. """ dt = self.dt ## initial values v0 = self.get_velocities() x0 = self.get_positions() ## update velocities for one step with gradient force v1 = v0 + self.forces * dt / self.masses[None,:,None] ## update positions for half step x1 = x0 + v1 * dt ## wrap up self.set_positions(x1) self.set_velocities(v1) if record_traj: self.traj.append(x1.clone().detach()) return
[docs] class ParticlesInTweezers(Particles): """ This class represents particles in the QEpsilon project. """
[docs] def __init__(self, n_particles: int, batchsize: int = 1, mass: float = 1.0, radial_temp: float = 1.0, axial_temp: float = 1.0, dt: float = 0.1, tau: float = None, unit: str = 'um_us'): ndim = 3 super().__init__(n_particles, batchsize, ndim, mass, dt, tau, unit) self.radial_temp = radial_temp self.axial_temp = axial_temp self._traps_dict = {} self.traj = []
########################################################################### # Methods for dealing with optical tweezers ###########################################################################
[docs] def init_tweezers(self, id, min_waist, wavelength, max_depth, center, axis, on=True): """ Initialize a tweezer. Args: id: int, the id of the tweezer min_waist: float, the minimum waist of the tweezer wavelength: float, the wavelength of the tweezer max_depth: float, the maximum depth of the tweezer center: th.Tensor, shape (3), the center of the tweezer axis: th.Tensor, shape (3), the axis of the tweezer """ ## sanitary check if id in self._traps_dict: raise ValueError(f"Tweezer with id {id} already exists") self._traps_dict[id] = OpticalTweezer(min_waist, wavelength, max_depth, center, axis, on)
[docs] def get_tweezer_by_id(self, id): return self._traps_dict[id]
[docs] def remove_tweezer(self, id): del self._traps_dict[id]
[docs] def turn_on_tweezer(self, id): self._traps_dict[id].on = True
[docs] def turn_off_tweezer(self, id): self._traps_dict[id].on = False
[docs] def get_trapping_info(self): infos = {} for id, trap in self._traps_dict.items(): infos[id] = trap.get_info() return infos
########################################################################### # Methods for dealing with particles ###########################################################################
[docs] def set_positions_at_tweezer_center(self): self.positions = th.zeros_like(self.positions) if len(self._traps_dict) != self.nq: raise ValueError(f"Number of tweezers ({len(self._traps_dict)}) must be equal to the number of particles ({self.nq}) if setting positions at tweezer centers") for idx, trap in enumerate(self._traps_dict.values()): self.positions[:, idx, :] += trap.center.to(dtype=self.positions.dtype, device=self.positions.device)
[docs] def set_gaussian_velocities(self, radial_temp: float = None, axial_temp: float = None): if radial_temp is None: radial_temp = self.radial_temp if axial_temp is None: axial_temp = self.axial_temp self.velocities = th.randn_like(self.positions) temp = th.tensor([self.radial_temp, self.radial_temp, self.axial_temp], dtype=self.positions.dtype, device=self.positions.device) self.velocities *= th.sqrt(self.unit.kb * temp)[None,None,:] self.velocities *= th.sqrt(1 / self.masses[None,:,None])
[docs] def get_trapping_forces(self): """ Get the trapping forces on the particles. """ forces = th.zeros_like(self.positions.detach()) for trap in self._traps_dict.values(): if trap.pot_engine is None: trap.init_pot_engine(self.positions) if trap.on: forces += trap.get_force(self.positions) return forces
[docs] def reset(self): """ Reset the particles to a thermal equilibrium state. """ self.set_positions_at_tweezer_center() self.set_gaussian_velocities() self.equilibrate(nsteps=200, tau=100*self.dt) self.traj = []
########################################################################### # Methods for simulating dynamics ###########################################################################
[docs] def get_noise(self, temp=None): if temp is not None: raise ValueError("get_noise in ParticlesInTweezers does not support custom temperature. The temperature is set by the radial and axial temperatures.") noise = th.randn_like(self.positions) temp = th.tensor([self.radial_temp, self.radial_temp, self.axial_temp], dtype=self.positions.dtype, device=self.positions.device) noise *= th.sqrt(self.unit.kb * temp)[None,None,:] noise *= th.sqrt(1 / self.masses[None,:,None]) return noise
[docs] def equilibrate(self, nsteps: int, tau: float = 1): """ Equilibrate the particles with Langevin dynamics. """ dt = self.dt system_damping = self.gamma * 1.0 self.gamma = 1 / tau for _ in range(nsteps): self.zero_forces() self.modify_forces(self.get_trapping_forces()) self.step_langevin(record_traj=False) self.gamma = system_damping return
class PathIntegralParticles(Particles): def __init__(self, n_particles: int, batchsize: int = 1, mass: float = 1.0, temperature: float = 1.0, unit: str = 'pm_ps'): ndim = 3 super().__init__(n_particles, batchsize, ndim, mass, unit=unit) self.temperature = temperature self.spring_constant = None raise NotImplementedError("Path integral particles are not implemented yet")