#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
from multitaper.mtspec import MTSpec
from mspasspy.ccore.utility import MsPASSError, ErrorSeverity, Metadata
from mspasspy.ccore.seismic import TimeSeries, DoubleVector, PowerSpectrum
[docs]class MTPowerSpectrumEngine:
"""
Wrapper class to use German Prieto's multitaper package as a plug
in replacement for the MsPASS class of the same name. The MsPASS
version is in C++ and there are some collisions in concept. It should
be used only in a python script where it might be useful to use
features of Prieto's library not available in the cruder implementation
in MsPASS. (e.g. adaptive weights in multitaper power spectrum estimation).
The biggest collision in concept is that the MsPASS version with this class
name was designed to be created and moved around to avoid the overhead of
recreating the Slepian tapers on each use. Prieto's class creates these
in the constructor but caches a number of things when first created that
allows secondary calls to run faster. The implementation attempts
to overcome this collision by caching an instance of Prieto's mstspec class
on the first call to the apply method. Secondary calls to apply test
for consistency with the previous call and only recreate the mtspec
instance if something that invalidates the previous call has changed.
:param winsize: Number of samples expected for data window. Note this
argument defines the size of the eigentapers so it defines the size of
input time series the object expect to process.
:param tbp: multitaper time-bandwidth product
:param number_tapers: number of tapers to use for estimators (See Prieto's)
documentation for details)
:param nfft: optional size of the fft work arrays to use. nfft should be
greater than or equal winsize. When greater zero padding is used in the
usual way. Default is 0 which sets nfft to 2*winsize. If nfft<winsize
a diagnostic message will be printed and nfft will be set equal to winsize.
:param iadapt: integer argument passed to Prieto's MTspec that sets the
eigentaper weighting scheme. See Prieto's MTspect class documentation
for options. Default is 0 which enables the adaptive weighting scheme.
"""
def __init__(
self,
winsize,
tbp,
number_tapers,
nfft=0,
iadapt=0,
):
self.winsize = winsize
self.tbp = tbp # nw in multitaper
self.number_tapers = number_tapers # kspec in multitaper
self.vn = None # constructor for MTSpec will set this when passed None
self.nfft = nfft
self.idapt = iadapt
self.lamb = None # constructor for MTSpec set this
self.MTSpec_instance = None
[docs] def apply(self, d, dt=1.0):
"""
need to support vector input or a TimeSeries - returns a PowerSpectrum
"""
# All inputs end up filling a numpy array passed to MTSpec below
if isinstance(d, TimeSeries):
y = np.array(d.data)
dt = d.dt
md = Metadata(d)
elif isinstance(d, DoubleVector):
# pybind11 bindings defined std::vector as a DoubleVector
# This is exactly like the use for TimeSeries where data
# is a DoubleVector
y = np.array(d)
md = Metadata()
# These are mspass schema standard names - could be
# a future maintenance issue
md["npts"] = len(d)
md["delta"] = dt
elif isinstance(d, np.ndarray):
y = d
md = Metadata()
# These are mspass schema standard names - could be
# a future maintenance issue
md["npts"] = len(d)
md["delta"] = dt
else:
raise TypeError(
"MTPowerSpectrumEngine.apply: arg0 has invalid type - must be TimeSeries, DoubleVector, or numpy array"
)
self.MTSpec_instance = MTSpec(
y,
nw=self.tbp,
kspec=self.number_tapers,
dt=dt,
nfft=self.nfft,
iadapt=self.idapt,
vn=self.vn,
lamb=self.lamb,
)
# Set these so they will be automatically reused on a
# successive call. May need to trap size mismatch above
# if these were previously set - this needs some testing
# REMOVE ME WHEN VERIFIED
self.vn = self.MTSpec_instance.vn
self.lamb = self.MTSpec_instance.lamb
self.nfft = self.MTSpec_instance.nfft
# We copy these attributes to metadata for optional downstream use
md["MTSpec_dt"] = self.MTSpec_instance.dt
md["MTSpec_df"] = self.MTSpec_instance.df
md["MTSpec_nw"] = self.MTSpec_instance.nw
md["MTSpec_kspec"] = self.MTSpec_instance.kspec
md["MTSpec_nfft"] = self.MTSpec_instance.nfft
md["MTSpec_npts"] = self.MTSpec_instance.npts
# these could be set as aliases but we set the explicitly for now
# even if it is redundamt
md["time_bandwith_product"] = self.MTSpec_instance.nw
md["number_tapers"] = self.MTSpec_instance.kspec
# This method returns only the positive frequencies and spectra values
f, spec = self.MTSpec_instance.rspec()
# this is an obnoxious collision with the C++ api DoubleVector
npts = self.MTSpec_instance.npts
work = DoubleVector()
for i in range(len(spec)):
work.append(spec[i])
result = PowerSpectrum(
md,
work,
self.MTSpec_instance.df,
"multitaper.MTSpec",
0.0,
dt,
md["npts"],
)
return result
[docs] def frequencies(self):
"""
Return a numpy array of the frequencies
"""
if self.MTSpec_instance:
return self.MTSpec_instance.freq
else:
raise MsPASSError(
"MTPowerSpectrumEngine.frequencies: engine has not been initialized. Method undefined until apply method called",
ErrorSeverity.Fatal,
)
[docs] def time_bandwidth_product(self) -> float:
"""
Return time bandwidth product for this operator.
"""
if self.MTSpec_instance:
return self.MTSpec_instance.nw
else:
raise MsPASSError(
"MTPowerSpectrumEngine.time_bandwidth_product: engine has not been initialized. Method undefined until apply method called",
ErrorSeverity.Fatal,
)
[docs] def number_tapers(self) -> int:
"""
Return the number of tapers defined for this operator.
"""
if self.MTSpec_instance:
return self.MTSpec_instance.kspec
else:
raise MsPASSError(
"MTPowerSpectrumEngine.number_tapers: engine has not been initialized. Method undefined until apply method called",
ErrorSeverity.Fatal,
)
[docs] def set_df(self, df):
"""
Explicit setter for frequency bin interval. Needed when
changing sample interval for input data. Rarely of use but
included for compatibility with the C++ class with the
same name.
This maybe should just be pass or throw an error.
"""
self.MTSpec_instance.df = df