"""Functions for handling data IO operations from waveforms class."""
#########################
# Imports
#########################
import json
import re
import sys
import os
import h5py, tarfile
import numpy as np
from scipy.interpolate import InterpolatedUnivariateSpline as interp
from scipy.interpolate import interp1d
from waveformtools.waveformtools import (
cleandata,
interp_resam_wfs,
message,
xtract_camp_phase,
)
from scipy.stats import mode as stats_mode
##########################
# RIT data
##########################
def _key_gen(ell, emm, extras=None):
"""Generates strings to be used as keys for.
managing h5 datasets.
Parameters
----------
ell: int
The polar angular mode number
:math:`\\ell`.
emm: int
The aximuthal angular mode number
:math:`m`.
extras: str
Any extra string to be appended
to the end of the key.
Returns
-------
key: str
A string key.
"""
key = f"l{ell}_m{emm}"
if extras is not None:
key += f"_{extras}"
# message('adding rext')
return key
[docs]
def get_ell_max_from_keys(all_keys):
"""Get ell max from a list of keys.
Parameters
----------
all_keys: list
A list of strings string containing the modes keys.
Returns
-------
ell_max: int
The maximum available ell.
"""
# available mode numbers
all_ell_modes = set({})
# Get mode numbers
for item in all_keys:
this_match = re.search(r"\_l\d\_", item)
if this_match is None:
# message(f'Skipped file {item}')
continue
# message('Match found', this_match.string)
s1, s2 = this_match.span()
this_ell = int(this_match.string[s1 + 2 : s2 - 1])
# message(this_ell)
all_ell_modes.add(this_ell)
ell_max = max(all_ell_modes)
return ell_max
[docs]
def get_ell_max_from_file(data_dir, var_type="Psi4", file_name="*.h5"):
"""Get the largest available mode number from available data in files.
Parameters
----------
data_dir: string
A string containing the directory path where the mode files
can be found.
var_type: string, optional
A string that denotes the variable that is being loaded.
Options are Psi4 and strain.
The former is the default.
file_name: string, optional
The h5 file that contains the modes data.
It defaults to the only file in the directory.
If there are multiple files, it throws an error.
Returns
-------
ell_max: int
The maximum available ell.
keys_list: list
A list of data access keys.
Notes
-----
Reads in various ASCII dat files for RIT Psi4,
h5 files for RIT strain and gen strain.
"""
if var_type == "Psi4":
import os
# Get files
all_fnames = os.listdir(data_dir)
# Get only files
all_fnames = [
item for item in all_fnames if os.path.isfile(f"{data_dir}/{item}")
]
elif var_type == "Strain":
# import h5py
message("Fetching all keys from H5 file", message_verbosity=3)
data_file = h5py.File(f"{data_dir}/{file_name}")
all_fnames = list(data_file.keys())
data_file.close()
# message(all_fnames)
# Parse ell values
# Filter the keys.
all_fnames = [item for item in all_fnames if "_l" in item]
message("All files found", all_fnames, message_verbosity=3)
ell_max = get_ell_max_from_keys(all_fnames)
return ell_max, all_fnames
def _get_modes_list_from_keys(keys_list, r_ext):
"""Get the modes list from the keys list.
of an hdf file.
Parameters
----------
keys_list: list
The list containing all the keys
r_ext: float
The extraction radius of the data.
Returns
-------
modes_list: list
The list of modes.
"""
# Sort the keys to ensure a nice
# modes list structure.
keys_list_orig = sorted(keys_list)
message("List of keys received", keys_list, message_verbosity=3)
if r_ext != -1:
keys_list = [item for item in keys_list_orig if f"r{r_ext}" in item]
if keys_list == []:
message(
"Got an empty list. Searching for r_ext value in key string"
)
keys_list = [item for item in keys_list_orig if f"{r_ext}" in item]
message("List of filtered keys", keys_list, message_verbosity=3)
# The list of modes.
modes_list = []
# Initialize the emm modes sublist.
emm_modes_for_ell = []
# Present ell value to
# initialize the mode concatenation.
ell_old = 0
for key in keys_list:
# message(key)
# Get the ell value
ell, emm = _get_ell_emm_from_key(key)
message(
"ell value: ",
ell,
"emm value:",
emm,
message_verbosity=3,
)
if ell != ell_old:
# If the ell value has changed,
# update the modes list before moving
# onto the next ell value.
modes_list.append([ell_old, emm_modes_for_ell])
# The present ell value is the old
# ell value.
ell_old = ell
# Reset the ell_mode list.
emm_modes_for_ell = []
# Update it with the new emm mode.
emm_modes_for_ell.append(emm)
modes_list.append([ell, emm_modes_for_ell])
return modes_list
def _get_ell_emm_from_key(key):
"""Get the :math:`\\ell` and :math:`m` values.
from a given key string of an hdf file.
Parameters
----------
key: str
The input key string
Returns
-------
ell: int
The :math:`\\ell` value
emm: int
The :math:`m` value.
Notes
-----
Assumes that the input string has :math:`\\ell` and :math:`m` values
in the form `l{int}m{int}`.
"""
import re
str_match = re.search(r"l\d*", key)
ell_str_start = str_match.start()
ell_str_end = str_match.end()
ell = int(key[ell_str_start + 1 : ell_str_end])
str_match = re.search(r"m-*\d*", key)
emm_str_start = str_match.start()
emm_str_end = str_match.end()
emm = int(key[emm_str_start + 1 : emm_str_end])
return ell, emm
[docs]
def get_iteration_numbers_from_keys(keys_list):
"""Get the iteration number from keys.
Parameters
----------
keys_list: list
The list of keys.
Returns
-------
iteration_numbers: list
The list containing the iteration
numbers.
"""
import re
iteration_numbers = []
for key in keys_list:
str_match = re.search(r" it=\d* ", key)
it_str_start = str_match.start()
it_str_end = str_match.end()
it_value = int(key[it_str_start + 4 : it_str_end])
iteration_numbers.append(it_value)
return iteration_numbers
[docs]
def get_files_in_tar_gz(file_path):
"""Get the files in a tar.gz.
Parameters
----------
file_path: str,pathlib.Path
The full path to the tar file
"""
with tarfile.open(file_path, "r:gz") as tf:
all_files = [one_file.name for one_file in tf.getmembers()]
return all_files
[docs]
def get_ell_max_from_tar_gz_file(file_path):
"""Get the ell_max from files in a tar.gz.
Parameters
----------
file_path: str,pathlib.Path
The full path to the tar file
Returns
-------
ell_max: int
The l max of the modes available
filtered_fnames: list of str
A list contaning the names of the modes files.
"""
all_fnames = get_files_in_tar_gz(file_path)
filtered_fnames = [item for item in all_fnames if "_l" in item]
message("All files found", filtered_fnames, message_verbosity=3)
ell_max = get_ell_max_from_keys(filtered_fnames)
return ell_max, filtered_fnames
[docs]
def read_data_from_tar_gz_subfile(tar_gz_file_handle, subfile_name):
"""Extract and read the data in a subfile inside the given open.
tar gz file handle
"""
with tar_gz_file_handle.extractfile(subfile_name) as df:
data = np.genfromtxt(df)
return data
[docs]
def reorder_modes_list(modes_list):
"""Reorder a modes list."""
modes_dict = {}
ordered_modes_list = []
for ell, emm_list in modes_list:
modes_dict.update({ell: sorted(emm_list)})
sorted_ells = sorted(list(modes_dict.keys()))
for ell in sorted_ells:
ordered_modes_list.append([ell, modes_dict[ell]])
return ordered_modes_list
[docs]
def construct_mode_list(ell_max, spin_weight):
"""
Construct a modes list in the form.
[[ell1, [emm1, emm2, ...], [ell2, [emm..]],..]
given the :math:`\\ell_{max}.`
Parameters
----------
spin_weight: int
The spin weight of the modes.
ell_max: int
The :math:`\\ell_{max}` of the modes list.
Returns
-------
modes_list: list
A list containg the mode indices lists.
Notes
-----
The modes list is the form which the `waveform` object understands.
"""
# The modes list.
modes_list = []
message("Construct modes list in dataIO", message_verbosity=4)
message(
f"ell_max {ell_max}, spin weight {spin_weight}", message_verbosity=4
)
for ell_index in range(abs(spin_weight), ell_max + 1):
# Append all emm modes for each ell mode.
modes_list.append([ell_index, list(range(-ell_index, ell_index + 1))])
message(
"ell max of created modes list"
f"{max([item[0] for item in modes_list])}",
message_verbosity=4,
)
message("--------------------------------\n", message_verbosity=4)
return modes_list
[docs]
def sort_keys(modes_keys_list):
"""Sort the keys in a list based on its iteration number
Parameters
----------
modes_keys_list: str
The list of keys.
Returns
-------
sorted_modes_keys_list: str
The sorted list.
"""
iteration_numbers = get_iteration_numbers_from_keys(modes_keys_list)
sargs = np.argsort(iteration_numbers)
sorted_modes_keys_list = np.array(modes_keys_list)[sargs]
return sorted_modes_keys_list
from matplotlib.style import available
from pathlib import Path
[docs]
def load_RIT_Psi4_data_from_disk(
data_file_path,
wfa=None,
label="RIT_rPsi4inf",
ell_max=None,
modes_list=None,
save_as_ma=False,
spin_weight=-2,
resam_type="finest",
interp_kind="cubic",
crop=False,
centre=False,
output_modes_array=False,
):
"""Load the Psi4 waveforms from the RIT catalogue.
from ASCII files from disk.
Parameters
----------
wfa: ModesArray
An instance of the waveforms ModesArray class.
Updates this instance if provided, else creates a new instance.
data_file_path: string/pathlib.Path
A string containing the directory path where
the mode files can be found.
label: string, optional
The label of the modes_array object.
ell_max: int, optional
The maximum mode number to load. If not specified,
then all available modes are loaded.
save_as_ma: bool, optional
Save to disk again as a modes_array h5 file?
spin_weight: int, optional
The spin weight of the object. Used for filtering modes.
Defaults to -2.
resam_type: string, float, optional
The type of resampling to do. Options are finest and coarsest,
and user input float.
interp_kind: string, optional
The interpolation type to use. Default is cubic.
Returns
-------
rit_modes_array: modes_array
A modes_array instance containing the loaded modes if
`output_modes_array` is True
time_axis, modes_data: ndarray
Time axis and an array whose
first axis is time and second is
flatened modes index consistent with
ModesArray
Notes
-----
It seems like the time axis of individual modes are identical to
each other. Hence, one need not worry about choosing the time domain.
This may change in the future.
"""
tar_file_name_prefix = os.path.basename(data_file_path)[:-7]
# For interpolation
from scipy.interpolate import interp1d
message("Loading RIT Psi4 type data.", message_verbosity=1)
available_ell_max, available_modes_files = get_ell_max_from_tar_gz_file(
data_file_path
)
# Create a modes array
if modes_list is None:
# Max available mode l.
if ell_max is None:
ell_max = available_ell_max
else:
if ell_max > available_ell_max:
message(
f"ell_max {ell_max} requested but maximum"
"available ell_max is {available_ell_max}",
message_verbosity=1,
)
ell_max = available_ell_max
# Construct a modes list
wf_modes_list = construct_mode_list(
ell_max=ell_max, spin_weight=spin_weight
)
message("The modes list is", wf_modes_list, message_verbosity=3)
else:
modes_list = reorder_modes_list(modes_list)
ell_max = max([item[0] for item in modes_list])
wf_modes_list = modes_list
##########################################
# Read in the data
#########################################
message("Reading in modes...", message_verbosity=2)
created = False
with tarfile.open(data_file_path, "r:gz") as open_tar_file_handle:
for ell, emm_list in wf_modes_list:
for emm in emm_list:
mode_idx = ell * 2 + ell + emm
message("Loading", ell, emm, message_verbosity=2)
# Construct file path
wf_psi4_mode_data = read_data_from_tar_gz_subfile(
open_tar_file_handle,
f"{tar_file_name_prefix}/rPsi4_l{ell}_m{emm}_rInf.asc",
)
# Get time axis
wf_psi4_time = wf_psi4_mode_data[:, 0]
if not created:
message("\t Inferring time axis", message_verbosity=2)
min_dt = round(min(np.diff(wf_psi4_time)), 2)
max_dt = round(max(np.diff(wf_psi4_time)), 2)
message(
f"Min dt {min_dt} and Max dt {max_dt}",
message_verbosity=2,
)
if resam_type == "finest":
# Choose finest available timestep
# for upto 3 decimal digits.
m_dt = min_dt
message(
"\tResampling at the finest timestep",
m_dt,
message_verbosity=2,
)
elif resam_type == "coarsest":
m_dt = max_dt
message(
"\tResampling at the coarsest timestep",
m_dt,
message_verbosity=2,
)
elif isinstance(resam_type, float):
m_dt = resam_type
message(
"\tResampling at user defined timestep",
m_dt,
message_verbosity=2,
)
else:
raise KeyError(
f"Unrecognized resampling type {resam_type}"
)
# New (resampled) time axis
time_axis = np.arange(
wf_psi4_time[0], wf_psi4_time[-1] + m_dt, m_dt
)
# Length of data.
data_len = len(time_axis)
# To hold the loaded data
modes_data = np.zeros(
(data_len, (ell_max + 1) ** 2), dtype=np.complex128
)
created = True
###############################
# Load the phase data
##############################
Yphase = wf_psi4_mode_data[:, 4]
Yphase_interp_fun = interp1d(
wf_psi4_time, Yphase, kind=interp_kind
)
# Resample
Yphase_resam = Yphase_interp_fun(time_axis)
###########################
# Load the amplitude data
###########################
Yamp = wf_psi4_mode_data[:, 3]
Yamp_interp_fun = interp1d(wf_psi4_time, Yamp, kind=interp_kind)
# Resample
Yamp_resam = Yamp_interp_fun(time_axis)
wfmode = Yamp_resam * np.exp(1j * Yphase_resam)
###################################
# Set the modes data
###################################
modes_data[:, mode_idx] = wfmode
if output_modes_array == True:
from waveformtools.modes_array import ModesArray
data_dir = os.path.dirname(data_file_path)
if not isinstance(wfa, ModesArray):
wfa = ModesArray(
label=label,
data_dir=data_dir,
modes_list=modes_list,
extra_mode_axes_shape=None,
)
# Create a modes array object
wfa.create_modes_array(ell_max=ell_max, data_len=data_len)
# Assign to it the time axis
wfa.time_axis = time_axis
wfa.ell_max = ell_max
wfa.modes_list = wf_modes_list
wfa.r_ext = np.inf
wfa._actions += "->load_modes"
wfa._modes_data = modes_data.T
if crop is not False or centre is True:
# Trim or recenter
if crop is True or centre is True:
wfa.trim(trim_upto_time=0)
wfa.centered = True
wfa._actions += "->recenter"
elif isinstance(crop, float):
wfa.trim(trim_upto_time=crop)
wfa._actions += "->crop"
if save_as_ma is True:
# Save the modes array as waveforms hdf file
wfa.save_modes(out_file_name="{label}_resam.h5")
wfa._actions += "->save_as_wfh5"
return wfa
else:
return time_axis, modes_data
[docs]
def load_RIT_Strain_data_from_disk(
wfa=None,
data_dir="./",
file_name="*",
label="RIT_strain",
spin_weight=-2,
ell_max="auto",
resam_type="auto",
interp_kind="cubic",
save_as_ma=False,
modes_list=None,
crop=False,
centre=True,
r_ext_factor=1,
debug=False,
):
"""Load the RIT or strain waveforms from the RIT/ MAYA catalogue data,.
from hdf5 files from disk.
Parameters
----------
wfa: waveforms
An instance of the waveforms class. Creates a new one if
not provided.
data_dir: string
A string containing the directory path
where the mode files can be found.
label: string, optional
The label of the modes_array object.
ell_max: int, optional
The maximum mode number to load.
If not specified, then all available modes are loaded.
save_as_ma: bool, optional
Save to disk again as a modes_array h5 file?
spin_weight: int, optional
The spin weight of the object. Used for filtering modes.
Defaults to -2.
resam_type: string, float, optional
The type of resampling to do. Options are
the finest and coarsest, and user input float.
interp_kind: string, optional
The interpolation type to use. Default is cubic.
Returns
-------
rit_modes_array: modes_array
A modes_array instance containing the loaded modes.
Notes
-----
It seems like the time axis of individual modes are
identical to each other. Hence, one need not worry about
choosing the time domain. This may change in future.
"""
message("Loading RIT strain data.", message_verbosity=1)
from functools import partial
# Initialize the interpolator
if isinstance(interp_kind, int):
message(
"Interpolating using InterpolatedUnivariateSpline",
message_verbosity=2,
)
interpolator = partial(interp, k=interp_kind)
elif isinstance(interp_kind, str):
from scipy.interpolate import interp1d
message("Interpolating using interp1d", message_verbosity=2)
interpolator = partial(interp1d, kind=interp_kind)
from waveformtools.modes_array import ModesArray
# Max available mode l.
ell_max_act, keys_list = get_ell_max_from_file(
data_dir=data_dir, var_type="Strain", file_name=file_name
)
####################################
# Set variables with priorities
# Note: rework this in dictionaries
####################################
if ell_max == "auto":
ell_max = ell_max_act
if ell_max is None:
message("ell_max not provided.")
if wfa is not None:
wfa_ell_max = wfa.ell_max
else:
wfa_ell_max = None
if wfa_ell_max is None:
message("modes array not provided. Setting ell_max from file...")
ell_max = ell_max_act
else:
message("Setting ell_max from given modes_array")
ell_max = wfa.ell_max
message("Chosen ell max", ell_max, "Available ell_max", ell_max_act)
if not isinstance(wfa, ModesArray):
# Create a modes array
wfa = ModesArray(label=label, ell_max=ell_max, modes_list=modes_list)
# wfa = modes_array(label=label, data_dir=data_dir, modes_list=modes_list)
if debug is True:
wf_nl = ModesArray(
label=label + "_nl", ell_max=ell_max, modes_list=modes_list
)
if not data_dir:
data_dir = wfa.data_dir
else:
wfa.data_dir = data_dir
if not file_name:
file_name = wfa.file_name
else:
wfa.file_name = file_name
if not ell_max:
ell_max = wfa.ell_max
else:
wfa.ell_max = ell_max
# ell_max = 12
if not modes_list:
if not wfa.modes_list:
message("Constructing the modes list")
modes_list = construct_mode_list(
ell_max=ell_max, spin_weight=wfa.spin_weight
)
else:
modes_list = wfa.modes_list
else:
wfa.modes_list = modes_list
# For interpolation
from scipy.interpolate import interp1d
# Alias of the modes_array
# label = 'q1a0_a'
# Enforce only l>abs(spin_Weight) modes.
# wf_modes_list = [item for item in wf_modes_list if item[0]>=abs(spin_weight)]
# tend = []
# tstart = []
##########################################
# Read in the data
#########################################
# message(file_name)
# Get the time axis
# import h5py
data_file = h5py.File(f"{data_dir}/{file_name}")
try:
# For RIT data type
time_axis = data_file["NRTimes"][...]
# dt_auto = time_axis[1]-time_axis[0]
from scipy.stats import mode as stats_mode
dt_auto = stats_mode(np.diff(time_axis))[0]
except Exception as excep:
dt_auto = None
message(
"NRTimes not present. Will compute dt auto from mode time axis",
excep,
message_verbosity=2,
)
message("Reading in modes...")
for ell, emm_list in modes_list:
for emm in emm_list:
this_amp_key = f"amp_l{ell}_m{emm}"
this_phase_key = f"phase_l{ell}_m{emm}"
message("Loading", ell, emm, message_verbosity=3)
# Construct file path
# Create modes_array on first run
###############################
# Load the phase data
##############################
Tphase = data_file[this_phase_key]["X"][...]
Yphase = data_file[this_phase_key]["Y"][...]
# message(wfa.modes_data)
if wfa.modes_data.all() == np.array(None):
message("Creating modes data")
if dt_auto is None:
# For MAYA data type
time_axis = Tphase
# dt_auto = time_axis[1]-time_axis[0]
from scipy.stats import mode as stats_mode
dt_auto = stats_mode(np.diff(time_axis))[0]
min_dt = round(min(np.diff(time_axis)), 2)
max_dt = round(max(np.diff(time_axis)), 2)
message(f"Default dt is {dt_auto}", message_verbosity=2)
if resam_type == "auto":
# Choose finest available timestep
# for upto 3 decimal digits.
m_dt = dt_auto
message(
"Sampling at the default timestep",
m_dt,
message_verbosity=2,
)
elif resam_type == "finest":
m_dt = min_dt
message(
"Sampling at the finest available timestep",
m_dt,
message_verbosity=2,
)
elif resam_type == "coarsest":
m_dt = max_dt
message(
"Sampling at the coarsest available timestep",
m_dt,
message_verbosity=2,
)
elif isinstance(resam_type, float):
m_dt = resam_type
message(
"Resampling at user defined timestep",
m_dt,
message_verbosity=2,
)
# New (resampled) time axis
time_axis = np.arange(time_axis[0], time_axis[-1], m_dt)
else:
raise NotImplementedError(
f"Unknown resampling parameter {resam_type}"
)
# Length of data.
data_len = len(time_axis)
# Create a modes array object
wfa.create_modes_array(ell_max=ell_max, data_len=data_len)
# Assign to it the time axis
wfa.time_axis = time_axis
# message(wfa.time_axis)
# message(wfa.time_axis - wf_psi4_time)
# continue
###################################
# Uniform sampling
###################################
# message('Wfa time axis', wfa.time_axis)
Yphase_interp_fun = interpolator(Tphase, Yphase)
# Yphase_interp_fun = interpolator(Tphase, Yphase, k=3)
# Resample
Yphase_resam = Yphase_interp_fun(time_axis)
###########################
# Load the amplitude data
###########################
Tamp = data_file[this_amp_key]["X"][...]
Yamp = data_file[this_amp_key]["Y"][...]
Yamp_interp_fun = interpolator(Tamp, Yamp)
# Yamp_interp_fun = interpolator(Tamp, Yamp, k=3)
# wf_c = Yamp*np.exp(1j*Yphase)
# Resample
Yamp_resam = Yamp_interp_fun(time_axis)
wfmode = Yamp_resam * np.exp(1j * Yphase_resam)
# if not (Tphase==Tamp).all():
# raise ValueError('The time axis of the amps and phase are different!')
# wfmode = interp_resam_wfs(wf_c, Tphase, time_axis, k=4)
###################################
# Load the modes data
###################################
wfa.set_mode_data(ell=ell, emm=emm, data=r_ext_factor * wfmode)
data_file.close()
#####################
# Finishing touches
#####################
wfa._actions += "->load_modes"
# Trim or recenter
if centre is True:
wfa.trim(trim_upto_time=0)
wfa._actions += "->center"
if isinstance(crop, float):
wfa.trim(trim_upto_time=crop)
wfa._actions += "->crop"
if save_as_ma is True:
# Save the modes array as waveforms hdf file
wfa.save_modes(out_file_name=f"{label}_resam.h5")
wfa._actions += "->save_as_wfh5"
if debug is True:
return wfa, wf_nl
else:
return wfa
#################################################################
# Generic data type
#################################################################
[docs]
def load_gen_data_from_disk(
wfa=None,
label="generic waveform",
data_dir="./",
file_name="*.h5",
r_ext=None,
ell_max=8,
pre_key=None,
modes_list=None,
crop=False,
centre=True,
key_ex=None,
r_ext_factor=1,
*args,
**kwargs,
):
"""Load the RIT strain waveforms from the RIT catalogue,.
from hdf5 files from disk.
Parameters
----------
data_dir: string
A string containing the directory path where
the mode files can be found.
label: string, optional
The label of the modes_array object.
ell_max: int, optional
The maximum mode number to load. If not specified,
then all available modes are loaded.
save_as_ma: bool, optional
Save to disk again as a modes_array h5 file?
spin_weight: int, optional
The spin weight of the object. Used for filtering modes.
Defaults to -2.
resam_type: string, float, optional
The type of resampling to do.
Options are finest and coarsest, and user input float.
interp_kind: string, optional
The interpolation type to use. Default is cubic.
Returns
-------
rit_modes_array: modes_array
A modes_array instance containing the loaded modes.
Notes
-----
It seems like the time axis of individual modes are identical to
each other. Hence, one need not worry about
choosing the time domain. This may change in future.
"""
message("Loading generic data.", message_verbosity=1)
from waveformtools.modes_array import ModesArray
# Max available mode l.
if not isinstance(wfa, ModesArray):
# Create a modes array
wfa = ModesArray(
label=label,
data_dir=data_dir,
modes_list=modes_list,
ell_max=ell_max,
)
# if not data_dir:
# data_dir = wfa.data_dir
# else:
# wfa.data_dir = data_dir
# if not file_name:
# file_name = wfa.file_name
# else:
# wfa.file_name = file_name
# if not ell_max:
# ell_max = wfa.ell_max
# else:
# wfa.ell_max = ell_max
# if not label:
# label = wfa.label
# ell_max = 12
# Max available mode l.
full_path = f"{data_dir}/{file_name}"
message(f"Loading data from {full_path}", message_verbosity=1)
# Enforce only l>2 modes.
# wf_modes_list = [item for item in wf_modes_list if
# item[0]>=abs(spin_weight)]
# Open the modes file.
# import h5py, json
full_path = wfa.data_dir/wfa.file_name
# message(wfa.file_name)
with h5py.File(full_path, "r") as wfile:
#################################
# Get metadata
###############################
# Load metadata if present.
try:
# if 1:
metadata_bytes = bytes(np.void(wfile["metadata"])).decode()
metadata = json.loads(metadata_bytes)
# Import the metadata.
for key, val in metadata.items():
if val is not None:
wfa.__dict__.update({key: val})
message("Metadata loaded", message_verbosity=2)
message(
"Waveform meta data:", wfa.get_metadata(), message_verbosity=2
)
except Exception as ex:
# If no metadata found, pass empty dict for updation.
message("No metadata found!", ex)
metadata = {}
pass
# Get the list of keys.
modes_keys_list = list(wfile.keys())
# message('Keys ', modes_keys_list)
# message(wfa.get_metadata())
# Check and filter for particular key string pattern
if key_ex is not None:
# Filter the keys according to key_ex if specified.
message("Filtering as per", key_ex)
wfa.key_ex = key_ex
modes_keys_list = [
item for item in modes_keys_list if key_ex in item
]
# message(modes_keys_list)
else:
message(
"key_ex is not specified. Proceeding without filtering..",
message_verbosity=2,
)
modes_keys_list = sorted(modes_keys_list)
# message('Modes keys', modes_keys_list)
wfa.mode_keys_list = modes_keys_list
# Construct the list of modes if it doesnt exist.
##########################
# Construct modes list
##########################
if r_ext:
wfa.r_ext = r_ext
elif wfa.r_ext is not None:
r_ext = wfa.r_ext
else:
message("Unable to parse extraction radius!")
sys.exit(0)
if not modes_list:
# Check if modes list is given for which mode to load.
# If the list of modes is not given then construct the list of modes.
if not ell_max:
# If ell max is also not specified,
# construct the list of modes using
# the list of modes h5 file keys.
modes_list = _get_modes_list_from_keys(modes_keys_list, r_ext)
# message(modes_list)
# Get the ell max
ell_max = max([item[0] for item in modes_list])
# metadata.update({ell_max : ell_max })
wfa.ell_max = ell_max
else:
# self.ell_max = ell_max
# If ell max is given, construct the
# list of modes directly.
modes_list = construct_mode_list(
ell_max, spin_weight=wfa.spin_weight
)
# set the modes list attr.
wfa.modes_list = modes_list
else:
# self.modes_list = modes_list
# If modes list is given, get ell_max from it.
if not ell_max:
# Get the ell max
ell_max = max([item[0] for item in modes_list])
# Set the ell_max attribute if not already.
if not wfa.ell_max:
wfa.ell_max = ell_max
#################################################
# Load modes
#################################################
# Modes array created flag
cflag = 0
# Load the modes listed in mode_numbers list
for item in modes_list:
# For every ell mode list in modes_list
ell, emm_list = item
for emm_index, emm in enumerate(emm_list):
# For every (ell, emm) mode.
# Find the key corresponding to the mode
try:
key = str(
[
item
for item in modes_keys_list
if re.search(
f"l{ell}_m{emm}_r{r_ext}", item
)
][0]
)
# message('The loaded key is ', key, type(key))
# message('The loaded key is ', key, type(key))
# if key=='l0_m0_r500.00':
# message('Its alright')
except Exception as ex:
message(
f"Waveform dataset for l{ell}, m{emm} not found",
ex,
)
sys.exit(0)
# Get the data
data = np.array(wfile[key])
# set the time and data axis
time_axis, data_re = cleandata([data[:, 0], data[:, 1]])
time_axis, data_im = cleandata([data[:, 0], data[:, 2]])
# create flag
if not cflag:
if (wfa.modes_data == np.array(None)).all():
if crop:
# Crop the beginning portion.
# delta_t = time_axis[1] - time_axis[0]
# shift = int(wfa.r_ext / delta_t)
raise NotImplementedError(
"Not implemented! Please contact the developers!"
)
else:
shift = 0
data_len = len(time_axis) - shift
wfa._data_len = data_len
# Delete the attribute
# del self.modes_data
# Create an array for the waveform mode object
wfa.create_modes_array(wfa.ell_max, data_len)
# self.modes_data = np.zeros([ell_max+1, 2*(ell_max+1) +1, data_len],
# dtype=np.complex128)
# self.modes_data = np.zeros([ell_max+1, 2*(ell_max+1) +1, data_len],
# dtype=np.complex128)
cflag = 1
# set the time axis.
# wfa.time_axis = time_axis[shift:]
wfa._time_axis = time_axis
# wfa.set_mode_data(ell, emm,
# r_ext_factor*(data_re[shift:] + 1j * data_im[shift:]))
wfa.set_mode_data(
ell=ell,
emm=emm,
data=r_ext_factor * (data_re + 1j * data_im),
)
##############################
# Recenter axis
##############################
# Trim or recenter
if centre:
crop_time = 0
if crop:
crop_time = wfa.r_ext
wfa.trim(trim_upto_time=crop_time)
# maxloc = np.argmax(np.absolute(self.mode(2, 2)))
# maxtime = time_axis[shift + maxloc]
# if wfa.maxtime is None:
# wfa.maxtime = maxtime
# message("Max time is", maxtime)
# if centre:
# wfa.time_axis = time_axis[shift:] - maxtime
# message(wfa.file_name)
return wfa
###################################################################
# SpEC
###################################################################
[docs]
def load_SpEC_data_from_disk(
wfa=None,
label="SXS Strain",
data_dir="./",
file_name="rhOverM_Extrapolated_N5_CoM_Mem.h5",
extrap_order=None,
r_ext=None,
ell_max=None,
centre=True,
modes_list=None,
save_as_ma="False",
resam_type="auto",
interp_kind="cubic",
compression_opts=0,
r_ext_factor=1,
debug=False,
):
"""Load the SpEC waveform to modes_array,from hdf5 files from disk.
Parameters
----------
wfa: modes_array, optional
The modes array to which to store the loaded waveform to. A new modes array will be returned
if not provided.
data_dir: string
A string containing the directory path where the mode files can be found.
file_name: string
The name of the file containing the waveform data.
label: string, optional
The label of the modes_array object.
ell_max: int, optional
The maximum mode number to load. If not specified,
then all available modes are loaded.
save_as_ma: bool, optional
Save to disk again as a modes_array h5 file?
resam_type: string, float, optional
The type of resampling to do. Options are finest and coarsest, and user input float.
interp_kind: string, optional
The interpolation type to use. Default is cubic.
Returns
-------
modes_array: modes_array
A modes_array instance containing the loaded modes.
"""
message(f"Loading SpEC data N{extrap_order}", message_verbosity=1)
from waveformtools.modes_array import ModesArray
# Load SXS waveforms to modes_array.
# Spectra infinty
full_path = f"{data_dir}/{file_name}"
wf_f0 = h5py.File(full_path)
if extrap_order is not None:
# Extrap Key pattern
gkey = f"Extrapolated_N{extrap_order}.dir"
try:
wf_file = wf_f0[gkey]
except KeyError as ke:
message(
ke,
":\n Reading as SpEC file in external extrap mode",
message_verbosity=2,
)
wf_file = wf_f0
else:
wf_file = wf_f0
all_keys = list(wf_file.keys())
print(all_keys)
# Max available mode l.
ell_max_act = get_ell_max_from_keys(all_keys)
# message(ell_max_act)
####################################
# Set variables with priorities
# Note: rework this in dictionaries
####################################
if ell_max == "auto":
ell_max = ell_max_act
if ell_max is None:
message("ell_max not provided.")
if wfa is not None:
wfa_ell_max = wfa.ell_max
else:
wfa_ell_max = None
if wfa_ell_max is None:
message("modes array not provided. Setting ell_max from file...")
ell_max = ell_max_act
else:
message("Setting ell_max from given modes_array")
ell_max = wfa.ell_max
message("Chosen ell max", ell_max, "Available ell_max", ell_max_act)
if not isinstance(wfa, ModesArray):
# Create a modes array
wfa = ModesArray(label=label, ell_max=ell_max, modes_list=modes_list)
# wfa = modes_array(label=label, data_dir=data_dir, modes_list=modes_list)
if debug is True:
wf_nl = ModesArray(
label=label + "_nl", ell_max=ell_max, modes_list=modes_list
)
wfa.extrap_order = extrap_order
message(f"Using extrap order {extrap_order}")
if not data_dir:
data_dir = wfa.data_dir
else:
wfa.data_dir = data_dir
if not file_name:
file_name = wfa.file_name
else:
wfa.file_name = file_name
if not ell_max:
ell_max = wfa.ell_max
else:
wfa.ell_max = ell_max
# ell_max = 12
if not modes_list:
if not wfa.modes_list:
message("Constructing the modes list")
# sys.exit(0)
modes_list = construct_mode_list(
ell_max=ell_max, spin_weight=wfa.spin_weight
)
else:
modes_list = wfa.modes_list
else:
wfa.modes_list = modes_list
# Filter
modes_list = [item for item in modes_list if item[0] >= 2]
############################################################
# create flag
# flag = None
if (wfa.modes_data == np.array(None)).any():
wfa = initialize_modes_array(wf_time, wfa, resam_type=resam_type, ell_max=ell_max)
# Load modes
for ell, emm_list in modes_list:
for emm in emm_list:
# message(ell, emm)
this_key = f"Y_l{ell}_m{emm}.dat"
# Input waveform from disk
wf_data = wf_file[this_key][...]
wf_time = wf_data[:, 0]
# print('wf time', wf_time)
wf_data_re = wf_data[:, 1]
wf_data_im = wf_data[:, 2]
wf_data_c = wf_data_re + 1j * wf_data_im
# wf_amp, wf_phase = xtract_camp_phase(wf_data_re, wf_data_im)
# Interpolate and resamplea
# Note
# Interpolating in amplitude and phase is better
# and has lower interpolation errors
# but is slower due to unwrapping of phases.
wf_int = interp_resam_wfs(
wf_data_c, wf_time, time_axis, kind="cubic", k=None
)
# amp_int = interp_resam_wfs(wf_amp, wf_time, time_axis)
# phase_int = interp_resam_wfs(wf_phase, wf_time, time_axis)
# re_int = interp1d(wf_time, wf_data_re)
# message(wf_time[0], wf_time[-1], time_axis[0], time_axis[-1])
# re_dat = re_int(time_axis)
# im_int = interp1d(wf_time, wf_data_im)
# im_dat = im_int(time_axis)
# wfa.set_mode_data(ell, emm, data=re_dat + 1j * im_dat)
# message(f"Setting mode Data {ell}, {emm} \n {wf_int} \n" )
wfa.set_mode_data(ell=ell, emm=emm, data=wf_int)
if debug is True:
for ell, emm_list in modes_list:
for emm in emm_list:
this_key = f"Y_l{ell}_m{emm}.dat"
# Input waveform from disk
wf_data = wf_file[this_key]
wf_time = wf_data[:, 0]
wf_data_re = wf_data[:, 1]
wf_data_im = wf_data[:, 2]
if wf_nl.modes_data.all() == np.array(None):
wf_nl.create_modes_array(
ell_max=ell_max, data_len=len(wf_time)
)
wf_nl.time_axis = wf_time
wf_nl.data_len = len(wf_time)
wf_nl.set_mode_data(ell=ell, emm=emm, data=wf_data_re + 1j * wf_data_im)
if centre:
wfa.trim(trim_upto_time=0)
if save_as_ma is True:
# Save the modes array as waveforms hdf file
wfa.save_modes(
out_file_name=f"{label}_resam.h5",
compression_opts=compression_opts,
)
wf_f0.close()
if debug is True:
return wfa, wf_nl
else:
return wfa
########################################################################################################################
# SpECTRE
########################################################################################################################
[docs]
def load_SpECTRE_data_from_disk(
wfa=None,
label="SpECTRE Strain",
data_dir="./",
file_name="rhOverM_Extrapolated_N5_CoM_Mem.h5",
r_ext=None,
ell_max=12,
centre=True,
modes_list=None,
r_ext_factor=1,
save_as_ma="False",
resam_type="auto",
kind="cubic",
compression_opts=0,
spin_weight=None,
):
"""Load the SpECTRE or SpEC CCE waveform to modes_array,
from hdf5 files from disk.
Parameters
----------
wfa: modes_array, optional
The modes array to which to store
the loaded waveform to. A new modes array will be returned
if not provided.
data_dir: string
A string containing the directory path
where the mode files can be found.
file_name: string
The name of the file containing the waveform data.
label: string, optional
The label of the modes_array object.
ell_max: int, optional
The maximum mode number to load. If not specified,
then all available modes are loaded.
save_as_ma: bool, optional
Save to disk again as a modes_array h5 file?
resam_type: string, float, optional
The type of resampling to do.
Options are finest and coarsest, and user input float.
interp_kind: string, optional
The interpolation type to use. Default is cubic.
Returns
-------
modes_array: modes_array
A modes_array instance containing the loaded modes.
"""
#spin_weight = -2
if spin_weight is None:
if wfa is None:
spin_weight=-2
else:
if wfa.spin_weight is None:
spin_weight = -2
else:
spin_weight = wfa.spin_weight
message("Loading SpECTRE data.", message_verbosity=1)
from waveformtools.waveforms import ModesArray
# Load SXS waveforms to modes_array.
# Spectre infinty
full_path = f"{data_dir}/{file_name}"
try:
import scri
except Exception as ex:
message(
"scri module is required for reading in SXS waveforms."
"Please install and try again",
ex,
)
sys.exit(0)
wf_file = scri.rpxmb.load(full_path)[0].to_inertial_frame()
ell_max_act = int(wf_file.ell_max)
#import pdb
#pdb.set_trace()
# Add readinf ell max from file
if ell_max is None:
if wfa is None:
wfa_ell_max = None
else:
wfa_ell_max = wfa.ell_max
if wfa_ell_max is None:
ell_max = ell_max_act
else:
ell_max = wfa_ell_max
if not isinstance(wfa, ModesArray):
# Create a modes array
wfa = ModesArray(label=label, ell_max=ell_max, modes_list=modes_list)
if not data_dir:
data_dir = wfa.data_dir
else:
wfa.data_dir = data_dir
if not file_name:
file_name = wfa.file_name
else:
wfa.file_name = file_name
if not ell_max:
ell_max = wfa.ell_max
else:
wfa.ell_max = ell_max
if not modes_list:
if not wfa.modes_list:
message("Constructing the modes list")
modes_list = construct_mode_list(
ell_max=ell_max, spin_weight=spin_weight
)
else:
modes_list = wfa.modes_list
else:
wfa.modes_list = modes_list
wf_time = wf_file.t
if (wfa.modes_data == np.array(None)).any():
wfa = initialize_modes_array(wf_time, wfa, resam_type=resam_type, ell_max=ell_max)
time_axis = wfa.time_axis
for ell, emm_list in modes_list:
for emm in emm_list:
wf_data = wf_file.data[:, wf_file.index(ell, emm)]
wf_data_re = wf_data.real
wf_data_im = wf_data.imag
re_int = interp1d(wf_time, wf_data_re, kind=kind)
re_dat = re_int(time_axis)
im_int = interp1d(wf_time, wf_data_im, kind=kind)
im_dat = im_int(time_axis)
wfa.set_mode_data(ell=ell, emm=emm, data=re_dat + 1j * im_dat)
if centre:
wfa.trim(trim_upto_time=0)
if save_as_ma is True:
# Save the modes array as waveforms hdf file
wfa.save_modes(
out_file_name=f"{label}_resam.h5",
compression_opts=compression_opts,
)
return wfa
[docs]
def initialize_modes_array(time_axis, modes_array, resam_type='finest', ell_max=8):
""" Initialize a modes array given the time axis """
message("Creating modes data")
dt_auto = stats_mode(np.diff(time_axis))[0]
message(f'Default dt is {dt_auto}', message_verbosity=3)
min_dt = min(np.diff(time_axis))
max_dt = max(np.diff(time_axis))
message(
f"Min dt {min_dt} and Max dt {max_dt}", message_verbosity=2
)
if resam_type == "finest":
# Choose finest available timestep
# for upto 3 decimal digits.
m_dt = min_dt
message(
"Resampling at the finest timestep",
m_dt,
message_verbosity=1,
)
if resam_type == "coarsest":
m_dt = max_dt
message(
"Resampling at the coarsest timestep",
m_dt,
message_verbosity=1,
)
if isinstance(resam_type, float):
m_dt = resam_type
message(
"Resampling at user defined timestep",
m_dt,
message_verbosity=1,
)
if resam_type == "auto":
# Choose finest available timestep
# for upto 3 decimal digits.
m_dt = dt_auto
message(
"Resampling at the default timestep",
m_dt,
message_verbosity=1,
)
# New (resampled) time axis
time_axis = np.arange(time_axis[0], time_axis[-1], m_dt)
# Length of data.
data_len = len(time_axis)
modes_array.create_modes_array(ell_max=ell_max, data_len=data_len)
modes_array.time_axis = time_axis
return modes_array
##################################################################
# Output
##################################################################
[docs]
def save_modes_data_to_gen(
wfa,
ell_max=None,
pre_key=None,
key_format=None,
modes_to_save=None,
out_file_name="mp_new_modes.h5",
r_ext_factor=None,
compression_opts=0,
r_ext=None,
):
"""Save the waveform mode data to an hdf file.
Parameters
----------
pre_key: str, optional
A string containing the key of the group in
the HDF file in which the modes` dataset exists.
It defaults to `None`.
mode_numbers: list
The mode numbers to load from the file.
Each item in the list is a list that
contains two integrer numbers, one for
the mode index :math:`\\ell` and the
other for the mode index :math:`m`.
Returns
-------
waveform_obj: 3d array
Sets the three dimensional array `waveform.modes`
that contains the required :math:`\\ell, m` modes.
Examples
--------
>>> from waveformtools.modes_array import ModesArray
>>> wf = modes_array()
>>> wf.data_dir = './'
>>> wf.filename = 'data_file.h5'
>>> wf.modes_list = [[2, 2], [3, 3]]
>>> wf.load_gen_data()
"""
# from waveformtools.modes_array import ModesArray
#############################
# I/O assignments.
#############################
wfa.out_file_name = wfa.label + "_" + out_file_name
wfa.out_file_name = wfa.out_file_name.replace(" ", "_")
wfa.out_file_name = wfa.out_file_name.replace("->", "_")
# get the full path.
full_path = wfa.data_dir/wfa.out_file_name
if r_ext is None:
if wfa.r_ext is None:
r_ext = 500
else:
r_ext = wfa.r_ext
if r_ext_factor is None:
r_ext_factor = wfa.r_ext
###################################
# Identify the modes to save.
###################################
if not modes_to_save:
if ell_max is not None:
modes_to_save = wfa.modes_list[:ell_max]
else:
modes_to_save = wfa.modes_list
##########################
# Create the modes file.
##########################
message("Saving waveform", wfa.label, message_verbosity=2)
with h5py.File(full_path, "w") as wfile:
# Create the metadata dataset.
metadata = wfa.get_metadata()
message(metadata, message_verbosity=1)
metadata_bytes = json.dumps(metadata).encode()
# dt = h5py.special_dtype(vlen=str)
# metadata=np.asarray([metadata_bytes], dtype=dt)
wfile.create_dataset(
"metadata", data=metadata_bytes, compression_opts=compression_opts
)
# Load the modes listed in mode_numbers list
for item in modes_to_save:
# For every ell mode list in modes_list
ell, emm_list = item
for emm in emm_list:
# For every (ell, emm) mode.
data = wfa.mode(ell, emm)
# set the time and data axis
data_re = data.real
data_im = data.imag
save_data = np.transpose(
np.array([wfa.time_axis, data_re, data_im])
)
# Make the key
key = _key_gen(ell, emm, extras=f"r{r_ext:.2f}")
# message('Processing key', key)
# Create data set
wfile.create_dataset(key, data=save_data)
return 1