r"""
A module for calculating Resonant Relaxation diffusion coefficients
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import multiprocessing as mp
from ast import literal_eval as make_dict
from builtins import range
from builtins import super
from builtins import zip
from collections import namedtuple
from functools import lru_cache
import h5py
import numpy as np
# noinspection PyPackageRequirements
import progressbar
import vegas
from numba import jit
from numpy import mod
from numpy import sqrt
from scipy import special
from . import Cusp
Res = namedtuple('Res', ('l', 'n', 'np', 'neval'))
[docs]class DRR(Cusp):
r"""
Resonant relaxation diffusion coefficient (DRR).
Assuming a power law stellar cusp around a massive black hole (MBH).
The cusp is assumed to have an isotropic distribution function
:math:`f(E) \propto |E|^p` corresponding ro a stellar density
:math:`n(r) \propto r^{-\gamma}` where :math:`\gamma = \tfrac{3}{2} + p`
Parameters
----------
sma : float
The semi-mahor axis along which DRR will be computed
gamma : float, int, optional
The slope of the density profile.
Default: 7/4 (Bahcall wolf cusp)
mbh_mass : float, int, optional
Mass of the MBH [solar mass].
Default: :math:`4.3 \times 10^6` (Milky Way MBH)
star_mass : float, int, optional
Mass of individual stars [solar mass].
Default: 1.0
rh : float, int, optional
Radius of influence [pc].
Define as the radius in which the velocity
dispersion of the stellar cusp :math:`\sigma` is equal to the
Keplerian velocity due to the MBH
:math:`\sigma(r_\mathrm{h})^2 = G M_{\bullet} / r_\mathrm{h}`.
Default: 2.0
"""
def __init__(self, sma, gamma=1.75, mbh_mass=4e6, star_mass=1.0,
j_grid_size=128, rh=2.0, seed=None):
super().__init__(gamma=gamma,
mbh_mass=mbh_mass,
star_mass=star_mass,
rh=rh)
self.sma = sma
self.gr_factor = 1.0
self.j = np.logspace(np.log10(self.jlc(self.sma)), 0,
j_grid_size + 1)[:-1]
if seed is None:
self.seed = np.random.randint(int(1e8))
else:
self.seed = seed
np.random.seed(self.seed)
self.seeds = np.random.randint(int(1e8), size=self.j.size)
@property
def omega(self):
try:
return self._omega
except AttributeError:
self._omega = self.nu_p(self.sma, self.j)
return self._omega
@omega.setter
def omega(self, omega):
self._omega = omega
@lru_cache()
def _res_intrp(self, ratio):
return ResInterp(self, self.omega * ratio, self.gr_factor)
def _integrand(self, j, sma_p, j_p, lnnp, true_anomaly):
d_nu_p = abs(self.d_nu_p(sma_p, j_p))
a2_int = a2_integrand(self.sma, j, sma_p, j_p, lnnp, true_anomaly)
return j_p * a2_int / d_nu_p
[docs] def __call__(self, l_max, neval=1e3, threads=1,
progress_bar=True, seed=None):
r"""
Returns the RR diffusion coefficient :math:`D_{JJ}/J_{\mathrm{c}}^2` [1/yr].
Parameters
----------
l_max : int
Maximal order of spherical harmonics to compute
neval : int
The maximum number of integrand evaluations
in each iteration of the `vegas` algorithm.
Default: 1000
threads : int
Number of parallel threads to use.
Default: 1 (no parallelization)
progress_bar : bool
Show progress bar.
Default: ``True``
"""
# Get all non-vanishing resonances up to l=l_max
lnnp = [(l, n, n_p)
for l in range(1, l_max + 1)
for n in range(1, l + 1)
for n_p in range(-l - 1, l + 1)
if (not mod(l + n, 2) + mod(l + n_p, 2)) and n_p != 0
]
if progress_bar:
pbar = progressbar.ProgressBar()(range(len(lnnp)))
else:
pbar = range(len(lnnp))
for i in pbar:
self._drr_lnnp(*lnnp[i], neval=neval, threads=threads)
drr = np.vstack([self._drr_lnnp(*lnnp, neval=neval, threads=threads)[0]
for lnnp in lnnp])
# Remove non-physical values
drr[drr < 0] = 0
# sum all resonances
drr = drr.sum(axis=0)
drr_err = sqrt(sum(self._drr_lnnp(*lnnp, neval=neval,
threads=threads)[-1] ** 2
for lnnp in lnnp))
return drr, drr_err
def _drr_lnnp(self, l, n, n_p, neval=1e3, threads=1):
r"""
Calculates the :math:`(l,n,n_p)` term of the diffusion coefficient
"""
neval = int(neval)
# Pre compute the resonance interpolation grid
try:
drr, drr_err = self._drr_lnnp_cache[Res(l, n, n_p, neval)]
return drr, drr_err
except AttributeError:
self._drr_lnnp_cache = {}
except KeyError:
pass
ratio = n / n_p
self._res_intrp(ratio)
if threads > 1:
queue = mp.Queue()
def parallel_drr(pos, j_s, omega_s, seed_s):
_results = [
self._drr(j, omega, (l, n, n_p), neval=neval, seed=seed)
for j, omega, seed in zip(j_s, omega_s, seed_s)
]
_drr = [result[0] for result in _results]
_drr_err = [result[-1] for result in _results]
queue.put((pos, (_drr, _drr_err)))
js = [self.j[i::threads] for i in range(threads)]
omegas = [self.omega[i::threads] for i in range(threads)]
seeds = [self.seeds[i::threads] for i in range(threads)]
processes = [
mp.Process(target=parallel_drr, args=(i, j_s, omega_s, seed_s))
for i, (j_s, omega_s, seed_s) in
enumerate(zip(js, omegas, seeds))]
for process in processes:
process.start()
for process in processes:
process.join()
drr, drr_err = zip(*[(drr, drr_err)
for (i, (drr, drr_err)) in
sorted(map(queue.get, processes))])
drr, drr_err = (np.concatenate(list(zip(*drr))),
np.concatenate(list(zip(*drr_err))))
else:
results = [self._drr(j, omega, (l, n, n_p), neval=neval, seed=seed)
for j, omega, seed in
zip(self.j, self.omega, self.seeds)]
drr, drr_err = np.array(list(zip(*results)))
self._drr_lnnp_cache[Res(l, n, n_p, neval)] = (drr, drr_err)
return drr, drr_err
def _drr(self, j, omega, lnnp, neval=1e3, seed=None):
l, n, n_p = lnnp
ratio = n / n_p
get_jf = self._res_intrp(ratio)(omega * ratio)
if seed is not None:
# make a unique seed
np.random.seed([seed, l, n, 2 * l + n_p])
pi = np.pi
@vegas.batchintegrand
def c_lnnp(x):
true_anomaly = x[:, :-1].T * pi
sma_f = self.inverse_cumulative_a(x[:, -1])
jf = get_jf(sma_f)
res = np.zeros(x.shape[0], float)
ix = np.where(jf > 0)[0]
if len(ix) > 0:
res[ix] = self._integrand(j, sma_f[ix], jf[ix], lnnp,
true_anomaly[:, ix])
return res
self.c_lnnp = c_lnnp
integ = vegas.Integrator(5 * [[0, 1]])
if get_jf is None:
result = np.zeros(2)
else:
result = np.array(integrate(c_lnnp, integ, neval))
return result * (8 * pi * n ** 2 / abs(n_p) *
_a2_norm_factor(*lnnp) *
self.nu_r(self.sma) ** 2 / self.mass_ratio ** 2 *
self.total_number_of_stars)
@property
def l_max(self):
try:
return max([key.l
for key in self._drr_lnnp_cache.keys()])
except AttributeError:
pass
@property
def neval(self):
try:
return max([key.neval
for key in self._drr_lnnp_cache.keys()])
except AttributeError:
pass
[docs] def save(self, file_name):
r"""
Save the current instance to an hdf5 file.
Example
-------
>>> drr = DRR(0.1, j_grid_size=32)
>>> d, d_err = drr(l_max=3)
>>> drr.save('example.hdf5')
>>> drr = DRR.from_file('example.hdf5')
>>> d, d_err = drr(l_max=drr.l_max, neval=drr.neval)
"""
with h5py.File(file_name, 'w') as h5:
drr_lnnp_cache = h5.create_group("_drr_lnnp_cache")
for key, value in self._drr_lnnp_cache.items():
drr_lnnp_cache[str(dict(key._asdict()))] = value
for key, value in self.__dict__.items():
try:
h5[key] = value
except TypeError:
pass
def _read(self, file_name):
r"""
Read the cached data from an hdf5 file
"""
with h5py.File(file_name, 'r') as h5:
for key, value in h5['_drr_lnnp_cache'].items():
try:
self._drr_lnnp_cache[Res(**make_dict(key))] = value.value
except AttributeError:
self._drr_lnnp_cache = {Res(**make_dict(key)): value.value}
for key, value in h5.items():
if key != '_drr_lnnp_cache':
setattr(self, key, value.value)
[docs] @classmethod
def from_file(cls, file_name):
r"""
Load from file and return an instance
Example
-------
>>> drr = DRR(0.1, j_grid_size=32)
>>> d, d_err = drr(l_max=3)
>>> drr.save('example.hdf5')
>>> drr = DRR.from_file('example.hdf5')
>>> d, d_err = drr(l_max=drr.l_max, neval=drr.neval)
"""
drr = cls(1.0)
drr._read(file_name)
return drr
def integrate(func, integ, neval=1e4):
integ(func, nitn=10, neval=neval)
result = integ(func, nitn=10, neval=neval)
try:
res, err = np.array([[r.val, np.sqrt(r.var)] for r in result]).T
except TypeError:
res, err = result.val, np.sqrt(result.var)
return res, err
@jit(nopython=True)
def a2_integrand(sma, j, sma_p, j_p, lnnp, true_anomaly):
r"""
Returns the :math:`|a_{\ell n n^{\prime}}|^{2}` integrand to use in the MC integration
"""
l, n, n_p = lnnp
cnnp = np.cos(true_anomaly.T * np.array([n, n, n_p, n_p])).T
cnnp = cnnp[0] * cnnp[1] * cnnp[2] * cnnp[3]
j2 = j ** 2
j_p2 = j_p ** 2
ecc, eccp = np.sqrt(1 - j2), np.sqrt(1 - j_p2)
r12 = (sma * j2 / (1 - ecc * np.cos(true_anomaly[:2])))
rp12 = (sma_p * j_p2 / (1 - eccp * np.cos(true_anomaly[2:])))
r_1, r_2 = r12[0], r12[-1]
rp1, rp2 = rp12[0], rp12[-1]
return (cnnp / j2 / j_p2 / sma ** 2 / sma_p ** 4 *
(np.minimum(r_1, rp1) * np.minimum(r_2, rp2)) ** (2 * l + 1) /
(r_1 * r_2 * rp1 * rp2) ** (l - 1))
@lru_cache()
def _a2_norm_factor(l, n, n_p):
r"""
Normalization factor for :math:`|a_{\ell n n^{prime}} |^{2}`
"""
return (abs(special.sph_harm(n, l, 0, np.pi / 2)) ** 2 *
abs(special.sph_harm(n_p, l, 0, np.pi / 2)) ** 2 *
(4 * np.pi / (2 * l + 1)) ** 2) / (2 * l + 1)
class ResInterp(object):
r"""
Generates an interpolation function :math:`j^{\prime} (a^{\prime})` where :math:`j^{\prime}`
satisfies the resonant condition: :math:`\nu^{\prime} (a^{\prime}, j^{\prime}) == \omega`
Parameters
----------
cusp : a `Cusp` instance
omega : ndarray
The resonance frequencies
Example
-------
>>> from scrrpy import Cusp
>>> from scrrpy.drr import ResInterp
>>> import numpy as np
>>> cusp = Cusp(gamma=1.75, mbh_mass=4e6, rh=2.0)
>>> j = np.logspace(-2, 0, 11)[:-1]
>>> omega = cusp.nu_p(a=0.1, j=j)
>>> resint = ResInterp(cusp, omega)
>>> jf = np.array([resint(o)(0.1) for o in omega])
>>> abs(1 - jf/j).max() < 1e-4
True
"""
def __init__(self, cusp, omega, gr_factor=1.0):
r"""
"""
self._cusp = cusp
self.size = 1000
self._cusp.gr_factor = gr_factor
self.omega = omega
self._af = np.logspace(np.log10(16 * self._cusp.rg),
np.log10(self._cusp.rh),
1001)[1:]
self.x = np.logspace(-5, 0, self.size + 1)[:-1][::-1]
self.j_grid = self.make_grid()
def __call__(self, omega):
log_ap, jp = self.j_grid[omega]
if jp.size == 0:
return
return lambda af: np.interp(np.log(af), log_ap, jp, left=0, right=0)
def make_grid(self):
j_grid = []
for af in self._af:
jlc = self._cusp.jlc(af)
j = (1 - jlc) * self.x + jlc
nup = self._cusp.nu_p(af, j)
j_grid.append(np.exp(np.interp(self.omega, nup, np.log(j),
left=-np.inf, right=-np.inf)))
j_grid = np.array(list(zip(*j_grid)))
return dict((omega, (np.log(self._af[j > 0]), j[j > 0]))
for omega, j in zip(self.omega, j_grid))