from copy import deepcopy
import sys
import h5py
import numpy as np
import math
from sympy import comp
from waveformtools import dataIO
from waveformtools.dataIO import (
construct_mode_list,
get_iteration_numbers_from_keys,
sort_keys,
)
from spectral.spherical.grids import UniformGrid
from spectral.spherical.swsh import Yslm_vec
from waveformtools.waveformtools import interp_resam_wfs, message
from pathlib import Path
from waveformtools.integrate import fixed_frequency_integrator
from waveformtools.waveformtools import (
get_starting_angular_frequency as sang_f,
)
from waveformtools.single_mode import SingleMode
from scipy.interpolate import InterpolatedUnivariateSpline
from waveformtools.BMS import compute_linear_momentum_contribution_from_news, compute_impulse_from_force
[docs]
class SphericalArray:
"""A class for handling waveforms on a sphere.
Attributes
----------
label: str
The label of the waveform data.
time_axis: 1d array
The time axis of the data.
frequency_axis: 1d array
The frequency axis if the data
is represented in frequency domain.
Grid: UniformGrid
An instance of the `UniformGrid` class.
data_len: int
The length of the data along the time axis.
Methods
-------
delta_t:
Fetch the time stepping `delta_t`.
to_modes_array:
Find the waveform expressed in the
SWSH basis.
boost:
Boost the waveform.
supertranslate:
Supertranslate the waveform.
Notes
-----
The modes array has the axis in the form
(flattened_modes_1d, *shape of extra modes, )
"""
def __init__(
self,
label=None,
time_axis=None,
frequency_axis=None,
data=None,
data_len=None,
data_dir=None,
file_name=None,
Grid=None,
spin_weight=-2,
ell_max=8,
):
self._label = label
self._data_len = data_len
# self.base_dir = base_dir # The base directory containing the
self._data = data
self._file_name = file_name
self._data_dir = data_dir
self._time_axis = time_axis
self._frequency_axis = frequency_axis
self._Grid = Grid
self._spin_weight = spin_weight
self._ell_max = ell_max
@property
def label(self):
return self._label
@property
def ell_max(self):
return self._ell_max
@property
def time_axis(self):
return self._time_axis
@property
def frequency_axis(self):
return self._frequency_axis
@property
def data(self):
return self._data
@property
def data_dir(self):
return self._data_dir
@property
def file_name(self):
return self._file_name
@property
def spin_weight(self):
return self._spin_weight
@property
def Grid(self):
return self._Grid
[docs]
def delta_t(self, value=None):
"""Sets and returns the value of time stepping :math:`dt`.
Parameters
----------
value: float, optional
The value of :math:`dt`
to set to the attribute.
Returns
-------
delta_t: float
Sets the attribute.
"""
if not self._delta_t:
if not value:
try:
self._delta_t = self.time_axis[1] - self.time_axis[0]
except Exception as ex:
message(
"Please input the value of `delta_t`"
"or supply the `time_axis` to the waveform.",
ex,
)
else:
self._delta_t = value
return self._delta_t
@property
def data_len(self):
"""Returns the length of the time/frequency axis.
Returns
-------
data_len: int
The length of the time/frequency axis.
"""
if self._data_len is None:
try:
data_len = len(self.time_axis)
except Exception as ex:
data_len = len(self.frequency_axis)
message(ex)
self._data_len = data_len
return self._data_len
[docs]
def to_modes_array(self, Grid=None, spin_weight=None, ell_max=8):
"""Decompose a given spherical array function on a sphere
into Spin Weighted Spherical Harmonic modes.
Parameters
----------
spin_weight: int, optional
The spin weight of the waveform.
It defaults to -2 for a
gravitational waveform.
ell_max: int, optional
The maximum value of the :math:`\\ell`
polar quantum number. Defaults to 8.
Grid: class instance
The class instance that contains
the properties of the spherical grid.
Returns
-------
waveforms_modes: modes_array
An instance of the `modes_array` class
containing the decomposed modes.
Notes
-----
1. Assumes that the sphere on which
this decomposition is carried out is so far out
that the coordinate system is spherical polar
on a round sphere.
2. Assumes that the poper area is the same
as its co-ordinate area.
3. Ensure that the label of the
input spherical array indicates whether
it is a time domain data or frequency domain data.
"""
if self.Grid is None:
if Grid is None:
message(
"Please specify the grid specs. Assuming a default"
" uniform grid.",
message_verbosity=1,
)
Grid = UniformGrid()
self._Grid = Grid
if self._ell_max is None:
if ell_max is None:
try:
self._ell_max = Grid.L + 1
except Exception as ex:
message(
ex,
"\n ell_max not provided. Assuming 8",
message_verbosity=1,
)
self._ell_max = 8
else:
try:
ell_max_grid = Grid.L + 1
assert ell_max <= ell_max_grid, (
"This GL grid"
"can only decompose upto"
f"ell_max of {ell_max_grid}"
)
self._ell_max = ell_max
except Exception as ex:
message(
ex,
"No ell_max constraints" "due to uniform grid",
message_verbosity=3,
)
if spin_weight is None:
if self.spin_weight is None:
message("Please specify spin weight. Assuming 2")
spin_weight = 2
self.spin_weight = spin_weight
else:
spin_weight = self.spin_weight
spin_weight = abs(spin_weight)
# Compute the meshgrid for theta and phi.
theta, phi = self.Grid.meshgrid
# Create a modes array object
# Create a modes list.
modes_list = construct_mode_list(ell_max, spin_weight=spin_weight)
if not self.label:
label = "decomposed_time_domain"
else:
label = self.label
# Create a mode array for the decomposed_waveform
waveform_modes = modes_array(
label=label, ell_max=ell_max, spin_weight=spin_weight
)
# Inherit the time or frequency axis.
if "time" in label:
# axis = self.time_axis
waveform_modes.time_axis = self.time_axis
else:
# axis = self.frequency_axis
waveform_modes.frequency_axis = self.frequency_axis
# Create the modes_array
waveform_modes.create_modes_array(
ell_max=ell_max, data_len=self.data_len
)
waveform_modes.modes_list = modes_list
# The area element on the sphere
# Compute the meshgrid for theta and phi.
theta, phi = self.Grid.meshgrid
# sqrt_met_det = np.sin(theta)
# sqrt_met_det = np.sqrt(np.power(np.sin(theta), 2))
# darea = sqrt_met_det * Grid.dtheta * Grid.dphi
modes_list = [item for item in modes_list if item[0] >= spin_weight]
from waveformtools.integrate import TwoDIntegral
for mode in modes_list:
ell, all_emms = mode
for emm in all_emms:
# m value.
# Spin weighted spherical harmonic function at (theta, phi)
Ybasis_fun = np.conj(
Yslm_vec(
spin_weight,
ell=ell,
emm=emm,
theta_grid=theta,
phi_grid=phi,
)
)
# Ydarea = Ybasis_fun * darea
# print(Ydarea.shape)
# print(full_integrand)
# Using quad
# multipole_ell_emm = np.tensordot(
# self.data, Ydarea, axes=((0, 1), (0, 1))
# )
# print("Shape", Ybasis_fun.shape, self.data.shape)
# integrand = np.tensordot(self.data,
# Ybasis_fun, axes=((0, 1), (0, 1)))
integrand = np.transpose(
np.transpose(self.data, (2, 0, 1)) * Ybasis_fun, (1, 2, 0)
)
# integrand = self.data * Ybasis_fun
multipole_ell_emm = TwoDIntegral(integrand, self.Grid)
# print(f'l{ell}m{emm}', multipole_ell_emm)
waveform_modes.set_mode_data(
ell=ell, emm=emm, data=multipole_ell_emm
)
return waveform_modes
[docs]
def boost(self, conformal_factor, Grid=None):
"""Boost the waveform given the unboosted
waveform and the boost conformal factor.
Parameters
----------
self: spherical_array
A class instance of `spherical array`.
conformal_factor: 2d array
The conformal factor for the
Lorentz transformation.
It may be a single floating point number
or an array on a spherical grid. The array
will be of dimensions [ntheta, nphi]
Grid: class instance
The class instance that contains
the properties of the spherical grid.
Returns
-------
boosted_waveform: sp_array
The class instance `sp_array` that
contains the boosted waveform.
"""
from waveformtools.waveforms import spherical_array
if Grid is None:
Grid = self.Grid
# Compute the boosted waveform
# on the spherical grid
# on all the elements.
boosted_waveform_data = (
np.transpose(self.data, (2, 0, 1)) * conformal_factor
)
# boosted_waveform_data = np.multiply(self.data,
# conformal_factor[:, :, None])
# boosted_waveform_data = boosted_waveform_item
# boosted_waveform_data = np.array([np.transpose(item)
# *conformal_factor
# for item in np.transpose(self.data)])
# Construct a 2d waveform array object
boosted_waveform = spherical_array(
Grid=Grid,
data=np.transpose(np.array(boosted_waveform_data), (1, 2, 0)),
)
# Assign other attributes.
boosted_waveform.label = "boosted " + self.label
boosted_waveform.time_axis = self.time_axis
return boosted_waveform
[docs]
def supertranslate(self, supertransl_alpha_sp, order=1):
"""Supertranslate the :math:`\\Psi_{4\\ell m}` waveform modes,
given the, the supertranslation parameter and the order.
Parameters
----------
supertransl_alpha_modes: modes_array
The modes_array containing the
supertranslation mode coefficients.
Grid: class instance
The class instance that contains
the properties of the spherical grid
using which the computations are
carried out.
order: int
The number of terms to use to
approximate the supertranslation.
Returns
-------
Psi4_supertransl: modes_array
A class instance containg the
modes of the supertranslated
waveform:math:`\\Psi_4`.
"""
# Create a spherical_array to hold the supertranslated waveform
Psi4_supertransl_sp = spherical_array(
Grid=self.Grid,
label="{} -> supertranslated time".format(self.label),
)
delta_t = float(self.delta_t())
# Set the data.
data = 0
# data = self.data
# Psi4_supertransl_sp.data = self.data
# delta = 0
# count = 0
from waveformtools.differentiate import differentiate5_vec_numba
for index in range(order):
# print(f'{index} loop')
dPsidu = self.data
for dorder in range(index + 1):
# print(f'differentiating {dorder+1} times')
dPsidu = differentiate5_vec_numba(dPsidu, delta_t)
message("Incrementing...")
# delta = np.power(supertransl_alpha_sp.data, index+1)
# * dPsidu / np.math.factorial(index+1)
# print(delta/self.data)
data += (
np.power(supertransl_alpha_sp.data, index + 1)
* dPsidu
/ math.factorial(index + 1)
) # delta
data += self.data
message("Equal to original waveform?", (data == self.data).all())
Psi4_supertransl_sp.data = data
Psi4_supertransl_sp.time_axis = self.time_axis
message("Done.")
return Psi4_supertransl_sp
[docs]
def load_qlm_data(
self, data_dir=None, Grid=None, bh=0, variable="sigma"
):
"""Load the 2D shear data from h5 files.
Parameters
----------
file_name: str
The name of the file containing data.
data_dir: str
The name of the directory containing data.
Grid: class instance
An instance of the Grid class.
bh: int
The black hole number (0, 1 or 2)
"""
if data_dir is None:
if self.data_dir is None:
print("Please supply the data directory!")
else:
data_dir = self.data_dir
else:
if self.data_dir is None:
self.data_dir = data_dir
if Grid is None:
if self.Grid is None:
message("Please supply the grid spec!")
else:
Grid = self.Grid
else:
if self.Grid is None:
self.Grid = Grid
# get the full path.
file_name = f"qlm_{variable}[{bh}].xy.h5"
full_path = self.data_dir + file_name
# cflag = 0
nghosts = Grid.nghosts
ntheta = Grid.ntheta
nphi = Grid.nphi
# Open the modes file.
with h5py.File(full_path, "r") as wfile:
# Get all the mode keys.
modes_keys_list = list(wfile.keys())
# modes_keys_list = sorted(modes_keys_list)
# Get the mode keys containing the data.
modes_keys_list = [
item for item in modes_keys_list if "it=" in item
]
# Get the itaration numbers.
iteration_numbers = sorted(
get_iteration_numbers_from_keys(modes_keys_list)
)
# sargs = np.argsort(iteration_numbers)
# iteration_numbers = iteration_numbers[sargs]
modes_keys_list = sort_keys(modes_keys_list)
# Construct the data array.
data_array = []
for key in modes_keys_list:
# data_item = np.array(wfile[key])
# print(data_item.shape)
data_item = np.array(wfile[key])[
nghosts : nphi - nghosts, nghosts : ntheta - nghosts
]
try:
data_item = data_item["real"] + 1j * data_item["imag"]
except Exception as ex:
message(ex)
pass
data_array.append(data_item)
self.data = np.transpose(np.array(data_array), (2, 1, 0))
self.iteration_axis = np.array(iteration_numbers)
#########################################################
# Load inv_coords data
#########################################################
inv_file_name = f"qlm_inv_z[{bh}].xy.h5"
# get the full path.
full_path = self.data_dir + inv_file_name
# Open the modes file.
with h5py.File(full_path, "r") as wfile:
# Get all the mode keys.
modes_keys_list = list(wfile.keys())
# modes_keys_list = sorted(modes_keys_list)
# Get the mode keys containing the data.
modes_keys_list = [
item for item in modes_keys_list if "it=" in item
]
modes_keys_list = sort_keys(modes_keys_list)
data_array = []
for key in modes_keys_list:
data_item = np.array(wfile[key])[
nghosts : nphi - nghosts, nghosts : ntheta - nghosts
]
# data_item = data_item['real'] + 1j*data_item['imag']
data_array.append(data_item)
self.invariant_coordinates_data = np.transpose(
np.array(data_array), (2, 1, 0)
)
#########################################################
# Load metric determinant data
#########################################################
twometric_qtt_file_name = f"qlm_qtt[{bh}].xy.h5"
twometric_qtp_file_name = f"qlm_qtp[{bh}].xy.h5"
twometric_qpp_file_name = f"qlm_qpp[{bh}].xy.h5"
# set the full path.
full_path = self.data_dir + twometric_qtt_file_name
# Open the modes file.
with h5py.File(full_path, "r") as wfile:
# Get all the mode keys.
modes_keys_list = list(wfile.keys())
# modes_keys_list = sorted(modes_keys_list)
# Get the mode keys containing the data.
modes_keys_list = [
item for item in modes_keys_list if "it=" in item
]
modes_keys_list = sort_keys(modes_keys_list)
qtt_data_array = []
for key in modes_keys_list:
data_item = np.array(wfile[key])[
nghosts : nphi - nghosts, nghosts : ntheta - nghosts
]
# data_item = data_item['real'] + 1j*data_item['imag']
qtt_data_array.append(data_item)
qtt_data_array = np.array(qtt_data_array)
qtt_data_array = np.transpose(qtt_data_array, (2, 1, 0))
# set the full path.
full_path = self.data_dir + twometric_qtp_file_name
# Open the modes file.
with h5py.File(full_path, "r") as wfile:
# Get all the mode keys.
modes_keys_list = list(wfile.keys())
# modes_keys_list = sorted(modes_keys_list)
# Get the mode keys containing the data.
modes_keys_list = [
item for item in modes_keys_list if "it=" in item
]
modes_keys_list = sort_keys(modes_keys_list)
qtp_data_array = []
for key in modes_keys_list:
data_item = np.array(wfile[key])[
nghosts : nphi - nghosts, nghosts : ntheta - nghosts
]
# data_item = data_item['real'] + 1j*data_item['imag']
qtp_data_array.append(data_item)
qtp_data_array = np.array(qtp_data_array)
qtp_data_array = np.transpose(qtp_data_array, (2, 1, 0))
# set the full path.
full_path = self.data_dir + twometric_qpp_file_name
# Open the modes file.
with h5py.File(full_path, "r") as wfile:
# Get all the mode keys.
modes_keys_list = list(wfile.keys())
# modes_keys_list = sorted(modes_keys_list)
# Get the mode keys containing the data.
modes_keys_list = [
item for item in modes_keys_list if "it=" in item
]
modes_keys_list = sort_keys(modes_keys_list)
qpp_data_array = []
for key in modes_keys_list:
data_item = np.array(wfile[key])[
nghosts : nphi - nghosts, nghosts : ntheta - nghosts
]
# data_item = data_item['real'] + 1j*data_item['imag']
qpp_data_array.append(data_item)
qpp_data_array = np.array(qpp_data_array)
qpp_data_array = np.transpose(qpp_data_array, (2, 1, 0))
sqrt_met_det = np.sqrt(
np.linalg.det(
np.transpose(
np.array(
[
[qtt_data_array, qtp_data_array],
[qtp_data_array, qpp_data_array],
]
),
(2, 3, 4, 0, 1),
)
)
)
self.sqrt_met_det_data = sqrt_met_det
[docs]
def to_shear_modes_array(self, Grid=None, spin_weight=None, ell_max=8):
"""Decompose a given spherical array function on a sphere
into Spin Weighted Spherical Harmonic modes.
Parameters
----------
spin_weight: int, optional
The spin weight of the waveform.
It defaults to -2 for
a gravitational waveform.
ell_max: int, optional
The maximum value of the :math:`\\ell`
polar quantum number. Defaults to 8.
Grid: class instance
The class instance that contains
the properties of the spherical grid.
Returns
-------
waveforms_modes: modes_array
An instance of the `modes_array`
class containing the decomposed modes.
Notes
-----
1. Assumes that the sphere on which
this decomposition is carried out is so far out
that the coordinate system is spherical polar
on a round sphere.
2. Assumes that the poper area is
the same as its co-ordinate area.
3. Ensure that the label of the input
spherical array indicates whether
it is a time domain data or
frequency domain data.
"""
if Grid is None:
if self.Grid is None:
message("Please specify the grid specs. Assuming defaults.")
Grid = UniformGrid()
self.Grid = Grid
else:
Grid = self.Grid
if spin_weight is None:
if self.spin_weight is None:
message("Please specify spin weight. Assuming 2")
spin_weight = 2
self.spin_weight = spin_weight
else:
spin_weight = self.spin_weight
spin_weight = abs(spin_weight)
# Compute the meshgrid for theta and phi.
theta, phi = Grid.meshgrid
# Create a modes array object
# Create a modes list.
modes_list = construct_mode_list(ell_max, spin_weight=spin_weight)
if not self.label:
label = "decomposed_time_domain"
else:
label = self.label
# Create a mode array for the decomposed_waveform
waveform_modes = modes_array(
label=label, ell_max=ell_max, spin_weight=spin_weight
)
# Inherit the time or frequency axis.
if "time" in label:
# axis = self.time_axis
waveform_modes.time_axis = self.time_axis
else:
# axis = self.frequency_axis
waveform_modes.frequency_axis = self.frequency_axis
# Create the modes_array
waveform_modes.time_axis = self.time_axis[:]
# sargs = np.argsort(waveform_modes.time_axis)
# message(sargs)
waveform_modes.time_axis = waveform_modes.time_axis
waveform_modes.create_modes_array(
ell_max=ell_max, data_len=self.data_len
)
waveform_modes.modes_list = modes_list
# The area element on the sphere
# Compute the meshgrid for theta and phi.
theta, phi = Grid.meshgrid
phi = np.transpose(
np.array([phi for index in range(len(self.time_axis))]), (1, 2, 0)
)
# sqrt_met_det = np.sin(theta)
# sqrt_met_det = np.sqrt(np.power(np.sin(theta), 2))
darea = self.sqrt_met_det_data * Grid.dtheta * Grid.dphi
theta = np.emath.arccos(self.invariant_coordinates_data)
modes_list = [item for item in modes_list if item[0] >= spin_weight]
for mode in modes_list:
ell, all_emms = mode
for emm in all_emms:
# m value.
# message(f'Processing l{ell} m{emm}')
# Spin weighted spherical harmonic function at (theta, phi)
Ybasis_fun = np.conj(
Yslm_vec(
spin_weight=spin_weight,
ell=ell,
emm=emm,
theta_grid=theta,
phi_grid=phi,
)
)
# Ybasis_fun = np.array([np.conj(Yslm_vec(spin_weight=
# spin_weight, ell=ell, emm=emm,
# theta_grid=theta[:, :, index],
# phi_grid=phi[:, :, index])) for index in
# range(self.data_len)])
# Ybasis_fun = np.transpose(Ybasis_fun, (1, 2, 0))
# message('Ybasis_fun', Ybasis_fun.shape)
Ydarea = Ybasis_fun * darea
# message('Ydarea', Ydarea.shape)
# message(full_integrand)
# Using quad
# message('self.data', self.data.shape)
# multipole_ell_emm = np.tensordot(self.data, Ydarea,
# axes=((0, 1), (0, 1)))
multipole_ell_emm = np.sum(self.data * Ydarea, (0, 1))
# message(f'l{ell}m{emm}', multipole_ell_emm)
# message('multipole_ell_emm', multipole_ell_emm.shape)
waveform_modes.set_mode_data(
ell=ell, emm=emm, data=multipole_ell_emm
)
return waveform_modes