""" Methods to integrate functions """
#################################################
# Imports
################################################
import numpy as np
import scipy
from spectral.fourier.transforms import compute_ifft
from waveformtools.waveformtools import message
from waveformtools.waveformtools import (
get_starting_angular_frequency as sang_f,
)
from scipy.interpolate import InterpolatedUnivariateSpline
##################################################
# Fixed frequency integration
##################################################
[docs]
def fixed_frequency_integrator(
udata_time,
delta_t,
utilde_conven=None,
freq_axis=None,
omega0="auto",
order=1,
zero_mode=0,
omega_threshold_factor=10,
):
"""Fixed frequency integrator as presented in Reisswig et. al.
Parameters
----------
udata_time: 1d array
The input data in time.
delta_t: float
The time stepping.
utilde_conven: 1d array, optional
The conventional FFT of the samples udata_time.
freq_axis: 1darray, optional
The frequency axis of the FFT. Must be supplied
along with `utilde_conven`.
omega0: float, optional
The cutoff angular frequency in the integration.
Must be lower than the starting angular frequency
of the input waveform. All frequencies whose absolute
value is below this value will be neglected.
The default cutoff-value is 0.
order: int, optional
The number of times to integrate
the integrand in time. Defaults to 1.
zero_mode: float, optional
The zero mode amplitude of the FFT required.
Defaults to 0 i.e. the zero mode is removed.
Returns
-------
u_integ_n_time: 1d array
The input waveform in time-space,
integrated in frequency space using FFI.
u_integ_integ_n: 1d array
The integrated u samples in Fourier space.
"""
if omega0 == "auto":
omega0 = max(
abs(sang_f(udata_time, delta_t) / omega_threshold_factor), 1e-5
)
message(f"Using cutoff frequency {omega0}", message_verbosity=1)
if not utilde_conven:
# Compute the FFT of data
from numpy.fft import ifft
from spectral.fourier.transforms import compute_fft, unset_fft_conven
# from waveformtools import taper
# udata_x_re = taper(u_time.real, delta_t=delta_t)
# udata_x_im = taper(u_time.imag, delta_t=delta_t)
# udata_x = np.array(udata_x_re) + 1j * np.array(udata_x_im)
# x_axis = udata_x_re.sample_times
# udata_x = np.array(udata_x)
freq_axis, utilde_conven = compute_fft(udata_time, delta_t)
if np.isnan(utilde_conven).any():
message("Nan Found in utilde_conven!")
if np.isnan(freq_axis).any():
message("Nan Found in freq_axis!")
# Find the length of the input data.
Nlen = len(udata_time)
else:
Nlen = len(utilde_conven)
assert (
np.array(freq_axis) != np.array(None)
).all(), "Please supply the frequency axis along with utilde_conven"
# df = np.diff(freq_axis)[0]
df = scipy.stats.mode(np.diff(freq_axis))[0]
message("df ", df, message_verbosity=3)
# Find the location of the zero index.
if (freq_axis == 0).any():
zero_index = np.where(freq_axis == 0)[0][0]
else:
zero_index = None
omega_axis = construct_ffi_omega_axis(freq_axis, omega0, zero_index)
# Set the zero frequency element separately.
utilde_conven[zero_index] = zero_mode
# Integrate in frequency space
utilde_integ_n = np.power((-1j / omega_axis), order) * utilde_conven
if np.isnan(utilde_integ_n).any():
message("Nan Found in utilde_integ_n!", message_verbosity=0)
# Get the inverse fft
# utilde_integ_n_orig = unset_fft_conven(utilde_integ_n)
# u_integ_n_time = ifft(utilde_integ_n_orig)
u_integ_n_time, u_integ = compute_ifft(utilde_integ_n, df)
return u_integ_n_time, u_integ
[docs]
def construct_ffi_omega_axis(freq_axis, omega0, zero_index):
"""Construct an angular frequency axis for
use with FFI"""
assert (
omega0 > 0
), "Please supply a non-zero positive value of cutoff angular frequency"
# Construct the angular frequency axis.
omega_axis = 2 * np.pi * freq_axis
# omega_axis[zero_index] = 1
message(
"The chosen cutoff angular frequency is", omega0, message_verbosity=2
)
message("Omega axis", omega_axis, message_verbosity=4)
# Change the angular frequency if its magnitude is below a given omega0.
for index, element in enumerate(omega_axis):
# Loop over the samples.
# Skip the zero index
if index != zero_index:
# print(freq_integ[index])
# try:
# Get the sign of the angular frequency.
sign = int(element / abs(element))
if abs(element) < omega0:
omega_axis[index] = sign * omega0
else:
sign = 1
assert (
omega_axis[index] == 0
), f"The zero mode element must be zero frequency. Instead it is {omega_axis[index]}"
omega_axis[index] = 1
return omega_axis
#############################################
# 2D integrals
#############################################
[docs]
def TwoDIntegral(func, grid_info, int_method=None):
"""Integrate a function over a sphere.
Parameters
----------
func: function
The function to be integrated
NTheta, NPhi: int
The number of grid points in the theta and phi directions.
Note that NTheta must be even.
ht, hp: float
The grid spacings.
int_method: string
The method to use for the integration.
Options are DH (Driscoll Healy),
SP (Simpson's),
MP (Midpoint).
Returns
-------
integ : float
The function f integrated over the sphere.
"""
if int_method is None:
if grid_info.grid_type == "GL":
int_method = "GL"
elif grid_info.grid_type == "Uniform":
int_method = "MP"
else:
raise KeyError(
"Unable to discern default integration"
"method due to unknown grid type. Please provide"
"method explicitely"
)
if int_method == "DH":
integral = DriscollHealy2DInteg(func, grid_info)
elif int_method == "MP":
integral = MidPoint2DInteg(func, grid_info)
elif int_method == "SP":
integral = Simpson2DInteg(func, grid_info)
elif int_method == "GL":
integral = GaussLegendre2DInteg(func, grid_info)
else:
raise ValueError("Unknown method!")
return integral
[docs]
def MidPoint2DInteg(func, info):
"""Evaulate the 2D surface integral using the midpoint rule.
Parameters
----------
func: ndarray
The data to be integrated
info: surface_grid_info
An instance of the surface grid info class
containing information about the grid.
Returns
-------
integ: float
The function f integrated over the sphere.
"""
ht = info.dtheta
hp = info.dphi
theta_grid, _ = info.meshgrid
integral = (
np.tensordot(func, np.sin(theta_grid), axes=((-2, -1), (0, 1)))
* ht
* hp
)
return integral
[docs]
def DriscollHealy2DInteg(func, info):
"""Implementation of the Driscoll Healy 2D integration that
exhibits near spectral convergence.
Parameters
----------
func : function
The function to be integrated
NTheta, NPhi : int
The number of grid points in the theta and phi directions.
Note that NTheta must be even.
ht, hp : float
The grid spacings.
Returns
-------
integ : float
The function f integrated over the sphere.
"""
if len(func.shape) > 2:
raise NotImplementedError(
"Driscoll-Healy's method cannot currently handle 3d arrays"
)
NTheta = info.ntheta_act
NPhi = info.nphi_act
# NTheta, NPhi = func.shape
theta_grid, _ = info.meshgrid
ht = info.dtheta
hp = info.dphi
if NTheta < 0:
raise ValueError("Ntheta is negative!")
elif NPhi < 0:
raise ValueError("Nphi is negative!")
if (NTheta % 2) != 0:
raise ValueError("NTheta must be even!")
integrand_sum = 0.0
func *= np.sin(theta_grid)
# Skip the poles (ix=0 and ix=NTheta), as the weight there is zero
# theta_1d = np.pi* np.arange(1, NTheta)/NTheta
# ell_weight_axis = np.arange(int(NTheta/2))
# theta_2d, ell_2d = np.meshgrid(theta_1d, ell_weight_axis)
# weights_grid = (4/ np.pi) * np.sin((2 * ell_2d + 1) * theta_2d) /
# (2 * ell_2d + 1)
# weights_axis = np.sum(weights_grid, axis=1)
# latitude_sum_axis = np.sum(func, axis=1)
# integrand_axis = latitude_sum_axis * weights_axis
for theta_index in range(1, NTheta):
# These weights lead to an almost spectral convergence
this_theta = np.pi * theta_index / NTheta
# this_theta = theta_1d[theta_index]
weight = 0
# theta = M_PI * ix / NTheta;
# weight = 0.0;
for ell in range(int(NTheta / 2)):
# for (int l = 0; l < NTheta/2; ++ l)
weight += np.sin((2 * ell + 1) * this_theta) / (2 * ell + 1)
# weight_axis = np.sin((2 * ell_weight_axis + 1)
# * this_theta) / (2 * ell_weight_axis + 1)
weight *= 4.0 / np.pi
latitude_sum = 0
# local_sum = 0.0;
# Skip the last point (iy=NPhi), since we assume periodicity and
# therefore it has the same value as the first point. We don't use
# weights in this direction, which leads to spectral convergence.
# (Yay periodicity!)
for index_phi in range(NPhi):
# for (int iy = 0; iy < NPhi; ++ iy)
latitude_sum += func[theta_index, index_phi]
# latitude_sum = np.sum(func[index_theta, :])
integrand_sum += weight * latitude_sum
return ht * hp * integrand_sum
[docs]
def DriscollHealy2DInteg_v2(func, info):
"""Implementation of the Driscoll Healy 2D integration that
exhibits near spectral convergence.
Parameters
----------
func : function
The function to be integrated
NTheta, NPhi : int
The number of grid points in the theta and phi directions.
Note that NTheta must be even.
ht, hp : float
The grid spacings.
Returns
-------
integ : float
The function f integrated over the sphere.
"""
if len(func.shape) > 2:
raise NotImplementedError(
"Driscoll-Healy's method cannot currently handle 3d arrays"
)
NTheta = info.ntheta_act
NPhi = info.nphi_act
# NTheta, NPhi = func.shape
theta_grid, _ = info.meshgrid
ht = info.dtheta
hp = info.dphi
if NTheta < 0:
raise ValueError("Ntheta is negative!")
elif NPhi < 0:
raise ValueError("Nphi is negative!")
if (NTheta % 2) != 0:
raise ValueError("NTheta must be even!")
integrand_sum = 0.0
# func*=np.sin(theta_grid)
# Skip the poles (ix=0 and ix=NTheta), as the weight there is zero
theta_1d = np.pi * np.arange(1, NTheta) / NTheta
ell_weight_axis = np.arange(int(NTheta / 2))
print("theta 1d", len(theta_1d))
print("ell axis", len(ell_weight_axis))
theta_2d, ell_2d = np.meshgrid(theta_1d, ell_weight_axis)
print("T2d", theta_2d.shape)
weights_grid = (
(4 / np.pi) * np.sin((2 * ell_2d + 1) * theta_2d) / (2 * ell_2d + 1)
)
print("WG", weights_grid.shape)
weights_axis = np.sum(weights_grid, axis=1)
print("WG", weights_axis.shape)
latitude_sum_axis = np.sum(func, axis=1)
print("LSA", latitude_sum_axis.shape)
integrand_axis = latitude_sum_axis * weights_axis
integrand_sum = np.sum(integrand_axis)
return ht * hp * integrand_sum
[docs]
def Simpson2DInteg(func, info):
"""Implementation of Simpson's 2D integration
scheme.
Parameters
----------
func : function
The function to be integrated
NTheta, NPhi : int
The number of grid points in the theta and phi directions.
Note that NTheta must be even.
ht, hp : float
The grid spacings.
Returns
-------
integ : float
The function f integrated over the sphere.
"""
if len(func.shape) > 2:
raise NotImplementedError(
"Simpson's method cannot currently handle 3d arrays"
)
NTheta = info.ntheta_act
NPhi = info.nphi_act
theta_grid, _ = info.meshgrid
# NTheta, NPhi = func.shape
ht = info.dtheta
hp = info.dphi
integrand_sum = 0
index_theta = 0
index_phi = 0
assert NTheta > 0
assert NPhi > 0
# assert(func.all())
assert NTheta % 2 == 0
assert NPhi % 2 == 0
Px = int(NTheta / 2)
Py = int(NPhi / 2)
func *= np.sin(theta_grid)
# Around corners
integrand_sum += (
func[0, 0]
+ func[NTheta - 1, 0]
+ func[0, NPhi - 1]
+ func[NTheta - 1, NPhi - 1]
)
# Arount edges
for index_phi in range(1, Py):
integrand_sum += (
4 * func[0, 2 * index_phi - 1]
+ 4 * func[NTheta - 1, 2 * index_phi - 1]
)
# for (iy = 1; iy <= py-1; iy++)
for index_phi in range(1, Py - 1):
integrand_sum += (
2 * func[0, 2 * index_phi] + 2 * func[NTheta - 1, 2 * index_phi]
)
# for (ix = 1; ix <= px; ix++)
for index_theta in range(1, Px):
integrand_sum += (
4 * func[2 * index_theta - 1, 0]
+ 4 * func[2 * index_theta - 1, NPhi - 1]
)
# for (ix = 1; ix <= px-1; ix++)
for index_theta in range(1, Px - 1):
integrand_sum += (
2 * func[2 * index_theta, 0] + 2 * func[2 * index_theta, NPhi - 1]
)
# In the Interiors
# for (iy = 1; iy <= py; iy++)
for index_phi in range(1, Py):
# for (ix = 1; ix <= px; ix++)
for index_theta in range(1, Px):
integrand_sum += 16 * func[2 * index_theta - 1, 2 * index_phi - 1]
# for (iy = 1; iy <= py-1; iy++)
for index_phi in range(1, Py - 1):
# for (ix = 1; ix <= px; ix++)
for index_theta in range(Px):
integrand_sum += 8 * func[2 * index_theta - 1, 2 * index_phi]
# for (iy = 1; iy <= py; iy++)
for index_phi in range(1, Py):
# for (ix = 1; ix <= px-1; ix++)
for index_theta in range(1, Px - 1):
integrand_sum += 8 * func[2 * index_theta, 2 * index_phi - 1]
# for (iy = 1; iy <= py-1; iy++)
for index_phi in range(1, Py - 1):
# for (ix = 1; ix <= px-1; ix++)
for index_theta in range(1, Px - 1):
integrand_sum += 4 * func[2 * index_theta, 2 * index_phi]
return (1 / 9) * ht * hp * integrand_sum
[docs]
def GaussLegendre2DInteg(func, info):
"""Evaulate the 2D surface integral using the Gauss-Legendre rule.
Parameters
----------
func: ndarray
The data to be integrated
info: surface_grid_info
An instance of the surface grid info class
containing information about the grid.
Returns
-------
integ: float
The function f integrated over the sphere.
"""
integral = (
np.tensordot(func, info.weights_grid, axes=((-2, -1), (0, 1)))
* info.dphi
)
return integral
[docs]
def twod_time_integral(times, twod_func_ts, a=None, b=None):
''' Integrate a twoD array in time '''
xdata = times
if a is None:
a = xdata[0]
if b is None:
b = xdata[-1]
ntime, ntheta, nphi = twod_func_ts.shape
ans = np.zeros((ntheta ,nphi), dtype=np.complex128)
for th_idx in range(ntheta):
for ph_idx in range(nphi):
ydata = twod_func_ts[:, th_idx, ph_idx]
yinterp_re = InterpolatedUnivariateSpline(xdata, ydata.real, k=5)
yinterp_im = InterpolatedUnivariateSpline(xdata, ydata.imag, k=5)
integral_re = yinterp_re.integral(a=a, b=b)
integral_im = yinterp_im.integral(a=a, b=b)
ans[th_idx, ph_idx] = integral_re + 1j*integral_im
return ans