#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This file contains a class definition for a wrapper for
the suite of scalar deconvolution methods supported by mspass.
It demonstrates the concept of a processing object created
by wrapping C code.
Created on Fri Jul 31 06:24:10 2020
@author: Gary Pavlis
"""
import numpy as np
from mspasspy.ccore.seismic import DoubleVector
from mspasspy.ccore.utility import AntelopePf, Metadata, MsPASSError, ErrorSeverity
from mspasspy.util.converter import Metadata2dict
from mspasspy.algorithms.window import WindowData
from mspasspy.ccore.algorithms.basic import TimeWindow, _ExtractComponent
from mspasspy.ccore.algorithms.deconvolution import (
LeastSquareDecon,
WaterLevelDecon,
MultiTaperXcorDecon,
MultiTaperSpecDivDecon,
)
from mspasspy.util.decorators import mspass_func_wrapper
[docs]class RFdeconProcessor:
"""
This class is a wrapper for the suite of receiver function deconvolution
methods we call scalar methods. That is, the operation is reducable to
two time series: wavelet signal and the data (TimeSeries) signal.
That is in contrast to three component methods that always treat the
data as vector samples. The class should be created as a global
processor object to be used in a spark job. The design assumes the
processor object will be passed as an argument to the RFdecon
function that should appear as a function in a spark map call.
"""
def __repr__(self) -> str:
repr_str = "{type}(alg='{alg}', pf='{pf}')".format(
type=str(self.__class__), alg=self.algorithm, pf=self.pf
)
return repr_str
def __str__(self) -> str:
md_str = str(Metadata2dict(self.md))
processor_str = "{type}(alg='{alg}', pf='{pf}', md={md})".format(
type=str(self.__class__), alg=self.algorithm, pf=self.pf, md=md_str
)
return processor_str
def __init__(self, alg="LeastSquares", pf="RFdeconProcessor.pf"):
self.algorithm = alg
self.pf = pf
pfhandle = AntelopePf(pf)
if self.algorithm == "LeastSquares":
self.md = pfhandle.get_branch("LeastSquare")
self.__uses_noise = False
elif alg == "WaterLevel":
self.md = pfhandle.get_branch("WaterLevel")
self.__uses_noise = False
elif alg == "MultiTaperXcor":
self.md = pfhandle.get_branch("MultiTaperXcor")
self.__uses_noise = True
elif alg == "MultiTaperSpecDiv":
self.md = pfhandle.get_branch("MultiTaperSpecDiv")
self.__uses_noise = True
elif alg == "GeneralizedIterative":
raise RuntimeError("Generalized Iterative method not yet supported")
else:
raise RuntimeError("Illegal value for alg=" + alg)
# below is needed because AntelopePf cannot be serialized.
self.md = Metadata(self.md)
[docs] def loaddata(self, d, dtype="Seismogram", component=0, window=False):
"""
Loads data for processing. When window is set true
use the internal pf definition of data time window
and window the data. The dtype parameter changes the
behavior of this algorithm significantly depending on
the setting. It can be one of the following:
Seismogram, TimeSeries, or raw_vector. For the first
two the data to process will be extracted in a
pf specfied window if window is True. If window is
False TimeSeries data will be passed directly and
Seismogram data will have the data defined by the
component parameter copied to the internal data
vector workspace. If dtype is set to raw_vector
d is assumed to be a raw numpy vector of doubles or
an the aliased std::vector used in ccore, for example,
in the TimeSeries object s vector. Setting dtype
to raw_vector and window True will result in this
method throwing a RuntimeError exception as the
combination is not possible since raw_vector data
have no time base.
:param d: input data (contents expected depend upon
value of dtype parameter).
:param dtype: string defining the form d is expected
to be (see details above)
:param component: component of Seismogram data to
load as data vector. Ignored if dtype is raw_vector
or TimeSeries.
:param window: boolean controlling internally
defined windowing. (see details above)
:return: Nothing (not None nothing) is returned
"""
# First basic sanity checks
if dtype == "raw_vector" and window:
raise RuntimeError(
"RFdeconProcessor.loaddata: "
+ "Illegal argument combination\nwindow cannot be true with raw_vector input"
)
if not (
dtype == "Seismogram" or dtype == "TimeSeries" or dtype == "raw_vector"
):
raise RuntimeError(
"RFdeconProcessor.loaddata: " + " Illegal dtype parameter=" + dtype
)
dvector = []
if window:
if dtype == "Seismogram":
ts = _ExtractComponent(d, component)
ts = WindowData(ts, self.dwin.start, self.dwin.end)
dvector = ts.data
elif dtype == "TimeSeries":
ts = WindowData(d, self.dwin.start, self.dwin.end)
dvector = ts.data
else:
dvector = d
else:
if dtype == "Seismogram":
ts = _ExtractComponent(d, component)
dvector = ts.data
elif dtype == "TimeSeries":
dvector = ts.data
else:
dvector = d
# Have to explicitly convert to ndarray because DoubleVector cannot be serialized.
self.dvector = np.array(dvector)
[docs] def loadwavelet(self, w, dtype="Seismogram", component=2, window=False):
# This code is painfully similar to loaddata. To reduce errors
# only the names have been changed to protect the innocent
if dtype == "raw_vector" and window:
raise RuntimeError(
"RFdeconProcessor.loadwavelet: "
+ "Illegal argument combination\nwindow cannot be true with raw_vector input"
)
if not (
dtype == "Seismogram" or dtype == "TimeSeries" or dtype == "raw_vector"
):
raise RuntimeError(
"RFdeconProcessor.loadwavelet: " + " Illegal dtype parameter=" + dtype
)
wvector = []
if window:
if dtype == "Seismogram":
ts = _ExtractComponent(w, component)
ts = WindowData(ts, self.dwin.start, self.dwin.end)
wvector = ts.data
elif dtype == "TimeSeries":
ts = WindowData(w, self.dwin.start, self.dwin.end)
wvector = ts.data
else:
wvector = w
else:
if dtype == "Seismogram":
ts = _ExtractComponent(w, component)
wvector = ts.data
elif dtype == "TimeSeries":
wvector = ts.data
else:
wvector = w
# Have to explicitly convert to ndarray because DoubleVector cannot be serialized.
self.wvector = np.array(wvector)
[docs] def loadnoise(self, n, dtype="Seismogram", component=2, window=False):
# First basic sanity checks
# Return immediately for methods that ignore noise.
# Note we do this silenetly assuming the function wrapper below
# will post an error to elog for the output to handle this nonfatal error
if self.algorithm == "LeastSquares" or self.algorithm == "WaterLevel":
return
if dtype == "raw_vector" and window:
raise RuntimeError(
"RFdeconProcessor.loadnoise: "
+ "Illegal argument combination\nwindow cannot be true with raw_vector input"
)
if not (
dtype == "Seismogram" or dtype == "TimeSeries" or dtype == "raw_vector"
):
raise RuntimeError(
"RFdeconProcessor.loadnoise: " + " Illegal dtype parameter=" + dtype
)
nvector = []
# IMPORTANT these two parameters are not required by the
# ScalarDecon C code but need to be inserted in pf for any algorithm
# that requires noise data (i.e. multitaper) and the window
# options is desired
if window:
tws = self.md.get_double("noise_window_start")
twe = self.md.get_double("noise_window_end")
if dtype == "Seismogram":
ts = _ExtractComponent(n, component)
ts = WindowData(ts, tws, twe)
nvector = ts.data
elif dtype == "TimeSeries":
ts = WindowData(n, tws, twe)
nvector = ts.data
else:
nvector = n
else:
if dtype == "Seismogram":
ts = _ExtractComponent(n, component)
nvector = ts.data
elif dtype == "TimeSeries":
nvector = ts.data
else:
nvector = n
# Have to explicitly convert to ndarray because DoubleVector cannot be serialized.
self.nvector = np.array(nvector)
[docs] def apply(self):
"""
Compute the RF estimate using the algorithm defined internally.
:return: vector of data that are the RF estimate computed from previously loaded data.
"""
if self.algorithm == "LeastSquares":
processor = LeastSquareDecon(self.md)
elif self.algorithm == "WaterLevel":
processor = WaterLevelDecon(self.md)
elif self.algorithm == "MultiTaperXcor":
processor = MultiTaperXcorDecon(self.md)
elif self.algorithm == "MultiTaperSpecDiv":
processor = MultiTaperSpecDivDecon(self.md)
if hasattr(self, "dvector"):
processor.loaddata(DoubleVector(self.dvector))
if hasattr(self, "wvector"):
processor.loadwavelet(DoubleVector(self.wvector))
if self.__uses_noise and hasattr(self, "nvector"):
processor.loadnoise(DoubleVector(self.nvector))
processor.process()
return processor.getresult()
[docs] def actual_output(self):
"""
The actual output of a decon operator is the inverse filter applied to
the wavelet. By design it is an approximation of the shaping wavelet
defined for this operator.
:return: Actual output of the operator as a ccore.TimeSeries object.
The Metadata of the return is bare bones. The most important factor
about this result is that because actual output waveforms are normally
a zero phase wavelet of some kind the result is time shifted to be
centered (i.e. t0 is rounded n/2 where n is the length of the vector
returned).
"""
if self.algorithm == "LeastSquares":
processor = LeastSquareDecon(self.md)
elif self.algorithm == "WaterLevel":
processor = WaterLevelDecon(self.md)
elif self.algorithm == "MultiTaperXcor":
processor = MultiTaperXcorDecon(self.md)
elif self.algorithm == "MultiTaperSpecDiv":
processor = MultiTaperSpecDivDecon(self.md)
if hasattr(self, "dvector"):
processor.loaddata(DoubleVector(self.dvector))
if hasattr(self, "wvector"):
processor.loadwavelet(DoubleVector(self.wvector))
if self.__uses_noise and hasattr(self, "nvector"):
processor.loadnoise(DoubleVector(self.nvector))
return processor.actual_output()
[docs] def ideal_output(self):
"""
The ideal output of a decon operator is the same thing we call a
shaping wavelet. This method returns the ideal output=shaping wavelet
as a TimeSeries object. Like the actual output method the return
function is circular shifted so the function peaks at 0 time located
at n/2 samples from the start sample. Graphic displays will then show
the wavelet centered and peaked at time 0. The prediction error
can be computed as the difference between the actual_output and
ideal_output TimeSeries objects. The norm of the prediction error
is a helpful metric to display the stability and accuracy of the
inverse.
"""
if self.algorithm == "LeastSquares":
processor = LeastSquareDecon(self.md)
elif self.algorithm == "WaterLevel":
processor = WaterLevelDecon(self.md)
elif self.algorithm == "MultiTaperXcor":
processor = MultiTaperXcorDecon(self.md)
elif self.algorithm == "MultiTaperSpecDiv":
processor = MultiTaperSpecDivDecon(self.md)
if hasattr(self, "dvector"):
processor.loaddata(DoubleVector(self.dvector))
if hasattr(self, "wvector"):
processor.loadwavelet(DoubleVector(self.wvector))
if self.__uses_noise and hasattr(self, "nvector"):
processor.loadnoise(DoubleVector(self.nvector))
return processor.ideal_output()
[docs] def inverse_filter(self):
"""
This method returns the actual inverse filter that if convolved with
he original data will produce the RF estimate. Note the filter is
meaningful only if the source wavelet is minimum phase. A standard
theorem from time series analysis shows that the inverse of mixed
phase wavelet is usually unstable and a maximum phase wavelet is always
unstable. Fourier-based methods can still compute a stable solution
even with a mixed phase wavelet because of the implied circular
convolution.
The result is returned as TimeSeries object.
"""
if self.algorithm == "LeastSquares":
processor = LeastSquareDecon(self.md)
elif self.algorithm == "WaterLevel":
processor = WaterLevelDecon(self.md)
elif self.algorithm == "MultiTaperXcor":
processor = MultiTaperXcorDecon(self.md)
elif self.algorithm == "MultiTaperSpecDiv":
processor = MultiTaperSpecDivDecon(self.md)
if hasattr(self, "dvector"):
processor.loaddata(DoubleVector(self.dvector))
if hasattr(self, "wvector"):
processor.loadwavelet(DoubleVector(self.wvector))
if self.__uses_noise and hasattr(self, "nvector"):
processor.loadnoise(DoubleVector(self.nvector))
return processor.inverse_filter()
[docs] def QCMetrics(self):
"""
All decon algorithms compute a set of algorithm dependent quality
control metrics. This method returns the metrics as a set of fixed
name:value pairs in a mspass.Metadata object. The details are
algorithm dependent. See related documentation for metrics computed
by different algorithms.
"""
if self.algorithm == "LeastSquares":
processor = LeastSquareDecon(self.md)
elif self.algorithm == "WaterLevel":
processor = WaterLevelDecon(self.md)
elif self.algorithm == "MultiTaperXcor":
processor = MultiTaperXcorDecon(self.md)
elif self.algorithm == "MultiTaperSpecDiv":
processor = MultiTaperSpecDivDecon(self.md)
if hasattr(self, "dvector"):
processor.loaddata(DoubleVector(self.dvector))
if hasattr(self, "wvector"):
processor.loadwavelet(DoubleVector(self.wvector))
if self.__uses_noise and hasattr(self, "nvector"):
processor.loadnoise(DoubleVector(self.nvector))
return processor.QCMetrics()
[docs] def change_parameters(self, md):
"""
Use this method to change the internal parameter setting of the
processor. It can be used, for example, to switch from the damped
least square method to the water level method. Note the input
must be a complete definition for a parameter set defining a
particular algorithm. i.e. this is not an update method but
t reinitializes the processor.
:param md: is a mspass.Metadata object containing required parameters
for the alternative algorithm.
"""
self.md = Metadata(md)
@property
def uses_noise(self):
return self.__uses_noise
@property
def dwin(self):
tws = self.md.get_double("deconvolution_data_window_start")
twe = self.md.get_double("deconvolution_data_window_end")
return TimeWindow(tws, twe)
@property
def nwin(self):
if self.__uses_noise:
tws = self.md.get_double("noise_window_start")
twe = self.md.get_double("noise_window_end")
return TimeWindow(tws, twe)
else:
return TimeWindow # always initialize even if not used
[docs]@mspass_func_wrapper
def RFdecon(
d,
alg="LeastSquares",
pf="RFdeconProcessor.pf",
wavelet=None,
noisedata=None,
wcomp=2,
ncomp=2,
object_history=False,
alg_name="RFdecon",
alg_id=None,
dryrun=False,
):
"""
Use this function to compute conventional receiver functions
from a single three component seismogram. In this function,
an instance of wrapper class RFdeconProcessor will be built and
initialized with alg and pf.
Default assumes d contains all data sections required to do
the deconvolution with the wavelet in component 2 (3 for matlab
and FORTRAN people). By default the data and noise
(if required by the algorithm) sections will be extracted from
the (assumed larger) data section using time windows defined
internally in the processor pf definition. For variations (e.g.
adding tapering to one or more of the time series inputs)
use the d, wavelet, and (if required) noise arguments to load
each component separately. Note d is dogmatically required
to be three component data while optional wavelet and noisedata
series are passed as plain numpy vectors (i.e. without the
decoration of a TimeSeries).
To make use of the extended outputs from RFdeconProcessor
algorithms (e.g. actual output of the computed operator)
call those methods after this function returns successfully
with a three-component seismogram output. That is possible
because the processor object caches the most recent wavelet
and inverse used for the deconvolution. An exception is
that all algorithms call their QCmetrics method of processor
and push them to the headers of the deconvolved output.
QCmetric attributes are algorithm dependent.
The ProcessingHistory feature can optionally be enabled by
setting the save_history argument to True. When enable one should
normally set a unique id for the algid argument.
:param d: Seismogram input data. See notes above about
time span of these data.
:param alg: The algorithm to be applied, used for initializing
a RFdeconProcessor object
:param pf: The pf file to be parsed, used for inititalizing a
RFdeconProcessor
:param wavelet: vector of doubles (numpy array or the
std::vector container internal to TimeSeries object) defining
the wavelet to use to compute deconvolution operator.
Default is None which assumes processor was set up to use
a component of d as the wavelet estimate.
:param noisedata: vector of doubles (numpy array or the
std::vector container internal to TimeSeries object) defining
noise data to use for computing regularization. Not all RF
estimation algorithms use noise estimators so this parameter
is optional. It can also be extracted from d depending on
parameter file options.
:param wcomp: When defined from Seismogram d the wavelet
estimate in conventional RFs is one of the components that
are most P wave dominated. That is always one of three
things: Z, L of LQT, or the L component from the output of
Kennett's free surface transformation operator. The
default is 2, which for ccore.Seismogram is always one of
the above. This parameter would be changed only if the
data has undergone some novel transformation not yet invented
and the best wavelet estimate was on in 2 (3 with FORTRAN
and matlab numbering).
:param ncomp: component number to use to compute noise. This is used
only if the algorithm in processor requires a noise estimate.
Normally it should be the same as wcomp and is by default (2).
:param object_history: boolean to enable or disable saving object
level history. Default is False. Note this functionality is
implemented via the mspass_func_wrapper decorator.
:param alg_name: When history is enabled this is the algorithm name
assigned to the stamp for applying this algorithm.
Default ("WindowData") should normally be just used.
Note this functionality is implemented via the mspass_func_wrapper decorator.
:param ald_id: algorithm id to assign to history record (used only if
object_history is set True.)
Note this functionality is implemented via the mspass_func_wrapper decorator.
:param dryrun: When true only the arguments are checked for validity.
When true nothing is calculated and the original data are returned.
Note this functionality is implemented via the mspass_func_wrapper decorator.
:return: Seismogram object containing the RF estimates.
The orientations are always the same as the input.
"""
processor = RFdeconProcessor(alg, pf)
try:
if wavelet is not None:
processor.loadwavelet(wavelet, dtype="raw_vector")
else:
# processor.loadwavelet(d,dtype='Seismogram',window=True,component=wcomp)
processor.loadwavelet(d, window=True)
if processor.uses_noise:
if noisedata != None:
processor.loadnoise(noisedata, dtype="raw_vector")
else:
processor.loadnoise(d, window=True, component=ncomp)
except MsPASSError as err:
d.kill()
d.elog.log_error(err)
return d
# We window data before computing RF estimates for efficiency
# Otherwise we would call the window operator 3 times below
# WindowData does will kill the output if the window doesn't match
# which is reason for the test immediately after this call
result = WindowData(d, processor.dwin.start, processor.dwin.end)
if result.dead():
return result
npts = result.npts
try:
for k in range(3):
processor.loaddata(result, component=k)
x = processor.apply()
# overwrite this component's data in the result Seismogram
# Use some caution handling any size mismatch
nx = len(x)
if nx >= npts:
for i in range(npts):
result.data[k, i] = x[i]
else:
# this may not be the fastest way to do this but it is simple and clean
# matters little since this is an error condition and should be rare
for i in range(npts):
if i < nx:
result.data[k, i] = x[i]
else:
result.data[k, i] = 0.0
# This is actually an error condition so we log it
message = (
"Windowing size mismatch.\nData window length = %d which is less than operator length= %d"
% (nx, npts)
)
result.elog.log_error("RFdecon", message, ErrorSeverity.Complaint)
except MsPASSError as err:
result.kill()
result.elog.log_error(err)
except:
print(
"RFDecon: something threw an unexpected exception - this is a bug and needs to be fixed.\nKilling result from RFdecon."
)
result.kill()
result.elog.log_error(
"RFdecon", "Unexpected exception caught", ErrorSeverity.Invalid
)
finally:
return result