Source code for qepsilon.simulation.lindblad_system

"""
This file contains the Lindblad simulation class for the QEpsilon project.
"""

import numpy as np
import torch as th
from qepsilon.operator_group import *
from qepsilon.system.density_matrix import DensityMatrix, QubitDensityMatrix
from qepsilon.system.particles import Particles
from qepsilon.utilities import ABAd, trace
import logging
import warnings
from time import time as timer

[docs] class LindbladSystem(th.nn.Module): r""" This class represents a generic open quantum system. The states are represented by a density matrix. The evolution of the system is governed by the Lindblad equation, which is a generalization of the Schrodinger equation for open quantum systems. Both the Hamiltonian and the jump operators here allows fluctuating coefficients. So technically, we are dealing with a stochastic master equation. The Lindblad equation is given as :math:`d\rho(t) / dt = -i [H(t), \rho(t)] + \sum_k \gamma_k L_k(t) \rho(t) L_k(t)^\dagger - 1/2 \{L_k(t)^\dagger L_k(t), \rho(t)\}` where :math:`H(t)` is the Hamiltonian, :math:`L_k(t)` is the jump operator. Note that the coefficients of the jump operators are absorbed into :math:`L_k(t)`. In this class, each component of the Hamiltonian :math:`H(t)`, and each jump operator :math:`L_k(t)`, is represented by a OperatorGroup object. Each OperatorGroup object contains a group of operators and the corresponding coefficients. The operators themselves are time-independent, like a Pauli operator. But the coefficients can be time-dependent stochastic processes. This corresponds to the case where the qubits are coupled to noisy environments such as inhomogeneous, fluctuating magnetic fields. """
[docs] def __init__(self, num_states: int, batchsize: int = 1): super().__init__() self.ns = num_states self.nb = batchsize self.density_matrix = DensityMatrix(num_states, batchsize) self._hamiltonian_operator_group_dict = th.nn.ModuleDict() self._jumping_group_dict = th.nn.ModuleDict() self._channel_group_dict = th.nn.ModuleDict() self._ham = None self._jump = None self._ham_eff = None
[docs] def to(self, device='cuda'): """ This overrides the ``to`` method of PyTorch Module. It is used to move all relevant components of the system to a specific device. """ self.density_matrix.to(device=device) for operator_group in self._hamiltonian_operator_group_dict.values(): operator_group.to(device=device) for operator_group in self._jumping_group_dict.values(): operator_group.to(device=device) for operator_group in self._channel_group_dict.values(): operator_group.to(device=device) if self._ham is not None: self._ham = self._ham.to(device=device) if self._jump is not None: self._jump = [jump.to(device=device) for jump in self._jump]
############################################################ # Setter and getter ############################################################ @property def hamiltonian_operator_groups(self): return list(self._hamiltonian_operator_group_dict.values()) @property def jumping_operator_groups(self): return list(self._jumping_group_dict.values()) @property def channel_operator_groups(self): return list(self._channel_group_dict.values())
[docs] def get_hamiltonian_operator_group_by_ID(self, id: str): if id in self._hamiltonian_operator_group_dict: return self._hamiltonian_operator_group_dict[id] else: raise ValueError(f"The ID {id} does not exist in the Hamiltonian operator group dictionary.")
[docs] def get_jumping_operator_group_by_ID(self, id: str): if id in self._jumping_group_dict: return self._jumping_group_dict[id] else: raise ValueError(f"The ID {id} does not exist in the jumping operator group dictionary.")
[docs] def get_channel_operator_group_by_ID(self, id: str): if id in self._channel_group_dict: return self._channel_group_dict[id] else: raise ValueError(f"The ID {id} does not exist in the channel operator group dictionary.")
@property def rho(self): """ This property returns the density matrix of the system. """ rho = self.density_matrix.get_rho() if rho is None: raise ValueError("The density matrix has not been initialized.") return rho @rho.setter def rho(self, rho: th.Tensor): if rho.dtype != th.cfloat: _rho = rho.to(dtype=th.cfloat) else: _rho = rho self.density_matrix.set_rho(_rho) @property def hamiltonian(self): return self._ham # do not set the Hamiltonian buffer directly. # @hamiltonian.setter # def hamiltonian(self, hamiltonian: th.Tensor): # self._ham = hamiltonian @property def jump(self): return self._jump # do not set the jump buffer directly. # @jump.setter # def jump(self, jump: list[th.Tensor]): # self._jump = jump
[docs] def reset(self): """ Reset the system. """ for operator_group in self._hamiltonian_operator_group_dict.values(): operator_group.reset() for operator_group in self._jumping_group_dict.values(): operator_group.reset()
[docs] def HamiltonianParameters(self): parameters_list = [] for operator_group in self._hamiltonian_operator_group_dict.values(): parameters_list.extend(list(operator_group.parameters())) return parameters_list
[docs] def JumpingParameters(self): parameters_list = [] for operator_group in self._jumping_group_dict.values(): parameters_list.extend(list(operator_group.parameters())) return parameters_list
[docs] def ChannelParameters(self): parameters_list = [] for operator_group in self._channel_group_dict.values(): parameters_list.extend(list(operator_group.parameters())) return parameters_list
############################################################ # Methods for adding operator groups ############################################################
[docs] def add_operator_group_to_hamiltonian(self, operator_group: OperatorGroup): """ This function adds an operator group to the Hamiltonian part of the system. Args: operator_group: an OperatorGroup object, containing a group of operators and the corresponding coefficients. """ if operator_group.id in self._hamiltonian_operator_group_dict: raise ValueError(f"The ID {operator_group.id} already exists.") if operator_group.nb != self.nb: raise ValueError(f"The batchsize of the operator group {operator_group.id} does not match the batchsize of the system.") if operator_group.ns != self.ns: raise ValueError(f"The dimension of the operator group {operator_group.id} does not match the number of states of the system.") self._hamiltonian_operator_group_dict[operator_group.id] = operator_group
[docs] def add_operator_group_to_jumping(self, operator_group: OperatorGroup): """ This function adds an operator group to the jumping part of the system. Args: operator_group: an OperatorGroup object, containing a group of operators and the corresponding coefficients. """ if operator_group.id in self._jumping_group_dict: raise ValueError(f"The ID {operator_group.id} already exists.") if operator_group.nb != self.nb: raise ValueError(f"The batchsize of the operator group {operator_group.id} does not match the batchsize of the system.") if operator_group.ns != self.ns: raise ValueError(f"The dimension of the operator group {operator_group.id} does not match the number of states of the system.") self._jumping_group_dict[operator_group.id] = operator_group
[docs] def add_operator_group_to_channel(self, operator_group: OperatorGroup): """ This function adds an operator group to the channel part of the system. """ if operator_group.id in self._channel_group_dict: raise ValueError(f"The ID {operator_group.id} already exists.") ## TODO: check if self.ns=operator_group.ns; check also if self.nb=operator_group.nb self._channel_group_dict[operator_group.id] = operator_group
[docs] def remove_operator_group_from_hamiltonian(self, id: str): """ This function removes an operator group from the Hamiltonian part of the system. """ if id not in self._hamiltonian_operator_group_dict: raise ValueError(f"The ID {id} does not exist in the Hamiltonian operator group dictionary.") del self._hamiltonian_operator_group_dict[id]
[docs] def remove_operator_group_from_jumping(self, id: str): """ This function removes an operator group from the jumping part of the system. """ if id not in self._jumping_group_dict: raise ValueError(f"The ID {id} does not exist in the jumping operator group dictionary.") del self._jumping_group_dict[id]
[docs] def remove_operator_group_from_channel(self, id: str): """ This function removes an operator group from the channel part of the system. """ if id not in self._channel_group_dict: raise ValueError(f"The ID {id} does not exist in the channel operator group dictionary.") del self._channel_group_dict[id]
############################################################ # Methods for evolving the system ############################################################
[docs] def step_hamiltonian(self, dt: float, set_buffer: bool = False): """ This function steps the Hamiltonian for a time step dt. Args: dt: a float, the time step. Returns: hamiltonian: a (self.nb, self.ns, self.ns) tensor, the Hamiltonian operator at time t. """ hamiltonian = 0 for operator_group in self._hamiltonian_operator_group_dict.values(): ops, coefs = operator_group.sample(dt) ## sanitary check if coefs.shape != (self.nb,): raise ValueError("The coefficients sampled from an operator group should be a 1D tensor of length equal to the batchsize.") ## no broadcasting if the operators is already batched if ops.shape == (self.nb, self.ns, self.ns): hamiltonian += ops * coefs[:, None, None] ## broadcast if the operators is not already batched elif ops.shape == (self.ns, self.ns): hamiltonian += ops[None, :, :] * coefs[:, None, None] else: raise ValueError(f"The shape of the operator sampled from operator group {operator_group.id} should either be (batchsize, n_states, n_states) or (n_states, n_states).") if set_buffer: self._ham = hamiltonian return hamiltonian
[docs] def step_jumping(self, dt: float, set_buffer: bool = False): """ This function steps the density matrix for a time step dt. Args: dt: a float, the time step. Returns: jump_operator_list: a list of (self.nb, self.ns, self.ns) tensors, the jump operators at time t. """ jump_operator_list = [] for operator_group in self._jumping_group_dict.values(): ops, coefs = operator_group.sample(dt) ## sanitary check if coefs.shape != (self.nb,): raise ValueError("The coefficients sampled from an operator group should be a 1D tensor of length equal to the batchsize.") ## no broadcasting if the operators is already batched if ops.shape == (self.nb, self.ns, self.ns): jump_operator_list.append(ops * coefs[:, None, None]) ## broadcast if the operators is not already batched elif ops.shape == (self.ns, self.ns): jump_operator_list.append(ops[None, :, :] * coefs[:, None, None]) else: raise ValueError(f"The shape of the operator sampled from operator group {operator_group.id} should be (batchsize, n_states, n_states) or (n_states, n_states).") if set_buffer: self._jump = jump_operator_list return jump_operator_list
[docs] def step_rho(self, dt: float, hamiltonian: th.Tensor, jump_operators: list[th.Tensor], time_independent: bool = False, profile: bool = False): """ This function steps the density matrix for a time step dt. Args: dt: a float, the time step. hamiltonian: a (self.nb, self.ns, self.ns) tensor, the Hamiltonian operator at time t. jump_operators: a list of (self.nb, self.ns, self.ns) tensors, the jump operators at time t. Returns: rho_new: a (self.nb, self.ns, self.ns) tensor, the density matrix at time t+dt. """ rho = self.rho identity = th.eye(self.ns, dtype=th.cfloat).to(rho.device).unsqueeze(0).repeat(self.nb, 1, 1) ## get the effective Hamiltonian=H + 1/2j \sum_k L_k^\dagger L_k if profile: t0 = timer() th.cuda.synchronize() if time_independent and self._ham_eff is not None: ham_eff = self._ham_eff else: ham_eff = hamiltonian.clone() for jump_operator in jump_operators: ham_eff -= 1j * 0.5 * th.matmul(jump_operator.conj().permute(0, 2, 1), jump_operator) if time_independent: self._ham_eff = ham_eff if profile: th.cuda.synchronize() t1 = timer() logging.info(f"The time taken for getting the effective Hamiltonian is {t1 - t0}s.") ## sparsify tensors if the dimension of the matrix is larger than 2000 if self.ns > 2000: identity = identity.to_sparse() ham_eff = ham_eff.to_sparse() # rho = rho.to_sparse() jump_operators = [jump_operator.to_sparse() for jump_operator in jump_operators] ## first part: (1-i*dt*H_eff)rho(1+i*dt*H_eff) rho_new = ABAd(identity - 1j * dt * ham_eff, rho) if profile: th.cuda.synchronize() t2 = timer() logging.info(f"The time taken for the unitary contribution to the density matrix evolution is {t2 - t1}s.") ## second part: dt * \sum_k L_k \rho L_k^\dagger for jump_operator in jump_operators: rho_new += ABAd(jump_operator, rho) * dt if rho_new.is_sparse: rho_new = rho_new.to_dense() if profile: th.cuda.synchronize() t3 = timer() logging.info(f"The time taken for the Lindblad contribution to the density matrix evolution is {t3 - t2}s.") ### Do not normalize here. Sometimes we may be evolving a general operator instead of density matrix ### # ## normalize if the trace of rho_new is too large. # rho_trace = trace(rho_new) # if th.abs(rho_trace.mean()-1) > 0.1: # print('normalize') # rho_new = rho_new / rho_trace[:, None, None] return rho_new
[docs] def normalize(self): rho_trace = trace(self.rho) self.rho = self.rho / rho_trace[:, None, None] return
[docs] def step(self, dt: float, set_buffer: bool = False, time_independent: bool = False, profile: bool = False): """ This function steps the system for a time step dt. """ if profile: t0 = timer() th.cuda.synchronize() hamiltonian = self.step_hamiltonian(dt, set_buffer) if profile: th.cuda.synchronize() t1 = timer() logging.info(f"The time taken for stepping the Hamiltonian is {t1 - t0}s.") if self._jumping_group_dict: jump_operator_list = self.step_jumping(dt, set_buffer) else: jump_operator_list = [] if profile: th.cuda.synchronize() t2 = timer() logging.info(f"The time taken for stepping the jump operators is {t2 - t1}s.") self.rho = self.step_rho(dt, hamiltonian, jump_operator_list, time_independent, profile=profile) if profile: th.cuda.synchronize() t3 = timer() logging.info(f"The time taken for stepping the density matrix is {t3 - t2}s.") return self.rho
[docs] def step_unitary(self, dt: float, set_buffer: bool = False): """ This function steps the system for a time step dt without considering the jump operators. """ hamiltonian = self.step_hamiltonian(dt, set_buffer) rho = self.rho evo_op = th.linalg.matrix_exp(-1j * dt * hamiltonian) rho_new = ABAd(evo_op, rho) self.rho = rho_new return self.rho
[docs] def observe(self, operator): """ This function observes the system with an operator. """ if isinstance(operator, OperatorGroup) is False: raise ValueError("The operator must be an OperatorGroup object. It should not be a plain array or tensor.") ops, _coefs = operator.sample(dt=0) ## sanitary check if _coefs.shape == (1,): coefs = th.ones(self.nb, dtype=_coefs.dtype, device=_coefs.device) * _coefs[0] elif _coefs.shape == (self.nb,): coefs = _coefs else: raise ValueError("The coefficients sampled from an operator group should be a 1D tensor of length equal to the batchsize.") ## no broadcasting if the operators is already batched if ops.shape == (self.nb, self.ns, self.ns): ops_batched = ops * coefs[:, None, None] ## broadcast if the operators is not already batched elif ops.shape == (self.ns, self.ns): ops_batched = ops[None, :, :] * coefs[:, None, None] return trace(th.matmul(ops_batched, self.rho)) / trace(self.rho)
[docs] class QubitLindbladSystem(LindbladSystem): r""" This class represents the states of n physical qubits as a open quantum system. The states are represented by a density matrix. The evolution of the system is governed by the Lindblad equation, which is a generalization of the Schrodinger equation for open quantum systems. Both the Hamiltonian and the jump operators in the Lindblad equation allows fluctuating coefficients. So technically, we are dealing with a stochastic master equation. The Lindblad equation is given as :math:`d\rho(t) / dt = -i [H(t), \rho(t)] + \sum_k L_k(t) \rho(t) L_k(t)^\dagger - 1/2 \{L_k(t)^\dagger L_k(t), \rho(t)\}` where :math:`H(t)` is the Hamiltonian, :math:`L_k(t)` is the jump operator. Note that the coefficients of the jump operators are absorbed into :math:`L_k(t)`. In this class, each component of the Hamiltonian :math:`H(t)`, and each jump operator :math:`L_k(t)`, is represented by a OperatorGroup object. Each OperatorGroup object contains a group of operators and the corresponding coefficients. The operators themselves are time-independent, like a Pauli operator. But the coefficients can be time-dependent stochastic processes. This corresponds to the case where the qubits are coupled to noisy environments such as inhomogeneous, fluctuating magnetic fields. """
[docs] def __init__(self, n_qubits: int, batchsize: int): self.nq = n_qubits super().__init__(2**n_qubits, batchsize) self.density_matrix = QubitDensityMatrix(n_qubits=self.nq, batchsize=batchsize)
[docs] def set_rho_by_config(self, config): if isinstance(config, th.Tensor): _c = config elif isinstance(config, np.ndarray): if config.shape != (self.nq,): raise ValueError("Config must have shape (#qubits).") if config.dtype != np.int64: raise ValueError("Config must be an integer array.") _c = th.tensor(config, dtype=th.int64) elif isinstance(config, list): if len(config) != self.nq: raise ValueError("Config must have length (#qubits).") if not all(isinstance(i, int) for i in config): raise ValueError("Config must be a list of integers.") _c = th.tensor(config, dtype=th.int64) else: raise ValueError("Config must be a 0/1 tensor, a 0/1 numpy array, or a list of 0/1 integers.") if not all(i in [0, 1] for i in _c): raise ValueError("Config must be a sequence of 0/1 integers. Example for 2-qubit system: [0, 1] means |01>.") self.density_matrix.set_rho_by_config(_c)
[docs] def rotate(self, direction: th.Tensor, angle: float, config=None, inplace: bool = True): """ Apply a rotation operator about the Cartesian axes in the Bloch Basis. """ rho_new = self.density_matrix.apply_unitary_rotation(self.rho, direction, angle, config) if inplace: self.rho = rho_new return self.rho else: return rho_new
[docs] def kraus_operate(self, kraus_operators: list[th.Tensor], config=None, inplace: bool = True): """ Apply a Kraus operation. """ rho_new = self.density_matrix.apply_kraus_operation(self.rho, kraus_operators, config) if inplace: self.rho = rho_new return self.rho else: return rho_new
[docs] class ParticleLindbladSystem(QubitLindbladSystem): """ This class represents the states of n physical qubits as a open quantum system. The quantum states include the two-level states of each qubit and the center-of-mass position and momentum of each qubit. We take a ring-polymer (number of beads = p) representation of the center-of-mass state for sampling the qubit distribution. The two-level states will be represented by a (p x 2^n x 2^n) density matrix. The center-of-mass position will be represented by a (p x 3n) matrix. For harmonic trapping with frequency omega, the number of beads needed for convergence can be estimated by (hbar omega / kT). """
[docs] def __init__(self, n_qubits: int, batchsize: int, particles: Particles): super().__init__(n_qubits, batchsize) self.particles = particles
[docs] def reset(self): """ Reset the system. In addition to the reset of the quantum system, the center-of-mass positions and momenta of the particles are also reset to arbitrary thermal states. """ for operator_group in self._hamiltonian_operator_group_dict.values(): operator_group.reset() for operator_group in self._jumping_group_dict.values(): operator_group.reset() self.particles.reset()
# self.step_particles(100) # TODO: remove this line
[docs] def step_particles(self, dt: float): """ This function steps the thermal motino of the particles for a time step dt. """ dt_thermal = self.particles.dt substeps = int(np.round(dt / dt_thermal, 0)) if (dt / dt_thermal) != substeps: raise ValueError("The time step dt of integrating Lindblad equation must be an integer multiple of the time step dt_thermal for integrating the thermal motion of particles.") for _ in range(substeps): self.particles.zero_forces() self.particles.modify_forces(self.particles.get_trapping_forces()) self.particles.step_langevin(record_traj=False) return
[docs] def step(self, dt: float, set_buffer: bool = False, profile: bool = False): """ This function steps the system for a time step dt. """ if profile: t0 = timer() self.step_particles(dt) if profile: t1 = timer() logging.info(f"The time taken for stepping the thermal motion of qubits is {t1 - t0}s.") hamiltonian = self.step_hamiltonian(dt, set_buffer=True) if self._jumping_group_dict: jump_operator_list = self.step_jumping(dt, set_buffer) else: jump_operator_list = [] self.rho = self.step_rho(dt, hamiltonian, jump_operator_list) if profile: t2 = timer() logging.info(f"The time taken for stepping the quantum states of qubits is {t2 - t1}s.") return self.rho