This file contains Particle
class definition and code governing the switching of the dynamical regimes
from __future__ import division
import numpy
from collections import defaultdict
from common import GRID, UNITS
from common.integrators import adams_moulton_solver
from common.utils import PicklableObject, trace_unhandled_exceptions
from particles import DustParticle, RadiationParticle, IntermediateParticle, NonEqParticle
from KAWANO.interpolation import dist_interp_values as distribution_interpolation
RADIATION
DUST
INTERMEDIATE
NONEQ
class REGIMES(dict):
RADIATION = RadiationParticle
DUST = DustParticle
INTERMEDIATE = IntermediateParticle
NONEQ = NonEqParticle
Master-class for particle species. Realized as finite state machine that switches to different regime when temperature becomes comparable to the particle mass or drops below particle decoupling_temperature
class Particle(PicklableObject):
_saveable_fields = [
'name', 'symbol',
'mass', 'decoupling_temperature',
'dof', 'eta',
'data',
'_distribution',
'collision_integral', 'collision_integrals',
'T', 'aT', 'params',
'grid'
]
_defaults = {
'mass': 0 * UNITS.eV,
'decoupling_temperature': 0 * UNITS.eV,
'name': 'Particle',
'symbol': 'p',
'dof': 2,
'statistics': STATISTICS.FERMION,
'params': None,
'grid': GRID
}
def __init__(self, **kwargs):
settings = dict(self._defaults)
settings.update(kwargs)
for key, value in settings.items():
setattr(self, key, value)
self.eta = 1. if self.statistics == STATISTICS.FERMION else -1.
self.set_grid(self.grid)
self.calculate_collision_integral = numpy.vectorize(self.calculate_collision_integral)
self.collision_integrals = []
self.data = {
'distribution': [self._distribution],
'collision_integral': []
}
if self.params:
self.set_params(self.params)
String-like representation of particle species it's regime and parameters
def __str__(self):
return "%s (%s, %s)\nn = %s, rho = %s\n" % (
self.name,
"eq" if self.in_equilibrium else "non-eq",
self.regime.name,
self.density() / UNITS.eV**3,
self.energy_density() / UNITS.eV**4
) + ("-" * 80)
Set internal parameters using arguments or default values
def set_params(self, params):
self.params = params
self.T = params.T
self.aT = params.aT
self.update()
self.init_distribution()
For equilibrium particles distribution function is by definition given by its statistics and will not be used until species comes into non-equilibrium regime
self._distribution = numpy.zeros(self.grid.MOMENTUM_SAMPLES, dtype=numpy.float_)
Particle collision integral is not effective in the equilibrium as well
self.collision_integral = numpy.zeros(self.grid.MOMENTUM_SAMPLES, dtype=numpy.float_)
Update the particle parameters according to the new state of the system
def update(self, force_print=False):
oldregime = self.regime
oldeq = self.in_equilibrium
Update particle internal params only while it is in equilibrium
if self.in_equilibrium or self.in_equilibrium != oldeq:
Particle species has temperature only when it is in equilibrium
self.T = self.params.T
self.aT = self.params.aT
Particle decouples, have to init the distribution function array for kinetics
self.init_distribution()
Apply collision integral to modify the distribution function
def update_distribution(self):
if self.in_equilibrium:
return
self._distribution += self.collision_integral * self.params.dy
self._distribution = numpy.maximum(self._distribution, 0)
Clear collision integrands for the next computation step
self.collision_integrals = []
self.data['collision_integral'].append(self.collision_integral)
self.data['distribution'].append(self._distribution)
def integrate_collisions(self):
return numpy.vectorize(self.calculate_collision_integral)(self.grid.TEMPLATE)
@trace_unhandled_exceptions
def calculate_collision_integral(self, p0):
if not self.collision_integrals:
return 0
As = []
Bs = []
for integral in self.collision_integrals:
As.append(integral.integrate(p0, integral.F_1))
Bs.append(integral.integrate(p0, integral.F_f))
order = min(len(self.data['collision_integral']) + 1, 5)
index = numpy.searchsorted(self.grid.TEMPLATE, p0)
fs = [i[index] for i in self.data['collision_integral'][-order+1:]]
H = self.params.H
if p0 == 0:
A = sum(As)
B = sum(Bs)
feq = self.equilibrium_distribution(p0)
print "{} p0 = {:.3e} A = {:.3e} t = {:.3e} d = {:.3e}".format(
self.symbol, p0 / UNITS.MeV, A * UNITS.s, -1. / B / UNITS.s, -(A/B) / feq
)
prediction = adams_moulton_solver(y=self.distribution(p0), fs=fs,
A=sum(As) / H, B=sum(Bs) / H,
h=self.params.dy, order=order)
total_integral = (prediction - self.distribution(p0)) / self.params.dy
return total_integral
For ultra-relativistic particles the mass is effectively 0
. This implies that all computed numerically values can be as well obtained analytically: energy density, pressure, etc.
Let particle mass be equal and regime factor equal . As soon as the temperature of the system drops to the value about , particle should be switched to the computation regime where its mass is also considered: REGIMES.INTERMEDIATE
. When drops down even further to the value , particle species can be treated as REGIMES.DUST
with a Boltzmann distribution function.
@property
def regime(self):
regime_factor = 1e1
if not self.in_equilibrium:
return REGIMES.NONEQ
if self.T > self.mass * regime_factor:
regime = REGIMES.RADIATION
elif self.T * regime_factor < self.mass:
regime = REGIMES.DUST
else:
regime = REGIMES.INTERMEDIATE
return regime
TODO: probably just remove these and always use particle.regime.something()
?
def density(self):
return self.regime.density(self)
Two possible strategies for the distribution function interpolation are implemented:
While linear interpolation is exceptionally simple, the exponential interpolation gives exact results for the equilibrium functions - thus collision integral for them almost exactly cancels out unlike the case of linear interpolation.
Distribution functions are continuously evaluated each during the simulation, so to avoid excessive evaluation of the costly logarithms and exponentials, this function first checks if the current momenta value coincides with any of the grid points.
def distribution(self, p):
p = abs(p)
if self.in_equilibrium or p > self.grid.MAX_MOMENTUM:
return self.equilibrium_distribution(p)
return distribution_interpolation(
p,
self.grid.TEMPLATE,
self._distribution
)
index = numpy.searchsorted(self.grid.TEMPLATE, p)
if index >= self.grid.MOMENTUM_SAMPLES - 1:
return self._distribution[-1]
if p == self.grid.TEMPLATE[index]:
return self._distribution[index]
Determine the closest grid points
i_low = index
p_low = self.grid.TEMPLATE[i_low]
i_high = index + 1
p_high = self.grid.TEMPLATE[i_high]
E_p = self.conformal_energy(p)
E_low = self.conformal_energy(p_low)
E_high = self.conformal_energy(p_high)
g_high = numpy.log(1 / self._distribution[i_high] - 1)
g_low = numpy.log(1 / self._distribution[i_low] - 1)
g = ((E_p - E_low) * g_high + (E_high - E_p) * g_low) / (E_high - E_low)
return 1. / (numpy.exp(g) + self.eta)
Equilibrium distribution that corresponds to the particle internal temperature
def equilibrium_distribution(self, y=None, aT=None):
if aT is None:
aT = self.aT
if y is None:
return self.equilibrium_distribution_function(
numpy.vectorize(self.conformal_energy)(self.grid.TEMPLATE) / aT
)
else:
return self.equilibrium_distribution_function(self.conformal_energy(y) / aT)
@property
def equilibrium_distribution_function(self):
if self.eta == -1:
return STATISTICS.Bose
else:
return STATISTICS.Fermi
def init_distribution(self):
self._distribution = self.equilibrium_distribution()
return self._distribution
Simple check for equilibrium
@property
def in_equilibrium(self):
return self.T > self.decoupling_temperature
def conformal_energy(self, y):
if self.mass > 0:
return numpy.sqrt(y**2 + self.conformal_mass**2)
else:
return abs(y)