#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This file contains a class definition for a wrapper for
the suite of scalar deconvolution methods supported by mspass.
It demonstrates the concept of a processing object created
by wrapping C code. It also contains a top-level function
that is a pythonic interface that meshes with MsPASS schedulers
for parallel processing called `RFdecon`. `RFdecon` is a
wrapper for all single-station methods. It cannot be used for
array methods.
Created on Fri Jul 31 06:24:10 2020
@author: Gary Pavlis
"""
import numpy as np
from mspasspy.ccore.seismic import DoubleVector, Seismogram
from mspasspy.ccore.utility import AntelopePf, Metadata, MsPASSError, ErrorSeverity
from mspasspy.util.converter import Metadata2dict
from mspasspy.algorithms.window import WindowData
from mspasspy.ccore.algorithms.basic import TimeWindow, _ExtractComponent
from mspasspy.ccore.algorithms.deconvolution import (
LeastSquareDecon,
WaterLevelDecon,
MultiTaperXcorDecon,
MultiTaperSpecDivDecon,
)
from mspasspy.util.decorators import mspass_func_wrapper
[docs]class RFdeconProcessor:
"""
This class is a wrapper for the suite of receiver function deconvolution
methods we call scalar methods. That is, the operation is reducable to
two time series: wavelet signal and the data (TimeSeries) signal.
That is in contrast to three component methods that always treat the
data as vector samples. The class should be created as a global
processor object to be used in a spark job. The design assumes the
processor object will be passed as an argument to the RFdecon
function that should appear as a function in a spark map call.
"""
def __repr__(self) -> str:
repr_str = "{type}(alg='{alg}', md='{md}')".format(
type=str(self.__class__), alg=self.algorithm, md=self.md
)
return repr_str
def __str__(self) -> str:
md_str = str(Metadata2dict(self.md))
processor_str = "{type}(alg='{alg}', md='{md}')".format(
type=str(self.__class__), alg=self.algorithm, md=self.md
)
return processor_str
def __init__(self, alg="LeastSquares", pf="RFdeconProcessor.pf"):
self.algorithm = alg
# use a copy in what is more or less a switch-case block
# to be robust - I don't think any of the constructors below
# alter pfhandle but the cost is tiny for this stability
pfhandle = AntelopePf(pf)
if self.algorithm == "LeastSquares":
# In this and elif blocks below we convert
# return of get_branch to a Metadata container
# that is necessary because get_branch retuns the
# AntelopePf subclass and we want this to be a clean
# Metdata object. Further, at present a pf will not
# serialize
self.md = Metadata(pfhandle.get_branch("LeastSquare"))
self.processor = LeastSquareDecon(self.md)
self.__uses_noise = False
elif alg == "WaterLevel":
self.md = Metadata(pfhandle.get_branch("WaterLevel"))
self.processor = WaterLevelDecon(self.md)
self.__uses_noise = False
elif alg == "MultiTaperXcor":
self.md = Metadata(pfhandle.get_branch("MultiTaperXcor"))
self.processor = MultiTaperXcorDecon(self.md)
self.__uses_noise = True
elif alg == "MultiTaperSpecDiv":
self.md = Metadata(pfhandle.get_branch("MultiTaperSpecDiv"))
self.processor = MultiTaperSpecDivDecon(self.md)
self.__uses_noise = True
elif alg == "GeneralizedIterative":
raise RuntimeError("Generalized Iterative method not yet supported")
else:
raise RuntimeError("Illegal value for alg=" + alg)
[docs] def loaddata(self, d, dtype="Seismogram", component=0, window=False):
"""
Loads data for processing. When window is set true
use the internal pf definition of data time window
and window the data. The dtype parameter changes the
behavior of this algorithm significantly depending on
the setting. It can be one of the following:
Seismogram, TimeSeries, or raw_vector. For the first
two the data to process will be extracted in a
pf specfied window if window is True. If window is
False TimeSeries data will be passed directly and
Seismogram data will have the data defined by the
component parameter copied to the internal data
vector workspace. If dtype is set to raw_vector
d is assumed to be a raw numpy vector of doubles or
an the aliased std::vector used in ccore, for example,
in the TimeSeries object s vector. Setting dtype
to raw_vector and window True will result in this
method throwing a RuntimeError exception as the
combination is not possible since raw_vector data
have no time base.
:param d: input data (contents expected depend upon
value of dtype parameter).
:param dtype: string defining the form d is expected
to be (see details above)
:param component: component of Seismogram data to
load as data vector. Ignored if dtype is raw_vector
or TimeSeries.
:param window: boolean controlling internally
defined windowing. (see details above)
:return: Nothing (not None nothing) is returned
"""
# First basic sanity checks
if dtype == "raw_vector" and window:
raise RuntimeError(
"RFdeconProcessor.loaddata: "
+ "Illegal argument combination\nwindow cannot be true with raw_vector input"
)
if not (
dtype == "Seismogram" or dtype == "TimeSeries" or dtype == "raw_vector"
):
raise RuntimeError(
"RFdeconProcessor.loaddata: " + " Illegal dtype parameter=" + dtype
)
dvector = []
if window:
if dtype == "Seismogram":
ts = _ExtractComponent(d, component)
ts = WindowData(ts, self.dwin.start, self.dwin.end)
dvector = ts.data
elif dtype == "TimeSeries":
ts = WindowData(d, self.dwin.start, self.dwin.end)
dvector = ts.data
else:
dvector = d
else:
if dtype == "Seismogram":
ts = _ExtractComponent(d, component)
dvector = ts.data
elif dtype == "TimeSeries":
dvector = ts.data
else:
dvector = d
# Have to explicitly convert to ndarray because DoubleVector cannot be serialized.
self.dvector = np.array(dvector)
[docs] def loadwavelet(self, w, dtype="Seismogram", component=2, window=False):
# This code is painfully similar to loaddata. To reduce errors
# only the names have been changed to protect the innocent
if dtype == "raw_vector" and window:
raise RuntimeError(
"RFdeconProcessor.loadwavelet: "
+ "Illegal argument combination\nwindow cannot be true with raw_vector input"
)
if not (
dtype == "Seismogram" or dtype == "TimeSeries" or dtype == "raw_vector"
):
raise RuntimeError(
"RFdeconProcessor.loadwavelet: " + " Illegal dtype parameter=" + dtype
)
wvector = []
if window:
if dtype == "Seismogram":
ts = _ExtractComponent(w, component)
ts = WindowData(ts, self.dwin.start, self.dwin.end)
wvector = ts.data
elif dtype == "TimeSeries":
ts = WindowData(w, self.dwin.start, self.dwin.end)
wvector = ts.data
else:
wvector = w
else:
if dtype == "Seismogram":
ts = _ExtractComponent(w, component)
wvector = ts.data
elif dtype == "TimeSeries":
wvector = ts.data
else:
wvector = w
# Have to explicitly convert to ndarray because DoubleVector cannot be serialized.
self.wvector = np.array(wvector)
[docs] def loadnoise(self, n, dtype="Seismogram", component=2, window=False):
# First basic sanity checks
# Return immediately for methods that ignore noise.
# Note we do this silently assuming the function wrapper below
if not self.__uses_noise:
return
if dtype == "raw_vector" and window:
raise RuntimeError(
"RFdeconProcessor.loadnoise: "
+ "Illegal argument combination\nwindow cannot be true with raw_vector input"
)
if not (
dtype == "Seismogram" or dtype == "TimeSeries" or dtype == "raw_vector"
):
raise RuntimeError(
"RFdeconProcessor.loadnoise: " + " Illegal dtype parameter=" + dtype
)
nvector = []
# IMPORTANT these two parameters are not required by the
# ScalarDecon C code but need to be inserted in pf for any algorithm
# that requires noise data (i.e. multitaper) and the window
# options is desired
if window:
tws = self.md.get_double("noise_window_start")
twe = self.md.get_double("noise_window_end")
if dtype == "Seismogram":
ts = _ExtractComponent(n, component)
ts = WindowData(ts, tws, twe)
nvector = ts.data
elif dtype == "TimeSeries":
ts = WindowData(n, tws, twe)
nvector = ts.data
else:
nvector = n
else:
if dtype == "Seismogram":
ts = _ExtractComponent(n, component)
nvector = ts.data
elif dtype == "TimeSeries":
nvector = ts.data
else:
nvector = n
# Have to explicitly convert to ndarray because DoubleVector cannot be serialized.
self.nvector = np.array(nvector)
[docs] def apply(self):
"""
Compute the RF estimate using the algorithm defined internally.
:return: vector of data that are the RF estimate computed from previously loaded data.
"""
if hasattr(self, "dvector"):
self.processor.loaddata(DoubleVector(self.dvector))
if hasattr(self, "wvector"):
self.processor.loadwavelet(DoubleVector(self.wvector))
if self.__uses_noise and hasattr(self, "nvector"):
self.processor.loadnoise(DoubleVector(self.nvector))
self.processor.process()
return self.processor.getresult()
[docs] def actual_output(self):
"""
The actual output of a decon operator is the inverse filter applied to
the wavelet. By design it is an approximation of the shaping wavelet
defined for this operator.
:return: Actual output of the operator as a ccore.TimeSeries object.
The Metadata of the return is bare bones. The most important factor
about this result is that because actual output waveforms are normally
a zero phase wavelet of some kind the result is time shifted to be
centered (i.e. t0 is rounded n/2 where n is the length of the vector
returned).
"""
if hasattr(self, "dvector"):
self.processor.loaddata(DoubleVector(self.dvector))
if hasattr(self, "wvector"):
self.processor.loadwavelet(DoubleVector(self.wvector))
if self.__uses_noise and hasattr(self, "nvector"):
self.processor.loadnoise(DoubleVector(self.nvector))
self.processor.process()
return self.processor.actual_output()
[docs] def ideal_output(self):
"""
The ideal output of a decon operator is the same thing we call a
shaping wavelet. This method returns the ideal output=shaping wavelet
as a TimeSeries object. Like the actual output method the return
function is circular shifted so the function peaks at 0 time located
at n/2 samples from the start sample. Graphic displays will then show
the wavelet centered and peaked at time 0. The prediction error
can be computed as the difference between the actual_output and
ideal_output TimeSeries objects. The norm of the prediction error
is a helpful metric to display the stability and accuracy of the
inverse.
"""
if hasattr(self, "dvector"):
self.processor.loaddata(DoubleVector(self.dvector))
if hasattr(self, "wvector"):
self.processor.loadwavelet(DoubleVector(self.wvector))
if self.__uses_noise and hasattr(self, "nvector"):
self.processor.loadnoise(DoubleVector(self.nvector))
self.processor.process()
return self.processor.ideal_output()
[docs] def inverse_filter(self):
"""
This method returns the actual inverse filter that if convolved with
he original data will produce the RF estimate. Note the filter is
meaningful only if the source wavelet is minimum phase. A standard
theorem from time series analysis shows that the inverse of mixed
phase wavelet is usually unstable and a maximum phase wavelet is always
unstable. Fourier-based methods can still compute a stable solution
even with a mixed phase wavelet because of the implied circular
convolution.
The result is returned as TimeSeries object.
"""
if hasattr(self, "dvector"):
self.processor.loaddata(DoubleVector(self.dvector))
if hasattr(self, "wvector"):
self.processor.loadwavelet(DoubleVector(self.wvector))
if self.__uses_noise and hasattr(self, "nvector"):
self.processor.loadnoise(DoubleVector(self.nvector))
self.processor.process()
return self.processor.inverse_filter()
[docs] def QCMetrics(self, prediction_error_key="prediction_error") -> dict:
"""
All decon algorithms compute a set of algorithm dependent quality
control metrics. The return is a Metadata container.
All this wrapper really does is translate that return into
a python dictionary that can be used as the base of a subdocument
posting to outputs. This method MUST ONLY BE CALLED after
calling the process method of the C++ engine.
"""
# the base of what is returned is an echo of the input parameter set
qcmd = dict(self.md)
# merge in an output of the implementations QCMetrics method
qcmeth_output = dict(self.processor.QCMetrics())
qcmd.update(qcmeth_output)
# always compute the prediction error
perr = self._prediction_error()
qcmd[prediction_error_key] = perr
return dict(qcmd)
[docs] def change_parameters(self, md):
"""
Use this method to change the internal parameter setting of the
processor. It can only change the parameters for a particular
algorithm. A new instance of this class needs to be created if
you need to switch to a different algorithm. It does little
more than call the read_metadata of the already loaded processor.
All the scalar decon methods implement that method.
:param md: is a mspass.Metadata object containing required parameters
for the alternative algorithm.
"""
self.md = Metadata(md)
self.processor.read_metadata(self.md)
@property
def uses_noise(self):
return self.__uses_noise
@property
def dwin(self):
tws = self.md.get_double("deconvolution_data_window_start")
twe = self.md.get_double("deconvolution_data_window_end")
return TimeWindow(tws, twe)
@property
def nwin(self):
if self.__uses_noise:
tws = self.md.get_double("noise_window_start")
twe = self.md.get_double("noise_window_end")
return TimeWindow(tws, twe)
else:
return TimeWindow # always initialize even if not used
def _prediction_error(self) -> float:
"""
Small internal function used to compute prediction error of
deconvolution operator defined as norm(ao-io)/norm(io) where
norm is L2.
"""
ao = self.actual_output()
io = self.ideal_output()
# with internal use can assume ao and io are the same length
err = ao - io
return np.linalg.norm(err.data) / np.linalg.norm(io.data)
[docs]@mspass_func_wrapper
def RFdecon(
d,
engine=None,
alg="LeastSquares",
pf="RFdeconProcessor.pf",
wavelet=None,
noisedata=None,
wcomp=2,
ncomp=2,
QCdocument_key="RFdecon_properties",
object_history=False,
alg_name="RFdecon",
alg_id=None,
dryrun=False,
):
"""
Use this function to compute conventional receiver functions
from a single three component seismogram. In this function,
an instance of wrapper class RFdeconProcessor will be built and
initialized with alg and pf.
Default assumes d contains all data sections required to do
the deconvolution with the wavelet in component 2 (3 for matlab
and FORTRAN people). By default the data and noise
(if required by the algorithm) sections will be extracted from
the (assumed larger) data section using time windows defined
internally in the processor pf definition. For variations (e.g.
adding tapering to one or more of the time series inputs)
use the d, wavelet, and (if required) noise arguments to load
each component separately. Note d is dogmatically required
to be three component data while optional wavelet and noisedata
series are passed as plain numpy vectors (i.e. without the
decoration of a TimeSeries).
To make use of the extended outputs from RFdeconProcessor
algorithms (e.g. actual output of the computed operator)
call those methods after this function returns successfully
with a three-component seismogram output. That is possible
because the processor object caches the most recent wavelet
and inverse used for the deconvolution. An exception is
that all algorithms call their QCmetrics method of processor
and push them to the headers of the deconvolved output.
QCmetric attributes are algorithm dependent.
The ProcessingHistory feature can optionally be enabled by
setting the save_history argument to True. When enable one should
normally set a unique id for the algid argument.
:param d: Seismogram input data. See notes above about
time span of these data.
:type d: Must be a `Seismogram` object or the function will throw
a TypeError exceptionl
:param engine: optional instance of a RFdeconProcessor
object. By default the function instantiates an instance of
a processor for each call to the function. For algorithms
like the multitaper based algorithms with a high initialization
cost performance will improve by sending an instance to the
function via this argument.
:type engine: None or an instance of `RFdeconProcessor`.
When None (default) an instance of an `RFdeconProcessor` is
created on entry based on the keyword defined by the `alg`
argument. The algorithm built into the instance of
`RFdeconProcessor` is used if engine is not null.
:param alg: The algorithm to be applied, used for initializing
a RFdeconProcessor object. Ignored if `engine` is used.
:param pf: The pf file to be parsed, used for inititalizing a
RFdeconProcessor. Ignored if `engine` is used.
:type pf: string defining an absolute path for the file name
or a path relative to a directory defined by PFPATH.
:param wavelet: vector of doubles (numpy array or the
std::vector container internal to TimeSeries object) defining
the wavelet to use to compute deconvolution operator.
Default is None which assumes processor was set up to use
a component of d as the wavelet estimate.
:type wavelet: None or an iterable vector container
(in MsPASS that means a python array, a numpy array, or a DoubleVector)
:param noisedata: vector of doubles (numpy array or the
std::vector container internal to TimeSeries object) defining
noise data to use for computing regularization. Not all RF
estimation algorithms use noise estimators so this parameter
is optional. It can also be extracted from d depending on
parameter file options.
:type noisedata: None or an iterable vector container
(in MsPASS that means a python array, a numpy array, or a DoubleVector)
:param wcomp: When defined from Seismogram d the wavelet
estimate in conventional RFs is one of the components that
are most P wave dominated. That is always one of three
things: Z, L of LQT, or the L component from the output of
Kennett's free surface transformation operator. The
default is 2, which for ccore.Seismogram is always one of
the above. This parameter would be changed only if the
data has undergone some novel transformation not yet invented
and the best wavelet estimate was on in 2 (3 with FORTRAN
and matlab numbering).
:type wcomp: int (must 0, 1, or 2)
:param ncomp: component number to use to compute noise. This is used
only if the algorithm in processor requires a noise estimate.
Normally it should be the same as wcomp and is by default (2).
:type ncomp: int (must be 0, 1, or 2)
:param QCdocument_key: A summary of the parameters defining the
deconvolution operator (really a dump of the pf content used for
creating the engine) and computed QC attributes are posted to a
python dictionary. That content is posted to the outputs
Metadata container with the key defined by this argument.
In MongoDB lingo that means when saved to the database the
dictionary content associated with this key becomes a "subdocument".
:type QCdocument_key: string (default is "RFdecon_properties")
:param object_history: boolean to enable or disable saving object
level history. Default is False. Note this functionality is
implemented via the mspass_func_wrapper decorator.
:param alg_name: When history is enabled this is the algorithm name
assigned to the stamp for applying this algorithm.
Default ("WindowData") should normally be just used.
Note this functionality is implemented via the mspass_func_wrapper decorator.
:param ald_id: algorithm id to assign to history record (used only if
object_history is set True.)
Note this functionality is implemented via the mspass_func_wrapper decorator.
:param dryrun: When true only the arguments are checked for validity.
When true nothing is calculated and the original data are returned.
Note this functionality is implemented via the mspass_func_wrapper decorator.
:return: Normally returns Seismogram object containing the RF estimates.
The orientations are always the same as the input. If `return-wavelets` is set
True returns a tuple with three components: 0 - `Seismogram` returned as with
default, 1 - ideal output wavelet `TimeSeries`, 2 - actual output wavelet
stored as a `TimeSeries` object.
"""
if not isinstance(d, Seismogram):
message = "RFdecon: arg0 is of type={}. Must be a Seismogram object".format(
str(type(d))
)
raise TypeError(message)
if d.dead():
return d
if engine:
if isinstance(engine, RFdeconProcessor):
processor = engine
else:
message = (
"RFdecon: illegal type for define by engine argment = {}\n".format(
type(engine)
)
)
message += "If defined must be an instance of RFdeconProcessor"
raise TypeError(message)
else:
processor = RFdeconProcessor(alg, pf)
try:
if wavelet is not None:
processor.loadwavelet(wavelet, dtype="raw_vector")
else:
# processor.loadwavelet(d,dtype='Seismogram',window=True,component=wcomp)
processor.loadwavelet(d, window=True, component=wcomp)
if processor.uses_noise:
if noisedata != None:
processor.loadnoise(noisedata, dtype="raw_vector")
else:
processor.loadnoise(d, window=True, component=ncomp)
except MsPASSError as err:
d.kill()
d.elog.log_error(err)
return d
# We window data before computing RF estimates for efficiency
# Otherwise we would call the window operator 3 times below
# WindowData does will kill the output if the window doesn't match
# which is reason for the test immediately after this call
result = WindowData(d, processor.dwin.start, processor.dwin.end)
if result.dead():
return result
npts = result.npts
try:
for k in range(3):
processor.loaddata(result, component=k)
x = processor.apply()
# overwrite this component's data in the result Seismogram
# Use some caution handling any size mismatch
nx = len(x)
if nx >= npts:
for i in range(npts):
result.data[k, i] = x[i]
else:
# this may not be the fastest way to do this but it is simple and clean
# matters little since this is an error condition and should be rare
for i in range(npts):
if i < nx:
result.data[k, i] = x[i]
else:
result.data[k, i] = 0.0
# This is actually an error condition so we log it
message = (
"Windowing size mismatch.\nData window length = %d which is less than operator length= %d"
% (nx, npts)
)
result.elog.log_error("RFdecon", message, ErrorSeverity.Complaint)
except MsPASSError as err:
result.kill()
result.elog.log_error(err)
except:
print(
"RFDecon: something threw an unexpected exception - this is a bug and needs to be fixed.\nKilling result from RFdecon."
)
result.kill()
result.elog.log_error(
"RFdecon", "Unexpected exception caught", ErrorSeverity.Invalid
)
finally:
# assume this method creates the dictionary we use as base for the
# QC subdocument. Note that always includes the prediction error
subdoc = processor.QCMetrics()
result[QCdocument_key] = subdoc
return result