# -*- coding: utf-8 -*-
import numpy
from common import integrators
from common.utils import PicklableObject
def F_A(self, p, skip_index=None):
temp = -1.
for i, particle in enumerate(self.reaction):
if skip_index is None or i != skip_index:
if particle.side == -1:
temp *= particle.specie.distribution(p[i])
else:
temp *= 1. - particle.specie.eta * particle.specie.distribution(p[i])
return temp
Backward reaction distribution functional term
param skip_index
Particle to skip in the expression
def F_B(self, p, skip_index=None):
temp = 1.
for i, particle in enumerate(self.reaction):
if skip_index is None or i != skip_index:
if particle.side == 1:
temp *= particle.specie.distribution(p[i])
else:
temp *= 1. - particle.specie.eta * particle.specie.distribution(p[i])
return temp
in means that the distribution function was omitted in the corresponding expression. represents the value of the particle .
Variable part of the distribution functional
def F_f(self, p):
return (
self.F_A(p=p, skip_index=0) - self.particle.eta * self.F_B(p=p, skip_index=0)
)
Representation of the concrete collision integral for a specific particle Integral.reaction[0]
class BoltzmannIntegral(PicklableObject, DistributionFunctional):
_saveable_fields = [
'particle', 'reaction', 'decoupling_temperature', 'constant', 'Ms', 'grids',
]
reaction = None # All particles involved
If the Boltzmann integral is obtained as a crossing of some other process, the mass of the crossed fermion in the matrix element has to change sign.
For example, compare the matrix elements of reactions
In particular, this has something to do with the K2
term in the FourParticleIntegral
.
Temperature when the typical interaction time exceeds the Hubble expansion time
decoupling_temperature = 0.
constant = 0.
Update self with configuration kwargs
, construct particles list and energy conservation law of the integral.
def __init__(self, **kwargs):
for key in kwargs:
setattr(self, key, kwargs[key])
if not self.Ms:
self.Ms = []
self.integrand = numpy.vectorize(self.integrand, otypes=[numpy.float_])
String-like representation of the integral. Corresponds to the first particle
def __str__(self):
return (
" + ".join([p.specie.symbol + ('\'' if p.antiparticle else '')
for p in self.reaction if p.side == -1])
+ " ⟶ "
+ " + ".join([p.specie.symbol + ('\'' if p.antiparticle else '')
for p in self.reaction if p.side == 1])
+ "\t({})".format(', '.join([str(M) for M in self.Ms]))
)
Initialize collision integral constants and save them to the first involved particle
def initialize(self):
raise NotImplementedError()
Helper procedure that caches conformal energies and masses of the reaction
def calculate_kinematics(self, p):
particle_count = len(self.reaction)
p = (p + [0.]*particle_count)[:particle_count]
E = []
m = []
for i, particle in enumerate(self.reaction):
E.append(particle.specie.conformal_energy(p[i]))
m.append(particle.specie.conformal_mass)
Parameters of one particle can be inferred from the energy conservation law
E[particle_count-1] = self.reaction[particle_count-1].side \
* sum([-self.reaction[i].side * E[i] for i in range(particle_count-1)])
p[particle_count-1] = numpy.sqrt(numpy.abs(E[particle_count-1]**2 - m[particle_count-1]**2))
return p, E, m
def forward_integral(p):
return numpy.vectorize(lambda p0: p0**2 / (2 * numpy.pi)**3
* self.integrate(p0, self.F_A)[0])(p)
def backward_integral(p):
return numpy.vectorize(lambda p0: p0**2 / (2 * numpy.pi)**3
* self.integrate(p0, self.F_B)[0])(p)
grid = self.particle.grid
forward_rate, _ = integrators.integrate_1D(forward_integral,
(grid.MIN_MOMENTUM, grid.MAX_MOMENTUM))
backward_rate, _ = integrators.integrate_1D(backward_integral,
(grid.MIN_MOMENTUM, grid.MAX_MOMENTUM))
return -forward_rate, backward_rate
Coarse integration region based on the self.particle.grid
points.
Assumes that integration region is connected.
def bounds(self, p0):
points = []
for p1 in self.particle.grid.TEMPLATE:
points.append((p1, self.lower_bound(p0, p1),))
points.append((p1, self.upper_bound(p0, p1),))
return points
Find the first self.particle.grid
point in the integration region
def lower_bound(self, p0, p1):
index = 0
while (index < self.particle.grid.MOMENTUM_SAMPLES
and not self.in_bounds([p0, p1, self.particle.grid.TEMPLATE[index]])):
index += 1
if index == self.particle.grid.MOMENTUM_SAMPLES:
return self.particle.grid.MIN_MOMENTUM
return self.particle.grid.TEMPLATE[index]
Find the last self.particle.grid
point in the integration region
def upper_bound(self, p0, p1):
index = int(
(min(p0 + p1, self.particle.grid.MAX_MOMENTUM) - self.particle.grid.MIN_MOMENTUM)
/ self.particle.grid.MOMENTUM_STEP
)
while index >= 0 and not self.in_bounds([p0, p1, self.particle.grid.TEMPLATE[index]]):
index -= 1
if index == -1:
return self.particle.grid.MIN_MOMENTUM
return self.particle.grid.TEMPLATE[index]