#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Module implementing a similar algorithm to dbxcor in python.
Created on Tue Jul 9 05:37:40 2024
@author: pavlis
"""
import numpy as np
from scipy import signal
from mspasspy.ccore.algorithms.amplitudes import (
MADAmplitude,
RMSAmplitude,
PeakAmplitude,
)
from obspy.geodetics.base import gps2dist_azimuth, kilometers2degrees
from obspy.taup import TauPyModel
from mspasspy.ccore.utility import ErrorLogger, ErrorSeverity, Metadata
from mspasspy.ccore.seismic import (
TimeSeries,
TimeSeriesEnsemble,
SeismogramEnsemble,
TimeReferenceType,
DoubleVector,
)
from mspasspy.util.seismic import number_live, regularize_sampling, ensemble_time_range
from mspasspy.ccore.algorithms.basic import TimeWindow
from mspasspy.algorithms.signals import filter
from mspasspy.algorithms.window import WindowData, WindowData_autopad
def estimate_ensemble_bandwidth(
ensemble,
snr_doc_key="Parrival",
):
"""
Estimate average bandwidth of an ensemble of data using the output of
broadband_snr_QC.
The original dbxcor program used a GUI that allowed the user to select one of
a set of predefined filters to be used to set the frequency band of the signal
to be processed by the equivalent of the `align_and_stack` function of this module.
    The purpose of this function is to automate that process to define an
optimal bandwidth for processing of the input ensemble.
This function only works on ensembles processed previously with the
mspass function `broadband_snr_QC`. That function computes a series of
    metrics for signal-to-noise ratio.  The only ones this function uses are
    the two values defined by "low_f_band_edge" and "high_f_band_edge".
    The verbose names should make their definition obvious.  This
    function returns the median of those two attributes extracted
    from all live members of the input ensemble.
    The result is returned as a list with the low frequency edge at index 0,
    the high frequency edge at index 1, and the number of members used at
    index 2.  The expectation is that the
    data will be bandpass filtered between the low and high edges before
    running the `align_and_stack` function.
:param ensemble: ensemble of data to be scanned
:type ensemble: `TimeSeriesEnsemble`
    :param snr_doc_key: subdocument key of attributes computed by
`broadband_snr_QC` to fetch as estimates of low and high frequency
edges.
:type snr_doc_key: string (default "Parrival" = default of
`broadband_snr_QC`)
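    Below is a minimal usage sketch.  It assumes `ens` is a
    `TimeSeriesEnsemble` previously processed with `broadband_snr_QC`
    using that function's default "Parrival" subdocument key:
    ```
    [f_low, f_high, n_used] = estimate_ensemble_bandwidth(ens)
    if n_used > 0:
        ens = filter(ens, type="bandpass", freqmin=f_low, freqmax=f_high)
    ```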
"""
f_l = []
f_h = []
for d in ensemble.member:
if d.live:
if d.is_defined(snr_doc_key):
doc = d[snr_doc_key]
f = doc["low_f_band_edge"]
f_l.append(f)
f = doc["high_f_band_edge"]
f_h.append(f)
if len(f_h) > 0:
f_low = np.median(f_l)
f_high = np.median(f_h)
return [f_low, f_high, len(f_h)]
else:
return [0, 0, 0]
def MCXcorPrepP(
ensemble,
noise_window,
noise_metric="mad",
initial_beam_metric="bandwidth",
snr_doc_key="Parrival",
low_f_corner=None,
high_f_corner=None,
npoles=4,
filter_parameter_keys=["MCXcor_f_low", "MCXcor_f_high", "MCXcor_npoles"],
coda_level_factor=4.0,
set_phases=True,
model=None,
Ptime_key="Ptime",
pPtime_key="pPtime",
PPtime_key="PPtime",
station_collection="channel",
search_window_fraction=0.9,
minimum_coda_duration=5.0,
correlation_window_start=-3.0,
) -> TimeSeriesEnsemble:
"""
Function used to preprocess an ensemble to prepare input for
running multichannel cross-correlation and stacking method
(automated dbxcor algorithm) of P phase data. This function should
not be used for anything but teleseismic P data.
The multichannel correlation and stacking function in this module called
`align_and_stack` requires one to define a number of parameters that
in the original dbxcor implementation were input through a graphical
    user interface.  This function can be thought of as a robot that will
attempt to set all the required parameters automatically using signal processing.
It will attempt to produce two inputs required by the `align_and_stack`:
1. What I call a "correlation window".
2. The ensemble member that is to be used as the initial estimate of the
beam (stack of the data aligned by cross correlation).
A CRITICAL assumption of this function is that the input ensemble's data
have been processed with the snr module function `broadband_snr_QC`.
That function computes and posts multiple snr metrics to a subdocument
(dictionary) accessed with a single key. This function requires that
data exist and be accessible with the key defined by the argument
"snr_doc_key". The data in that dictionary are used in the following
algorithms run within this function:
1. The working bandwidth of the data is established by computing the median
of the attributes "low_f_band_edge" and "high_f_band_edge" extracted from
each live member. Those frequencies are computed using the function in this module
called `estimate_ensemble_bandwidth`. The working ensemble
(which is what is returned on success) will be filtered in the band defined
by the output of `estimate_ensemble_bandwidth` using a Butterworth,
bandpass filter with the number of poles defined by the "npoles" argument.
2. What I call the correlation window is estimated by a complex recipe
best understood from the user manual page and example jupyter notebooks
related to this module. Briefly, the correlation window is defined by
applying an envelope function to each signal and defining the coda
end using a common recipe for coda magnitudes where the end of the coda is
defined by where the coda level (envelope) first falls below a
level specified as a multiple of measured background noise.
    This function REQUIRES each member of the input to have a section
that it can treat as background noise. For most uses that means some
section of data immediately before the P wave arrival time. The
noise level is estimated by the metric defined by the "noise_metric"
argument and the coda level cutoff is computed as
`noise_level*coda_level_factor` where `coda_level_factor` is the
    value passed via the function argument with that name.
Because this function is designed strictly for P phases it has to
handle the complexity of interference by secondary P phases. For
    that reason it computes arrival times for pP and PP and will always
    start the coda search before the time of the smaller of pP or PP.
    Note, however, that pP is not used as a constraint if the source depth is
less than 100 km. The justification for that number can be found in
jupyter notebooks that should be part of the MsPASS documentation.
    3. It then extracts the one member from the ensemble that the algorithm judges to
be the best choice for an initial beam estimate. That is, in
align_and_stack the first step is to align the data with a reference
signal using cross-correlation of the "beam" with each member. The
aligned data are then summed with a robust stack to refine the beam.
That is repeated until the stack does not change significantly.
That algorithm requires a seed for the initial beam estimate. That
initial signal is extracted as the ensemble member with the largest
value of the snr metric defined with the `initial_beam_metric` argument.
The correlation window parameters computed earlier are then posted
to the `Metadata` of that `TimeSeries` where `align_and_stack`
can parse them to set the correlation time window.
The function returns a copy of the input ensemble filtered to the
    average bandwidth and the initial beam estimate.  They are returned
    as a list with the ensemble at index 0 and the initial beam estimate at index 1.
Callers should verify the beam signal is marked live. A dead
beam signal indicates the algorithm failed in one way or another.
    Errors at that level will result in messages being posted to the
`ErrorLogger` container of the beam output. Note also this
algorithm can kill some ensemble members in the output that
were marked live in the input.
:param ensemble: input ensemble of data to use. As noted above it
must have been processed with `broadband_snr_QC` or this function will
return a null result. The function makes a tacit assumption that all
the members of this ensemble are in relative time with 0 of each member
being an estimate of the P wave arrival time. It does no tests to
    validate this assumption, so if the assumption is wrong you will, at best,
get junk as output.
:type ensemble: `TimeSeriesEnsemble` Note for most uses this is the
output of ExtractComponent of a `SeismogramEnsemble` processed with
`broadband_snr_QC` that is the longitudinal component for the P phase.
:param noise_window: time window defining the section of each
ensemble member that is to be treated as "noise" for estimating
a relative noise level.
:type noise_window: `TimeWindow` tacitly expected to be time relative
to a measure of the P wave arrival time.
:param noise_metric: vector norm metric to use for measuring the
amplitude of the noise extracted from the data in the range defined by
noise_window.
:type noise_metric: string (Must be one of: "mad" (default), "rms",
or "peak"). Any other string will cause the function to throw a ValueError
exception.
:param initial_beam_metric: key of attribute to extract from the
output of `broadband_snr_QC` used to select initial beam signal.
This argument is passed to the `extract_initial_beam_estimate` function for
as the "metric" argument of that function.
    :type initial_beam_metric: string (default "bandwidth").  For a list of
allowed values see the docstring of `extract_initial_beam_estimate`.
:param snr_doc_key: key to use to fetch the subdocument containing the
output of `broadband_snr_QC`.
:type snr_doc_key: string (default "Parrival" which is the default of
`broadband_snr_QC`).
:param low_f_corner: force the low frequency corner for the output data
bandwidth to this value. When defined, the internal call to
`estimate_ensemble_bandwidth` is bypassed and the output data are filtered
between the value defined by low_f_corner and high_f_corner.
:type low_f_corner: float (default None which is interpreted to mean
scan ensemble to estimate the frequency range)
:param high_f_corner: force the high frequency corner for the output data
bandwidth to this value. When defined, the internal call to
`estimate_ensemble_bandwidth` is bypassed and the output data are filtered
between the value defined by low_f_corner and high_f_corner.
:type high_f_corner: float (default None which is interpreted to mean
scan ensemble to estimate the frequency range)
    :param npoles: number of poles to use for the Butterworth bandpass
    filter.
    :type npoles: integer (default 4)
    :param filter_parameter_keys: keys used to post the filter band
    parameters (low corner, high corner, and npoles, in that order) to the
    output ensemble's Metadata.  Changing the defaults is not advised.
    :type filter_parameter_keys: list of three strings
    :param coda_level_factor: multiple of the measured noise level used to
    define the coda end cutoff as described above.
    :type coda_level_factor: float (default 4.0)
    :param model: travel-time calculator used to compute the P, pP, and PP
    times when `set_phases` is True.
    :type model: obspy `TauPyModel` (default None, which causes the function
    to construct one internally for the "iasp91" model)
    :param set_phases: boolean that if set True (default) causes the P phase times
    P, pP (if defined), and PP (if defined) to be computed and posted to the
    output of each ensemble member.  If False the algorithm assumes the
    same quantities were previously calculated and can be fetched from the
    member TimeSeries Metadata container using keys defined by Ptime_key,
    pPtime_key, and PPtime_key.
:param Ptime_key:
:param pPtime_key:
:param PPtime_key: These three arguments define alternative keys that
will be used to fetch (if set_phases is false) or post (if set_phases is True)
computed P, pP, and PP times to each member's Metadata container.
    Changing any of these values is not advised when set_phases is True.
These are most useful if the phase arrival times were previously
computed or measured and posted with different keys.
    :param station_collection: MongoDB collection name used to fetch
    receiver coordinate data.  Normal practice in MsPASS is to save
    receiver Metadata in two collections called "channel" and "site" and
to load the coordinate data through normalization when the data are
loaded. The MsPASS convention defines data loaded by normalization
with a leading collection name. e.g. the "lat" value extracted from a
"channel" document would be posted to the data as "channel_lat".
The same value, however, loaded from "site" would be tagged "site_lat".
The default for this argument is "channel" which means the algorithm
will require the attributes "channel_lat" and "channel_lon" as the
receiver coordinates (in degrees). The standard alternative is to
define `station_collection="site"`, in which case the function will
use "site_lat" and "site_lon".
:type station_collection: string (default "channel")
:param search_window_fraction: The window for defining the
correlation window is determined by running the internal
`_coda_duration` function on each ensemble member and computing the
range from the median of the ranges computed from all the ensemble
    members.  Each coda search, however, is constrained by the times of
pP and/or PP. As noted above the function uses the pP time for
events with depths greater than 100 km but PP for shallow sources.
To allow for hypocenter errors the duration defined by
P to pP or P to PP is multiplied by this factor to define the
search start for the coda estimation.
:type search_window_fraction: float (default 0.9)
:param minimum_coda_duration: if the estimate of coda duration
computed internally is less than this value the correlation window
is set to this value - relative time from P.
:type minimum_coda_duration: float (5.0 seconds)
    :param correlation_window_start: the start time of the correlation window
    set in the output "beam" is fixed as this value.  It is normally
    a negative number defining a relative time before P chosen so that
    no meaningful signal is likely to arrive before it.
:type correlation_window_start: float (default -3.0)
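    The fragment below is a sketch of typical usage.  It assumes `ens` is a
    `TimeSeriesEnsemble` of longitudinal-component data previously processed
    with `broadband_snr_QC` and normalized with the "channel" collection;
    the window limits are illustrative values only:
    ```
    nw = TimeWindow(-40.0, -5.0)  # noise window relative to P time
    [ensout, beam0] = MCXcorPrepP(ens, nw)
    if beam0.live:
        [ensout, stack] = align_and_stack(
            ensout, beam0, robust_stack_window=TimeWindow(-2.0, 10.0)
        )
    ```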
"""
alg = "MCXcorPrepP"
if not isinstance(ensemble, TimeSeriesEnsemble):
message = alg + ": Illegal type={} for arg0\n".format(type(ensemble))
message += "Must be a TimeSeriesEnsemble object"
raise TypeError(message)
if ensemble.dead():
return [ensemble, TimeSeries()]
if model is None:
model = TauPyModel(model="iasp91")
if not isinstance(noise_window, TimeWindow):
message = alg + ": Illegal type={} for arg1\n".format(type(noise_window))
message += "Must be a TimeWindow object"
raise TypeError(message)
# first sort out the issue of the passband to use for this
# analysis. Use kwargs if they are defined but otherwise assume
# we extract what we need from the output of `broadband_snr_QC`.
# effectively a declaration to keep these from going away when they
# go out of scope
f_low = 0.0
f_high = 0.0
if low_f_corner or high_f_corner:
if low_f_corner and high_f_corner:
f_low = low_f_corner
f_high = high_f_corner
if f_high <= f_low:
message = (
alg
+ ": Inconsistent input. low_f_corner={} and high_f_corner={}\n".format(
low_f_corner, high_f_corner
)
)
                message += "low_f_corner value must be less than high_f_corner value"
raise ValueError(message)
else:
message = (
alg
+ ": Inconsistent input. low_f_corner={} and high_f_corner={}\n".format(
low_f_corner, high_f_corner
)
)
message += "If you specify one you must specify the other"
raise ValueError(message)
else:
[f_low, f_high, nused] = estimate_ensemble_bandwidth(
ensemble,
snr_doc_key=snr_doc_key,
)
if (f_low == 0) or (f_high == 0):
message = "estimate_ensemble_bandwidth failed\n"
message += "Either all members are dead or data were not previously processed with broadband_snr_QC"
ensemble.elog.log_error(alg, message, ErrorSeverity.Invalid)
ensemble.kill()
return [ensemble, TimeSeries()]
enswork = filter(
ensemble,
type="bandpass",
freqmin=f_low,
freqmax=f_high,
corners=npoles,
)
    # using a list for these keys creates an odd construct, but the docstring
    # advises against changing them
enswork[filter_parameter_keys[0]] = f_low
enswork[filter_parameter_keys[1]] = f_high
enswork[filter_parameter_keys[2]] = npoles
if set_phases:
for i in range(len(enswork.member)):
# this handles dead data so don't test for life
enswork.member[i] = _set_phases(
enswork.member[i],
model,
Ptime_key=Ptime_key,
pPtime_key=pPtime_key,
PPtime_key=PPtime_key,
station_collection=station_collection,
)
if number_live(enswork) == 0:
message = "_set_phases killed all members of this ensemble\n"
message += "station_collection is probably incorrect or the data have not been normalized for source and receiver coordinates"
enswork.elog.log_error(alg, message, ErrorSeverity.Invalid)
enswork.kill()
return [enswork, TimeSeries()]
# if set_phases is False we assume P, pP, and/or PP times were previously
# set. First make sure all data are in relative time with 0 as P time.
# kill any datum for which P is not defined
for i in range(len(enswork.member)):
d = enswork.member[i]
if d.live:
if d.is_defined(Ptime_key):
Ptime = d[Ptime_key]
if d.time_is_relative():
d.rtoa()
d.ator(Ptime)
else:
message = (
"Required key={} defining P arrival time is not defined\n".format(
Ptime_key
)
)
message += "Cannot process this datum without a P wave time value"
d.elog.log_error(alg, message, ErrorSeverity.Invalid)
d.kill()
enswork.member[i] = d
# now get coda durations and set the correlation window as median of
# the coda durations
coda_duration = [] # coda estimates are placed here
search_range = [] # use this to set ranges - duration limited to min of these
for d in enswork.member:
if d.live:
sr = _get_search_range(d)
sr *= search_window_fraction
search_range.append(sr)
# compute a noise estimate without being too dogmatic about
# window range
if d.t0 < noise_window.start:
nw = TimeWindow(d.t0, noise_window.end)
else:
nw = TimeWindow(noise_window)
nd = WindowData(d, nw.start, nw.end, short_segment_handling="truncate")
# silently skip any datum for which the WindowData algorithm fails
# can happen if the noise window does not overlap with data
if nd.dead():
continue
            if noise_metric == "rms":
                namp = RMSAmplitude(nd)
            elif noise_metric == "peak":
                namp = PeakAmplitude(nd)
            elif noise_metric == "mad":
                namp = MADAmplitude(nd)
            else:
                # the docstring promises a ValueError for any other value
                message = alg + ": illegal value for noise_metric={}\n".format(
                    noise_metric
                )
                message += 'Must be one of "mad", "rms", or "peak"'
                raise ValueError(message)
coda_window = _coda_duration(d, coda_level_factor, search_start=sr)
            # silently drop any return value less than the floor defined
            # by minimum_coda_duration
duration = coda_window.end - coda_window.start
if duration > minimum_coda_duration:
coda_duration.append(duration)
if len(coda_duration) == 0:
        message = "Calculation of correlation window from the envelope of filtered ensemble members failed\n"
message += "No data detected with signal level in the passband above the floor value={}\n".format(
coda_level_factor
)
        message += "noise_window range or the coda level floor value is likely inconsistent with the data\n"
message += "Killing this ensemble"
enswork.elog.log_error(alg, message, ErrorSeverity.Invalid)
enswork.kill()
return [enswork, TimeSeries()]
correlation_window_endtime = np.median(coda_duration) # relative time
min_range = np.min(search_range)
min_range *= search_window_fraction
if correlation_window_endtime > min_range:
correlation_window_endtime = min_range
beam0 = extract_initial_beam_estimate(
enswork,
metric=initial_beam_metric,
subdoc_key=snr_doc_key,
)
beam0["correlation_window_start"] = correlation_window_start
beam0["correlation_window_end"] = correlation_window_endtime
return [enswork, beam0]
def dbxcor_weights(ensemble, stack, residual_norm_floor=0.01):
"""
Computes the robust weights used originally in dbxcor for each
member of ensemble. Result is returned in a parallel numpy
array (i.e. return[i] is computed weight for ensemble.member[i])
    This function is made for speed and has no safeties.  It assumes
    all the members of ensemble have the same sample interval and the same
    length as stack.  It will throw an exception if that assumption is
    violated, so callers should guarantee it holds.
    This function adds a feature not found in the original Pavlis and Vernon (2010)
paper via the `residual_norm_floor` argument. Experience with this algorithm
showed it tends to converge to a state with very high weights on one or two
signals and low weights on the rest. The high weights are given to the
one or two signals most closely matching the stack at convergence. This
has the undesirable effect of causing a strong initial value dependence
    and suboptimal noise reduction.  This argument handles that by imposing a
    floor on the L2 norm of the residual defined by the
    ratio ||r||/||d||.  i.e. if ||r|| < residual_norm_floor*||d||, ||r|| is
    replaced by residual_norm_floor*||d|| when computing the weight.
    Returns a numpy vector of weights.  Any dead datum will have a weight of
    -1.0 (a test for a negative value is sufficient).
:param ensemble: `TimeSeriesEnsemble` of data from which weights are to
be computed.
    :type ensemble: Assumed to be a `TimeSeriesEnsemble`.  No type checking
    is done, so if the input is the wrong type an exception will occur, but
    what is thrown will depend on what ensemble actually is.
:param stack: TimeSeries containing stack to be used to compute weights.
The method returns robust weights relative to the vector of data in
this object.
:type stack: `TimeSeries`.
:param residual_norm_floor: floor in the ratio norm2(r)/norm2(d)
used as described above.
:type residual_norm_floor: float
:return: numpy vector of weights parallel with ensemble.member. Dead
members will have a negative weight in this vector.
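    For reference, the weight computed for each live member d is, in sketch
    form (s_hat below denotes the stack vector scaled to a unit vector):
    ```
    r = d - (d . s_hat) * s_hat
    w = abs(d . s_hat) / (||r|| * ||d||)
    ```
    with ||r|| replaced by residual_norm_floor*||d|| whenever
    ||r||/||d|| falls below residual_norm_floor, as described above.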
"""
norm_floor = np.finfo(np.float32).eps * stack.npts
N = len(ensemble.member)
wts = np.zeros(N)
r = np.zeros(stack.npts)
    # Scale the stack vector to be a unit vector
s_unit = np.array(stack.data)
nrm_s = np.linalg.norm(stack.data)
s_unit /= nrm_s
for i in range(N):
if ensemble.member[i].dead():
wts[i] = -1.0
else:
d_dot_stack = np.dot(ensemble.member[i].data, s_unit)
r = ensemble.member[i].data - d_dot_stack * s_unit
nrm_r = np.linalg.norm(r)
nrm_d = np.linalg.norm(ensemble.member[i].data)
if nrm_d < norm_floor:
# this is a test for all zeros
# Give zero weight in this situation
wts[i] = 0.0
elif nrm_r / nrm_d < residual_norm_floor:
denom = residual_norm_floor * nrm_d
wts[i] = abs(d_dot_stack) / denom
else:
denom = nrm_r * nrm_d
                # dbxcor has logic to avoid a nan from a machine-zero
                # denom.  Not needed here because of the conditional chain above
wts[i] = abs(d_dot_stack) / denom
    # rescale weights so the largest is 1 - easier to understand
    # guard against the degenerate case where every member is dead (all weights -1)
    maxwt = np.max(wts)
    if maxwt > 0.0:
        wts /= maxwt
return wts
def regularize_ensemble(
ensemble, starttime, endtime, pad_fraction_cutoff
) -> TimeSeriesEnsemble:
"""
Secondary function to regularize an ensemble for input to robust
    stacking algorithm.  Assumes all data have the same sample rate.
    Uses WindowData_autopad to assure all data are inside the common
    range starttime:endtime.  Silently drops dead data.
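    Minimal usage sketch (the window times are illustrative values):
    ```
    ensout = regularize_ensemble(ens, -5.0, 30.0, 0.05)
    ```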
"""
ensout = TimeSeriesEnsemble(Metadata(ensemble), len(ensemble.member))
if ensemble.elog.size() > 0:
ensout.elog = ensemble.elog
for i in range(len(ensemble.member)):
d = ensemble.member[i]
if d.live:
if (
np.fabs((d.t0 - starttime)) > d.dt
or np.fabs(d.endtime() - endtime) > d.dt
):
d = WindowData_autopad(
d,
starttime,
endtime,
pad_fraction_cutoff=pad_fraction_cutoff,
)
                if d.live:
                    ensout.member.append(d)
                else:
                    message = "Dropped member number {} because undefined data range exceeded limit of {}\n".format(
                        i, pad_fraction_cutoff
                    )
                    ensout.elog.log_error(
                        "regularize_ensemble", message, ErrorSeverity.Complaint
                    )
else:
ensout.member.append(d)
else:
message = "Dropped member number {} that was marked dead on input".format(i)
ensout.elog.log_error(
"regularize_ensemble", message, ErrorSeverity.Complaint
)
if len(ensout.member) > 0:
ensout.set_live()
return ensout
def robust_stack(
ensemble,
method="dbxcor",
stack0=None,
stack_md=None,
timespan_method="ensemble_inner",
pad_fraction_cutoff=0.05,
residual_norm_floor=0.01,
) -> TimeSeries:
"""
Generic function for robust stacking live members of a `TimeSeriesEnsemble`.
    An optional initial stack estimate can be used via the stack0 argument.
The function currently supports two methods: "median" for a median
stack and "dbxcor" to implement the robust loss function
used in the dbxcor program defined in Pavlis and Vernon (2010).
Other algorithms could easily be implemented via this same api
by adding an option for the "method" argument.
All robust estimators I am aware of that use some form of penalty
function (e.g. m-estimators or the dbxcor penalty function) require
an initial estimator for the stack. They do that because the
penalty function is defined from a metric of residuals relative to
the current estimate of center. The median, however, does not
require an initial estimator which complicates the API for this function.
For the current options understand that stack0 is required for
the dbxcor algorithm but will be ignored if median is requested.
The other complication of this function is handling of potential
irregular time ranges of the ensemble input and how to set the
time range for the output. The problem is further complicated by
use in an algorithm like `align_and_stack` in this module where
    the data can get shifted so that some samples are undefined within the
    time range the stack aims to utilize.  The behavior of the
algorithm for this issue is controlled by the kwarg values
with the keys "timespan_method" and "pad_fraction_cutoff".
    As the name implies, "timespan_method" defines how the time span
for the stack should be defined. The following options
are supported:
"stack0" - sets the time span to that of the input
`TimeSeries` passed as stack0. i.e. the range is set to
stack0.dt to stack0.endtime().
"ensemble_inner" - (default) use the range defined by the "inner" method
for computing the range with the function `ensemble_time_range`.
(see `ensemble_time_range` docstring for the definition).
"ensemble_outer" - use the range defined by the "outer" method
for computing the range with the function `ensemble_time_range`.
(see `ensemble_time_range` docstring for the definition).
"ensemble_median" - use the range defined by the "median" method
for computing the range with the function `ensemble_time_range`.
(see `ensemble_time_range` docstring for the definition).
    These interact with the value passed via "pad_fraction_cutoff".
When the time range computed is larger than the range of a particular
member of the input ensemble this parameter determines whether or not
the member will be used in the stack. If the fraction of
undefined samples (i.e. N_undefined/Nsamp) is greater than this cutoff
that datum will be ignored. Otherwise if there are undefined
values they will be zero padded.
:param ensemble: input data to be stacked. Should all be in
relative time with all members having the same relative time span.
:type ensemble: TimeSeriesEnsemble
:param method: Defines a name string of the method to be used to
compute the stack.
:type method: string. Currently must be one of two values or the
function will abort: "median" or "dbxcor". As the names imply
"median" will cause the function to return the median of the sample
vectors while "dbxcor" applies the dbxcor method.
    :param stack0: optional initial estimate for stack.  All estimators
    other than the median that I know of use a loss function for downweighting
members of the stack that do not match the stack as defined by
some misfit metric. This argument can be used to input an optional
starting estimate of the stack for the dbxcor method. By default it
uses the median as the starting point, but this can be used to
input something else. Note the function will silently ignore this
argument if method == "median".
    :type stack0: TimeSeries.  Note the time span of this optional input
    must be the same or wider than the ensemble member range defined by
    the (internal to this module) regularize_ensemble function or the
    return will be a copy of this TimeSeries marked dead.
    Default for this argument is None, which means the median will be
    used for the initial stack for dbxcor.
:param stack_md: optional Metadata container to define the
content of the stack output. By default the output will have only
Metadata that duplicate required internal attributes (e.g. t0 and npts).
An exception is if stack0 is used the Metadata of that container will
be copied and this argument will be ignored.
:type stack_md: Metadata container or None. If stack0 is defined
this argument is ignored. Otherwise it should be used to add
whatever Metadata is required to provide a tag that can be used to
    identify the output.  If not specified the stack Metadata
    will be only those produced by default construction of a
    TimeSeries.  That is almost never what you want.  Reiterate,
however, that if stack0 is defined the output stack will be a
clone of stack0 with possible modifications of time and data
range attributes and anything the stack algorithm posts.
:param residual_norm_floor: floor on residuals used to compute dbxcor weight
function. See docstring for `dbxcor_weights` for details. Ignored
unless method is "dbxcor"
:type residual_norm_floor: float (default 0.01)
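    Minimal usage sketch.  It assumes `ens` is a `TimeSeriesEnsemble` whose
    live members share a common relative time span; the tag key is
    illustrative only:
    ```
    md = Metadata()
    md["stack_tag"] = "robust_stack_example"
    stack = robust_stack(ens, method="dbxcor", stack_md=md)
    ```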
"""
alg = "robust_stack"
# if other values for method are added they need to be added here
if method not in ["median", "dbxcor"]:
message = alg + ": Illegal value for argument method={}\n".format(method)
message += "Currently must be either median or dbxcor"
raise ValueError(message)
    # don't test type - if we get an illegal type let it throw an exception
if ensemble.dead():
return ensemble
if timespan_method == "stack0":
if stack0:
# intentionally don't test type of stack0
# if not a TimeSeries this will throw an exception
timespan = TimeWindow(stack0.t0, stack0.endtime())
else:
message = alg + ": usage error\n"
message += (
"timespan_method was set to stack0 but the stack0 argument is None\n"
)
message += "stack0 must be a TimeSeries to use this option"
raise ValueError(message)
elif timespan_method == "ensemble_inner":
timespan = ensemble_time_range(ensemble, metric="inner")
elif timespan_method == "ensemble_outer":
timespan = ensemble_time_range(ensemble, metric="outer")
elif timespan_method == "ensemble_median":
timespan = ensemble_time_range(ensemble, metric="median")
else:
message = alg + ": illegal value for argument timespan_method={}".format(
timespan_method
)
raise ValueError(message)
ensemble = regularize_ensemble(
ensemble, timespan.start, timespan.end, pad_fraction_cutoff
)
    # the above can remove some members; if nothing survived, return a
    # default-constructed (dead) TimeSeries carrying the error log
    if ensemble.dead() or len(ensemble.member) == 0:
        stack = TimeSeries()
        stack.elog = ensemble.elog
        return stack
    M = len(ensemble.member)
    # can now assume all members are the same length and the ensemble is not empty
    N = ensemble.member[0].npts
if stack0:
stack = WindowData_autopad(
stack0,
timespan.start,
timespan.end,
pad_fraction_cutoff=pad_fraction_cutoff,
)
if stack.dead():
message = "Received an initial stack estimate with time range inconsistent with data\n"
message += "Recovery not implemented - stack returned is invalid"
stack.elog.log_error("robust_stack", message, ErrorSeverity.Invalid)
return stack
else:
# bit of a weird logic here - needed because we need option for
# dbxcor method to use median stack as starting point or use
# the input via stack0. This does that
#
# Also this is a bit of a weird trick using inheritance to construct a
# TimeSeries object for stack using an uncommon constructor.
# The actual constructor wants a BasicTimeSeries and Metadata as
        # arg0 and arg1.  A TimeSeries is a subclass of BasicTimeSeries so this
# resolves. Note the conditional is needed as None default for
# stack_md would abort
stack = TimeSeries(N)
if stack_md:
stack = TimeSeries(stack, stack_md)
stack.t0 = timespan.start
# this works because we can assume ensemble is not empty and clean
stack.dt = ensemble.member[0].dt
# constructor defaults time base to relative so need to handle GMT
# should not be the norm but we need to handle this
# This is a pretty obscure part of the ccore api
        if ensemble.member[0].time_is_UTC():
            stack.tref = TimeReferenceType.UTC
# Always compute the median stack as a starting point
# that was the algorithm of dbxcor and there are good reasons for it
data_matrix = np.zeros(shape=[M, N])
for i in range(M):
data_matrix[i, :] = ensemble.member[i].data
stack_vector = np.median(data_matrix, axis=0)
stack.data = DoubleVector(stack_vector)
stack.set_live()
if method == "dbxcor":
stack = _dbxcor_stacker(
ensemble,
stack,
residual_norm_floor=residual_norm_floor,
)
return stack
def _dbxcor_stacker(
ensemble,
stack0,
eps=0.001,
maxiterations=20,
residual_norm_floor=0.01,
) -> TimeSeries:
"""
Runs the dbxcor robust stacking algorithm on `enemble` with initial
stack estimate stack0.
Returns a tuple with the stack as component 0 and a numpy vector
of the final robust weights as component 1.
This function is intended to be used only internally in this module
as it has no safties and assumes ensemble and stack0 are what it expects.
:param ensemble: TimeSeriesEnsemble assumed to have constant data
range and sample interval and not contain any dead data.
:param stack0: TimeSeries of initial stack estimate - assumed to have
same data vector length as all ensemble members.
    :param eps: relative norm convergence criteria.  Stop iteration when
    norm(delta stack data)/(norm(stack.data)*npts) < eps.
:param maxiterations: maximum number of iterations (default 20)
:param residual_norm_floor: floor on residuals used to compute dbxcor weight
function. See docstring for `dbxcor_weights` for details.
:type residual_norm_floor: float (default 0.01)
"""
stack = TimeSeries(stack0)
# useful shorthands
N = stack0.npts
M = len(ensemble.member)
for i in range(maxiterations):
wts = dbxcor_weights(ensemble, stack, residual_norm_floor=residual_norm_floor)
newstack = np.zeros(N)
sumwts = 0.0
        for j in range(M):
            if ensemble.member[j].live and wts[j] > 0.0:
                newstack += wts[j] * ensemble.member[j].data
                sumwts += wts[j]
newstack /= sumwts
# order may matter here. In this case delta becomes a numpy
# array which is cleaner in this context
delta = newstack - stack.data
relative_delta = np.linalg.norm(delta) / np.linalg.norm(stack.data)
# normalize by sample size or there is a window length dependency
relative_delta /= N
# update stack here so if we break loop we don't have to repeat
# this copying. Force one iteration to make this a do-while loop
        # newstack is a numpy vector so this cast is necessary
stack.data = DoubleVector(newstack)
if i > 0 and relative_delta < eps:
break
return stack
def beam_align(ensemble, beam, time_shift_limit=10.0):
"""
Aligns ensemble members using signal defined by beam (arg1) argument.
Computes cross correlation between each ensemble member and the beam.
All live ensemble members are shifted to align with time base of the
    beam.  Note the shift can be huge if the beam is in relative time and
    the ensemble members are in absolute time.  It should work in that context
but the original context was aligning common-source gathers for
teleseismic phase alignment where the expectation is all the ensemble
members and the beam are in relative time with 0 defined by some
estimate of the phase arrival time. Will correctly handle irregular
window sizes between ensemble members and beam signal.
:param ensemble: ensemble of data to be correlated with
beam data.
    :type ensemble: assumed to be a TimeSeriesEnsemble
:param beam: common signal to correlate with ensemble members.
:type beam: assumed to be a TimeSeries object
:param time_shift_limit: ceiling on allowed time shift for
ensemble members. Any computed shift with absolute value
larger than this value will be reset to this value with the
    sign of the shift preserved (i.e. a negative lag will
    be set to the negative of this number).  The default is 10.0,
    which is large enough for most data that the limit is rarely
    invoked.
    :type time_shift_limit: float (may abort if you use an int
    because the value is passed to a C++ method that is type
    sensitive)
:return: copy of ensemble with the members time shifted to align with
the time base of beam.
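    Minimal usage sketch (assumes `ens` and `beam` share a relative time
    base as described above):
    ```
    ens = beam_align(ens, beam, time_shift_limit=5.0)
    ```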
"""
# this may not be necessary for internal use but if used
# externally it is necessary to avoid mysterious results
# we don't test ensemble or beam because exceptions are
# guaranteed in that case that should allow problem solving
if time_shift_limit < 0.0:
message = "beam_align: illegal value time_shift_limit={}\n".format(
time_shift_limit
)
message += "value must be positive"
raise ValueError(message)
for i in range(len(ensemble.member)):
d = ensemble.member[i]
# in this context not needed but tiny cost for robustness
if d.live:
timelag = _xcor_shift(d, beam)
# apply a ceiling/floor to allowed time shift via
# the time_shift_limit arg
if timelag > time_shift_limit:
timelag = time_shift_limit
elif timelag < (-time_shift_limit):
timelag = -time_shift_limit
# We MUST use this method instead of dithering t0 to keep
# absolute time right. This will fail if the inputs were
# not shifted from UTC times
# also note a +lag requires a - shift
ensemble.member[i].shift(timelag)
return ensemble
def align_and_stack(
ensemble,
beam,
correlation_window=None,
correlation_window_keys=["correlation_window_start", "correlation_window_end"],
window_beam=False,
robust_stack_window=None,
robust_stack_window_keys=["robust_window_start", "robust_window_end"],
robust_stack_method="dbxcor",
output_stack_window=None,
robust_weight_key="robust_stack_weight",
time_shift_key="arrival_time_correction",
time_shift_limit=2.0,
abort_irregular_sampling=False,
convergence=0.001,
residual_norm_floor=0.01,
) -> tuple:
"""
This function uses an initial estimate of the array stack passed as
the `beam` argument as a seed to a robust algorithm that will
align all the data in the input ensemble by cross-correlation with
the beam, apply a robust stack to the aligned signals, update the
beam with the robust stack, and repeat until the changes to the
    beam signal are small.  Returns a copy of the ensemble time
    shifted to be aligned with the beam time base and an updated beam
    estimate created by the robust stack.  The shifts and the weights
of each input signal are stored in the Metadata of each live ensemble
member returned with keys defined by `robust_weight_key` and
`time_shift_key`.
This function is a python implementation of the same basic
algorithm used in the dbxcor program described by
    Pavlis and Vernon (2010), Array processing of teleseismic body waves
    with the USArray, Computers and Geosciences, 36, 910-920.
It has additional options made possible by the python interface
and integration into MsPASS. In particular, the original algorithm
was designed to work as part of a GUI where the user had to pick
a set of required parameters for the algorithm. In this function
those are supplied through Metadata key-value pairs and/or arguments.
    This function uses a pythonic approach aimed at allowing it
to be run in batch without user intervention. The original dbxcor
algorithm required four interactive picks to set the input.
The way we set them for this automated algorithm is described in the
    following four itemized paragraphs:
1. The "correlation window", which is the waveform segment
used to compute cross-correlations between the beam and all
    ensemble members, is set one of three ways.  The default
uses the time window defined by the starttime and endtime of
the `beam` signal as the cross-correlation window.
    Alternatively, this window can be specified either by
fetching values from the `Metadata` container of beam
or via the `correlation_window` argument. The algorithm
first tests if `correlation_window` is set and is an
    instance of a `TimeWindow` object.  If the argument is set
    but is not a `TimeWindow` object, a TypeError exception
    is raised.  If `correlation_window`
is a None (default) the algorithm then checks for a valid
input via the `correlation_window_keys` argument. If
defined that argument is assumed to contain a pair of strings
that can be used as keys to fetch start (component 0)
and end times (component 1) from the Metadata container of
    the TimeSeries object passed via beam.  For example,
```
correlation_window_keys = ['correlation_start','correlation_end']
```
would cause the function to attempt to fetch the
start time with "correlation_start" and end time with
"correlation_end". In the default both `correlation_window`
and `correlation_window_keys` are None which cause the
function to silently use the window defined as
[beam.t0, beam.endtime()] as the correlation winow.
If the optional boolean `window_beam` argument is set True
the function will attempt to window the beam using a range
input via either of the optional methods of setting the
correlation window. An error is logged and nothing happens if
`window_beam` is set True and the default use of the beam
window is being used.
2. The "robust window" is a concept used in dbxcor to
implement a special robust stacking algorithm that is a novel
    feature of the dbxcor algorithm.  It uses a very aggressive
weighting scheme to downweight signals that do not match the
beam. The Pavlis and Vernon paper shows examples of how this
algorithm can cleanly handle ensembles with a mix of high
    signal-to-noise data mixed with pure junk and produce a clean,
    well-defined stack.  Note recent experience has shown
that with large, consistent ensembles the dbxcor robust
    estimate tends to converge to focus on the signal closest
to the median stack. The reason is that the median stack
is always used as the initial estimator. Hence, it can
be thought of as a median stack that uses the full data
set more completely.
3. dbxcor required the user to pick a seed signal to use as
the initial beam estimate. That approach is NOT used here
    but the idea is to have some estimate passed to the algorithm
via the beam (arg1) argument. In MsPASS the working model
is to apply the broadband_snr_QC function to the data before
running this function and select the initial seed (beam) from
one or more of the computed snr metrics. In addition,
with this approach I envision a two-stage computation where
    some initial seed is used for a first pass.  The
return is then used to revise the correlation window by
examining stack coherence metrics and then rerunning the
algorithm. The point is it is a research problem for
different types of data to know how to best handle the
align and stack problem.
4. dbxcor had a final stage that required picking the arrival time
to use as the reference from the computed beam trace. That is
actually necessary if absolute times are needed because the
method used will be biased by the time shift of the beam
relative to the reference time. See the Pavlis and Vernon
paper for more on this topic. The idea here is that if
absolute times are needed some secondary processing will be used
to manually or automatically pick an arrival time from the
beam output.
This function does some consistency checking of arguments to handle
the different modes for handling the correlation and robust windows
noted above. It also applies a series of validation tests on the
input ensemble before attempting to run. Any of the following will
cause the return to be a dead ensemble with an explanation in the
elog container of the ensemble (in these situation the stack is an
empty `TimeSeries` container):
1. Irregular sample intervals of live data.
2. Any live data with the time reference set to UTC
3. Inconsistency of the time range of the data and the
time windows parsed for the correlation and robust windows.
    That is, it checks the time span of all members, and
    if the overall range (minimum start time to maximum end time)
    does not contain (inclusive of endpoints) both the correlation and robust
    windows, the input is viewed as invalid.
    4. What we called the "robust window" in the dbxcor paper is
required to be inside (inclusive of endpoints) the cross-correlation window
time window. That could be relaxed but is a useful constraint because
in my (glp) experience the most coherent part of phase arrivals is the
first few cycles of the phase that is also the part cross-correlation
needs to contain if it is to be stable. The "robust window" should
be set to focus on the section of the signal that will have the most
coherent stack.
There is a further complexity in the iteration sequence used by this algorithm
for any robust stack method. That is, time shifts computed by cross-correlation
can potentially move the correlation window outside the bounds of the
data received as input. To reduce the impact of that potential problem
the function has an optional argument called `time_shift_limit`
that is validated against other inputs. In particular, the function
computes the average start and end time (keep in mind the assumption is the
time base is time relative to the arrival time a particular phase)
of the input ensemble. If the difference between the average start time
and the correlation window start time is less than `time_shift_limit`
the input is viewed as problematic. How that is handled depends on how
the correlation window is set. If it is received as constant
(`correlation_window` argument) an exception will be thrown to abort
    the entire job.  If that window is extracted from the beam TimeSeries
Metadata container a complaint is logged to the outputs. An
endtime inconsistency is treated the same way. i.e. it is treated as
a problem if the average ensemble endtime - the correlation window
endtime is less than the `time_shift_limit`.
Note the output stack normally spans a different time range than
either the correlation or robust windows. That property is defined
by the `output_stack_window` argument. See below for details.
:param ensemble: ensemble of data to be aligned and stacked.
This function requires all data to be on a relative time base.
It will throw a MsPASSError exception if any live datum has a UTC time base.
The assumption is all data have a time span that have the correlation
and robust windows inside the data time range. Both windows are
carved from the inputs using the WindowData function which will kill
any members that do not satisfy this requirement.
:type ensemble: `TimeSeriesEnsemble` with some fairly rigid requirements.
(see above)
:param beam: Estimate of stack (may be just one representative member)
used as the seed for initial alignment and stacking.
:type beam: `TimeSeries`. Must have a length consistent with window
parameters.
:param correlation_window: Used to specify the time window for
computing cross-correlations with the beam signal. Closely linked to
`correlation_window_keys` as described above.
:type correlation_window: `TimeWindow` to define explicitly. If None
(default) uses the recipe driven by `correlation_window_keys` (see above)
:param correlation_window_keys: optional pair of Metadata keys used to
extract cross-correlation window attributes from beam Metadata container.
If defined component 0 is taken as the key for the start time of the window and
component 1 the key for the end time.
    :type correlation_window_keys: iterable list containing two strings.
    The default is ["correlation_window_start", "correlation_window_end"],
    the keys posted by `MCXcorPrepP`.  If set to None the span of the beam
    signal defines the cross-correlation window.
:param window_beam: if True the parsed cross-correlation window attributes
are applied to the beam signal as well as the data before starting
processing. Default is False which means the beam signal is used
directly in all cross-correlations.
    :param robust_stack_window: Provide an explicit `TimeWindow` used for
extracting the robust window for this algorithm. Interacts with the
robust_stack_window_keys argument as described above.
:type robust_stack_window: If defined must be a `TimeWindow` object.
If a None type (default) use the logic defined above to set this time window.
:param robust_stack_window_keys: specifies a pair of strings to be used
    as keys to extract the start time (component 0) and end time (component 1)
    of the robust time window to use from the beam `TimeSeries`.
:type robust_stack_window_keys: iterable list of two strings
:param output_stack_window: optional `TimeWindow` to apply to the
computed robust stack output. Default returns a stack spanning the
inner range of all live members of the ensemble.
:type output_stack_window: `TimeWindow` object. If None (default) the range
is derived from the ensemble member time ranges.
:param robust_weight_key: The robust weight used for each member to
compute the robust stack output is posted to the Metadata container of
each live member with this key.
:type robust_weight_key: string
    :param robust_stack_method: keyword defining the method to use for
    computing the robust stack.  Currently accepted values are:
"dbxcor" (default) and "median".
:type robust_stack_method: string - must be one of options listed above.
:param time_shift_key: the time shift applied relative to the starting
point is posted to each live member with this key. It is
    IMPORTANT to realize this is the time shift for this pass.  If this function
    is applied more than once and you reuse this key, the shift from the
previous run will be overwritten. If you need to accumulate shifts
it needs to be handled outside this function.
:type time_shift_key: string (default "arrival_time_correction")
:param convergence: fractional change in robust stack estimates in
iterative loop to define convergence. This should not be changed
unless you deeply understand the algorithm.
:type convergence: real number (default 0.001)
    :param time_shift_limit: ceiling on the time shift applied in each
    cross-correlation alignment pass.  See above for a description of how
    this value is validated against the correlation window inputs.
    :type time_shift_limit: float (default 2.0)
:param abort_irregular_sampling: boolean that controls error
handling of data with irregular sample rates. This function uses
a generous test for sample rate mismatch. A mismatch is
detected only if the computed time skew over the time span of
the input beam signal is more than 1/2 of the beam sample interval
(beam.dt). When set true the function will
abort with a ValueError exception if any ensemble member fails the
sample interval test. If False (the default) offending ensemble
members are killed and a message is posted. Note the actual
ensemble is modified so the return may have fewer data live
than the input when this mode is enabled.
:type abort_irregular_sampling: boolean
:param residual_norm_floor: floor on residuals used to compute dbxcor weight
function. See docstring for `dbxcor_weights` for details.
:type residual_norm_floor: float (default 0.01)
:return: tuple with 0 containing the original ensemble but time
shifted by cross-correlation. Failed/discarded signals for the
stack are not killed but should be detected by not having the
    time shift Metadata value set.  Component 1 is the computed
    stack windowed to the range defined by `output_stack_window`.
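    The fragment below is a usage sketch.  It assumes `ens` and `beam0`
    are the outputs of `MCXcorPrepP`, which posts the correlation window
    keys this function uses by default; the robust window limits are
    illustrative values only:
    ```
    [ensout, stack] = align_and_stack(
        ens,
        beam0,
        robust_stack_window=TimeWindow(-2.0, 10.0),
    )
    if stack.live:
        for d in ensout.member:
            if d.live and d.is_defined("arrival_time_correction"):
                print(d["arrival_time_correction"])
    ```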
"""
alg = "align_and_stack"
# xcor ensemble has the initial start time posted to each
# member using this key - that content goes away because
# xcorens has function scope
it0_key = "_initial_t0_value_"
ensemble_index_key = "_ensemble_i0_"
    # maximum iterations.  could be passed as an argument but I have
    # never seen this algorithm fail to converge within 20 iterations
MAXITERATION = 20
    # Enforce types of ensemble and beam
if not isinstance(ensemble, TimeSeriesEnsemble):
message = alg + ": illegal type for arg0 (ensemble) = {}\n".format(
str(type(ensemble))
)
message += "Must be a TimeSeriesEnsemble"
raise TypeError(message)
if not isinstance(beam, TimeSeries):
message = alg + ": illegal type for arg1 (beam) = {}\n".format(str(type(beam)))
message += "Must be a TimeSeries"
raise TypeError(message)
    if ensemble.dead():
        return [ensemble, beam]
ensemble = regularize_sampling(ensemble, beam.dt, Nsamp=beam.npts)
if ensemble.dead():
return [ensemble, beam]
# we need to make sure this is part of a valid set of algorithms
if robust_stack_method not in ["dbxcor", "median"]:
message = "Invalid value for robust_stack_method={}. See docstring".format(
robust_stack_method
)
raise ValueError(message)
# This section implements the somewhat complex chain of options for
# setting the correlation window
xcor_window_is_defined = False # needed for parsing logic below
# when this value is True window constraint errors cause the beam returned to be killed
# with error messages. If set in parsers to False an exception is thrown
# as in that situation both windows would be set as arguments and the function would
# always fail
windows_extracted_from_metadata = True
if correlation_window:
if isinstance(correlation_window, TimeWindow):
xcorwin = correlation_window
xcor_window_is_defined = True
windows_extracted_from_metadata = False
else:
            message = "Illegal type for correlation_window={}\n".format(
str(type(correlation_window))
)
message += "For this option must be a TimeWindow object"
raise TypeError(message)
elif correlation_window_keys:
# this is a bit dogmatic - I know there is a less restrictive
# test than this
if isinstance(correlation_window_keys, list):
skey = correlation_window_keys[0]
ekey = correlation_window_keys[1]
if beam.is_defined(skey) and beam.is_defined(ekey):
stime = beam[skey]
etime = beam[ekey]
else:
message0 = "missing one or both of correlation_window_keys\n"
if beam.is_defined(skey):
stime = beam[skey]
else:
message = (
message0
+ "start time key={} is not set in beam signal\n".format(skey)
)
message += "reverting to beam signal start time"
ensemble.elog.log_error(alg, message, ErrorSeverity.Complaint)
stime = beam.t0
if beam.is_defined(ekey):
etime = beam[ekey]
else:
message = (
message0
+ "end time key={} is not set in beam signal\n".format(ekey)
)
message += "reverting to beam signal endtime() method output"
ensemble.elog.log_error(alg, message, ErrorSeverity.Complaint)
etime = beam.endtime()
xcorwin = TimeWindow(stime, etime)
xcor_window_is_defined = True
else:
message = "Illegal type={} for correlation_window_keys argument\n".format(
str(type(correlation_window_keys))
)
message += "If defined must be a list with 2 component string used as keys"
raise TypeError(message)
else:
# it isn't considered an error to land here as this is actually the default
# note it is important in the logic that xcor_window_is_defined be
# left false
xcorwin = TimeWindow(beam.t0, beam.endtime())
windows_extracted_from_metadata = False
if xcor_window_is_defined and window_beam:
beam = WindowData(beam, xcorwin.start, xcorwin.end)
# now a simpler logic to handle robust window
if robust_stack_window:
if isinstance(robust_stack_window, TimeWindow):
rwin = robust_stack_window
windows_extracted_from_metadata = False
else:
            message = "Illegal type for robust_stack_window={}\n".format(
str(type(robust_stack_window))
)
message += "For this option must be a TimeWindow object"
raise ValueError(message)
elif robust_stack_window_keys:
# this is a bit dogmatic - I know there is a less restrictive
# test than this
if isinstance(robust_stack_window_keys, list):
skey = robust_stack_window_keys[0]
ekey = robust_stack_window_keys[1]
if beam.is_defined(skey) and beam.is_defined(ekey):
stime = beam[skey]
etime = beam[ekey]
else:
message = "missing one or both of robust_stack_window_keys\n"
if beam.is_defined(skey):
stime = beam[skey]
else:
message += "start time key={} is not set in beam signal\n".format(
skey
)
message += "reverting to beam signal start time"
ensemble.elog.log_error(alg, message, ErrorSeverity.Complaint)
stime = beam.t0
if beam.is_defined(ekey):
etime = beam[ekey]
else:
message += "endtime key={} is not set in beam signal\n".format(ekey)
message += "reverting to beam signal endtime() method output"
ensemble.elog.log_error(alg, message, ErrorSeverity.Complaint)
etime = beam.endtime()
rwin = TimeWindow(stime, etime)
else:
message = "Illegal type={} for robust_stack_window_keys argument\n".format(
str(type(robust_stack_window_keys))
)
message += "If defined must be a list with 2 component string used as keys"
raise ValueError(message)
else:
message = "Must specify either a value for robust_stack_window or robust_stack_window_keys - both were None"
raise ValueError(message)
# Validate the ensemble
# First verify the robust window is inside the correlation window (inclusive of edges)
if not (rwin.start >= xcorwin.start and rwin.end <= xcorwin.end):
message = (
"Cross correlation window and robust window intervals are not consistent\n"
)
message += (
"Cross-correlation window: {}->{}. Robust window: {}->{}\n".format(
xcorwin.start, xcorwin.end, rwin.start, rwin.end
)
)
message += "Robust window interval must be within bounds of correlation window"
if windows_extracted_from_metadata:
beam.elog.log_error(alg, message, ErrorSeverity.Invalid)
beam.kill()
return [ensemble, beam]
else:
raise ValueError(alg + ": " + message)
ensemble_timespan = ensemble_time_range(ensemble, metric="median")
if ensemble_timespan.start > xcorwin.start or ensemble_timespan.end < xcorwin.end:
message = "Correlation window defined is not consistent with input ensemble\n"
message += "Estimated ensemble time span is {} to {}\n".format(
ensemble_timespan.start, ensemble_timespan.end
)
message += "Correlation window time span is {} to {}\n".format(
xcorwin.start, xcorwin.end
)
message += "Correlation window range must be inside the data range"
        # we don't use the windows_extracted_from_metadata boolean and never throw
        # an exception in this case because the data range depends upon each ensemble,
        # so this is not an unconditional failure case
beam.elog.log_error(alg, message, ErrorSeverity.Invalid)
beam.kill()
return [ensemble, beam]
# need this repeatedly so set it
N_members = len(ensemble.member)
if output_stack_window:
        # this will clone the beam trace metadata automatically
# using pad option assures t0 will be output_stack_window.start
# and npts is consistent with window requested
output_stack = WindowData(
beam,
output_stack_window.start,
output_stack_window.end,
short_segment_handling="pad",
)
# this is an obscure but fast way to initialize the data vector to all 0s
output_stack.set_npts(output_stack.npts)
else:
# also clones beam metadata but in this case we get the size from the ensemble time span
output_stack = TimeSeries(beam)
output_stack_window = TimeWindow(ensemble_timespan)
output_stack.set_t0(output_stack_window.start)
npts = int((output_stack_window.end - output_stack_window.start) / beam.dt) + 1
output_stack.set_npts(npts)
# the loop below builds cross-referencing index positions
# stored in the windowed ensemble's metadata container
# with the key defined by ensemble_index_key
    # note this extra baggage is used to unscramble xcorens later but
    # does not appear in the output ensemble derived from the
    # content of the "ensemble" symbol
xcorens = TimeSeriesEnsemble(Metadata(ensemble), N_members)
for i in range(N_members):
if ensemble.member[i].live:
d = WindowData(
ensemble.member[i],
xcorwin.start,
xcorwin.end,
short_segment_handling="truncate",
)
if d.live:
d[ensemble_index_key] = i
xcorens.member.append(d)
if len(xcorens.member) == 0:
message = "WindowData with range {} to {} killed all members\n".format(
xcorwin.start, xcorwin.end
)
message += "All members have time ranges inconsistent with that cross-correlation window"
ensemble.elog.log_error(alg, message, ErrorSeverity.Invalid)
ensemble.kill()
return [ensemble, beam]
else:
xcorens.set_live()
# We need this Metadata posted to sort out total time
# shifts needed for arrival time estimates
for i in range(len(xcorens.member)):
xcorens.member[i].put_double(it0_key, xcorens.member[i].t0)
# above guarantees this cannot return a dead datum
rbeam0 = WindowData(beam, rwin.start, rwin.end)
nrm_rbeam = np.linalg.norm(rbeam0.data)
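    # iterative refinement: each pass aligns the windowed ensemble to the
    # current beam, recomputes the robust stack, and tests the relative
    # change ||rbeam - rbeam0|| / ||rbeam0|| against the convergence
    # threshold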
    converged = False
    for i in range(MAXITERATION):
xcorens = beam_align(xcorens, beam, time_shift_limit=time_shift_limit)
rens = WindowData(xcorens, rwin.start, rwin.end, short_segment_handling="pad")
# this clones the Metadata of beam for the output
rbeam = robust_stack(
rens,
stack0=rbeam0,
method=robust_stack_method,
timespan_method="stack0",
residual_norm_floor=residual_norm_floor,
)
delta_rbeam = rbeam - rbeam0
nrm_delta = np.linalg.norm(delta_rbeam.data)
        if nrm_delta / nrm_rbeam < convergence:
            converged = True
            break
rbeam0 = rbeam
nrm_rbeam = np.linalg.norm(rbeam0.data)
    if not converged:
        rbeam.kill()
        message = "robust_stack iterative loop did not converge"
        rbeam.elog.log_error(alg, message, ErrorSeverity.Invalid)
        return [ensemble, rbeam]
# apply time shifts to original ensemble that we will return
# and set the value for the attribute defined by "time_shift_key"
# argument. This has to be done here so we can properly cut the
# window to be stacked
for i in range(len(xcorens.member)):
d = xcorens.member[i]
# this test is needed in case the processing above
# killed one of the members of xcorens. That can
        # happen a number of ways. Note the index cross reference
        # in xcorens may not match that in ensemble
if d.live:
initial_starttime = xcorens.member[i][it0_key]
tshift = d.t0 - initial_starttime
j = d[ensemble_index_key]
# this shift maybe should be optional
# a positive lag from xcor requires a negative shift
# for a TimeSeries object
ensemble.member[j].shift(-tshift)
            # note this posts the lag, not the origin shift, which is
            # the minus of the lag
ensemble.member[j].put_double(time_shift_key, tshift)
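            # e.g., a lag tshift = +0.5 means this member's signal arrives
            # 0.5 s later than the beam, so shift(-0.5) advances its time
            # base to align it; the posted attribute stores the lag (+0.5),
            # not the origin shift (-0.5)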
if robust_stack_method == "dbxcor":
# We need to post the final weights to all live members
wts = dbxcor_weights(rens, rbeam, residual_norm_floor=residual_norm_floor)
# these need to be normalized so sum is 1 to simplify array stack below
sum_live_wts = 0.0
for w in wts:
# dbxcor_weights will flag dead data with a negative weight
if w > 0.0:
sum_live_wts += w
for i in range(len(wts)):
# WindowData may kill with large shifts so
# we need the live test here
if wts[i] > 0.0 and rens.member[i].live:
wts[i] /= sum_live_wts
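        # at this point the weights of the live members sum to 1 so the
        # weighted sum accumulated in the loop below is a proper weighted
        # average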
for i in range(len(rens.member)):
# use shorthand since we aren't altering rens in this loop
d = rens.member[i]
if d.live:
j = d[ensemble_index_key]
ensemble.member[j].put_double(robust_weight_key, wts[i])
                d2stack = WindowData(
                    ensemble.member[j],
output_stack_window.start,
output_stack_window.end,
short_segment_handling="truncate",
log_recoverable_errors=False,
)
# important assumption is that the weights are normalized
# so sum of wts is 1
d2stack.data *= wts[i]
                # TimeSeries operator+= handles irregular windows treating them
# like zero padding and truncating anything outside range of lhs
output_stack += d2stack
            else:
                # need to set this because we don't kill
                # the input member but the one detected in stacking
                j = d[ensemble_index_key]
                ensemble.member[j].put_double(robust_weight_key, 0.0)
output_stack.set_live()
elif robust_stack_method == "median":
# recompute the median stack from the aligned (live) data cut to the output_stack_window range
nlive = 0
# note sizes passed to shape are set above when validating inputs and are assumed to not have changed
Npts = output_stack.npts
gather_matrix = np.zeros(shape=[N_members, Npts])
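        # each row of gather_matrix holds one live, windowed member; the
        # median stack is then the sample-by-sample median down the columns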
for d in ensemble.member:
# always use the zero padding method of WindowData
# run silently for median stack. Maybe should allow
# options for this case
dcut = WindowData(
d,
output_stack_window.start,
output_stack_window.end,
short_segment_handling="pad",
log_recoverable_errors=False,
)
if dcut.live:
                # rounding effects from the interaction of the window
                # time range with t0 can cause the size of dcut to
                # differ from output_stack.npts; this
                # logic handles that
if dcut.npts == Npts:
gather_matrix[nlive, :] = np.array(dcut.data)
elif dcut.npts < Npts:
gather_matrix[nlive, 0 : dcut.npts] = np.array(dcut.data)
else:
gather_matrix[nlive, :] = np.array(dcut.data[0:Npts])
nlive += 1
stack_vector = np.median(gather_matrix[0:nlive, :], axis=0)
# this matrix could be huge so we release it as quickly as possible
del gather_matrix
output_stack.data = DoubleVector(stack_vector)
output_stack.set_live()
else:
raise RuntimeError(
"robust_stack_method illegal value altered during run - this should not happen and is a bug"
)
# TODO: may want to always or optionally window ensemble output to output_stack_window
return [ensemble, output_stack]
def _coda_duration(ts, level, t0=0.0, search_start=None) -> TimeWindow:
"""
Low-level function to estimate the range of the "coda" of a particular
seismic phase. This function first computes the envelope of the
input signal passed via arg0 as a `TimeSeries` object. The function
then searches backward in time from `search_start` to the first
    sample of the envelope function that exceeds the value defined by
`level`. It returns a `TimeWindow` object with a `start` set as the
value passed as t0 and the `end` attribute set to the time stamp
    of the estimated coda end time. Normal use by processing functions
    inside this module assumes the input has a relative time standard,
    but the function can work with UTC data if you treat `t0` and
    `search_start` as required, not optional, arguments.
    The function returns a zero length `TimeWindow` (start == end) if the
level of the envelope never exceeds the value defined by the level
argument.
:param ts: Datum to be processed. The function will return a null
    result (zero length window) if ts.t0 > t0 (argument value). The sample
data is assumed filtered to an appropriate band where the envelope will
properly define the coda.
    :type ts: `TimeSeries` is assumed. There is no explicit type checking
but a type mismatch will always cause an error.
:param level: amplitude of where the backward time search will be
terminated. This value should normally be computed from a background
noise estimate.
:type level: float
:param t0: beginning of coda search interval. This time is mainly used
    as the result of a failed search. That is, if the envelope never rises above the
    value defined by the `level` argument a zero length window will be
returned with start and end both set to this value. If the value
is inconsistent with the start time of the data it is reset to the
time of the first data sample.
:type t0: float (default 0.0 - appropriate for normal use with data
time shifted so 0 is a P or S phase arrival time)
:param search_start: optional time to start backward search.
The default, which is defined with a None type, is to start the search
at ts.endtime(). If a value is given and it exceeds ts.endtime()
the search will silently be truncated to start at ts.endtime().
Similarly if `search_start` is less than ts.t0 the search will also
    be silently reset to ts.endtime().
:type search_start: float (time - either relative time or an epoch time)
:return: `TimeWindow` object with window `end` value defining the
end of the coda. window.end-window.start is a measure of coda duration.
Returns a zero length window if the amplitude never exceeds the
    value defined by the `level` parameter. Note the start value may
be different from the input t0 value if the data start time (ts.t0) is
greater than the value defined by the `t0` argument.
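
    Example (a minimal sketch; assumes `d` is a bandpass filtered
    `TimeSeries` with time 0 at the P arrival and `noise_level` is an
    amplitude estimate from a pre-event noise window - both hypothetical
    names):

        cwin = _coda_duration(d, 4.0 * noise_level, t0=0.0, search_start=60.0)
        coda_duration = cwin.end - cwin.start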
"""
# silently handle inconsistencies in t0 and search_start
# perhaps should have these issue an elog complaint
it0 = ts.sample_number(t0)
    # silently reset to 0 if t0 is not valid for this datum
if it0 < 0:
it0 = 0
t0used = ts.time(0)
else:
t0used = t0
if search_start:
itss = ts.sample_number(search_start)
if itss >= ts.npts or itss < 0:
itss = ts.npts - 1
else:
itss = ts.npts - 1
if itss <= it0:
message = "_coda_durations: values for inferred search range {} to {} do not overlap data range"
raise ValueError(message)
httsd = signal.hilbert(ts.data)
envelope = np.abs(httsd)
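    # the envelope is the modulus of the analytic signal:
    # envelope(t) = sqrt(d(t)**2 + H[d](t)**2) where H is the Hilbert
    # transform returned as the imaginary part by scipy.signal.hilbert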
i = itss
while i > it0:
if envelope[i] > level:
break
i -= 1
# A failed search will have start and end the same
return TimeWindow(t0used, ts.time(i))
def _set_phases(
d,
model,
Ptime_key="Ptime",
pPtime_key="pPtime",
PPtime_key="PPtime",
station_collection="channel",
default_depth=10.0,
) -> TimeSeries:
"""
Cautiously sets model-based arrival times in Metadata of input
    d for the first arrivals of the phases P, pP, and PP. pP and PP
do not exist at all distances and depths so the times may be
left undefined. A value for P should always be set unless that
datum was marked dead on input.
The input datum must contain Metadata entries required to compute the
travel times. That can be in one of two forms:
    (1) The normal expectation is that source coordinates are defined with
    keys "source_lat", "source_lon", "source_time", and
"source_depth" while receiver coordinates are defined with
"channel_lat" and "channel_lon". A common variant is possible
if the `station_collection` argument is set to "site". Then
the function will try to fetch "site_lat" and "site_lon".
A special case is source depth. If "source_depth" is not
defined the value defined with the `default_depth` argument is
used.
(2) If the key "dist" is defined in d it is assumed to contain
a previously computed distance in degrees from source to this
receiver. In that case the only other required source property
is "source_time" - event origin time.
:param d: seismic datum to process. This datum requires source and
receiver metadata attributes as noted above
:type d: Assumed to be a `TimeSeries`. That is not tested as this is
an internal function. Use outside the module should assure input is
a `TimeSeries` or create an error handler for exceptions.
:param model: an instance of an obspy TauPyModel object used to
compute times.
:type model: obspy TauPyModel object
    :param Ptime_key: key used to set the P arrival time.
    :type Ptime_key: string (default "Ptime")
    :param pPtime_key: key used to set the pP arrival time.
    :type pPtime_key: string (default "pPtime")
    :param PPtime_key: key used to set the PP arrival time.
    :type PPtime_key: string (default "PPtime")
:param station_collection: normalizing collection used to define
receiver coordinates. Used only to create names for coordinates.
e.g. if set to "site" expects to find "site_lat" while if set to
default "channel" would expect to find "channel_lat".
:type station_collection: string (default "channel")
:param default_depth: source depth to use if the attribute "source_depth"
is not defined. This is a rare recovery to handle the case where
the epicenter and origin time are defined but the depth is not.
:type default_depth: float
:return: `TimeSeries` copy of input with model-based arrival times posted
to Metadata container with the specified keys.
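
    Example (a sketch; assumes `d` is a `TimeSeries` carrying the source
    and channel coordinate attributes described above):

        from obspy.taup import TauPyModel
        model = TauPyModel(model="iasp91")
        d = _set_phases(d, model)
        if d.live and d.is_defined("Ptime"):
            P_travel_time = d["Ptime"] - d["source_time"]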
"""
if d.dead():
return d
alg = "_set_phases"
    # these symbols need to be effectively declared here or they go out of scope
# before being used as arguments for the get_travel_time method of the taup calculator
depth = 0.0
dist = 0.0
if d.is_defined("dist"):
dist = d["dist"]
origin_time = d["source_time"]
else:
if (
d.is_defined("source_lat")
and d.is_defined("source_lon")
and d.is_defined("source_time")
):
srclat = d["source_lat"]
srclon = d["source_lon"]
origin_time = d["source_time"]
            # dist is computed below from the receiver coordinates
else:
message = "Missing required source coordinates: source_lat,source_lon, and/or source_time values\n"
message += "Cannot handle this datum"
d.elog.log_error(alg, message, ErrorSeverity.Invalid)
d.kill()
return d
lat_key = station_collection + "_lat"
lon_key = station_collection + "_lon"
if d.is_defined(lat_key) and d.is_defined(lon_key):
stalat = d[lat_key]
stalon = d[lon_key]
[dist, seaz, esaz] = gps2dist_azimuth(stalat, stalon, srclat, srclon)
dist = kilometers2degrees(dist / 1000.0)
        # always post these - warning: this produces a state dependency as there
        # is no guarantee these get posted if "dist" was defined on input
        # keys are frozen and based on standard mspass schema
d["dist"] = dist
d["seaz"] = seaz
d["esaz"] = esaz
else:
message = "Missing required receiver coordinates defined with keys: {} and {}\n".format(
lat_key, lon_key
)
message += "Cannot handle this datum"
d.elog.log_error(alg, message, ErrorSeverity.Invalid)
d.kill()
return d
if d.is_defined("source_depth"):
depth = d["source_depth"]
else:
depth = default_depth
message = "source_depth value was not defined - using default value={}".format(
depth
)
d.elog.log_error(alg, message, ErrorSeverity.Complaint)
arrivals = model.get_travel_times(
source_depth_in_km=depth, distance_in_degree=dist, phase_list=["P", "pP", "PP"]
)
# from all I can tell with the list above arrivals always has entries for at least one of
# the phases in phase_list. If depth is invalid it throws an exception and it seems to
# handle dist for anything. Hence, this assumes arrivals is not empty. An earlier
    # version had an unnecessary error handler for that case.
# only set first arrival for multivalued arrivals
P = []
pP = []
PP = []
for arr in arrivals:
if arr.name == "P":
P.append(arr.time)
elif arr.name == "pP":
pP.append(arr.time)
elif arr.name == "PP":
PP.append(arr.time)
if len(P) > 0:
d[Ptime_key] = min(P) + origin_time
if len(pP) > 0:
d[pPtime_key] = min(pP) + origin_time
if len(PP) > 0:
d[PPtime_key] = min(PP) + origin_time
return d
def _get_search_range(d, Pkey="Ptime", pPkey="pPTime", PPkey="PPtime"):
"""
Small internal function used to standardize the handling of the search
range for P coda. It returns a time duration to use as the search
range relative to 0 (P time) based on a simple recipe to avoid interference
from pP and PP phases. Specifically, if the source depth is greater than
100 km the pP phase is used as the maximum duration of the coda.
For shallower sources PP is used.
This function should not normally be used except as a component of the
MCXcorPrepP function. It has no safeties and is pretty simple.
    That simple recipe, however, took some work to establish; that work is
    documented in a notebook in the distribution.
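
    Example (a sketch; assumes `d` was processed with `_set_phases` so the
    arrival time keys are defined):

        duration = _get_search_range(d)
        # coda search is then limited to the window [Ptime, Ptime + duration]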
"""
if d.live:
depth = d["source_depth"]
if depth > 100.0:
tend = d[pPkey]
else:
tend = d[PPkey]
duration = tend - d[Pkey]
else:
duration = 0.0
return duration
def _xcor_shift(ts, beam):
"""
Internal function with no safeties to compute a time shift in
seconds for beam correlation. The shift is computed by
using the scipy correlate function. The computed shift is the
time shift that would need to be applied to ts to align with
    that of beam. Note that number can be enormous if using UTC
    times and it will still work. The expected use in this
    function, however, is with data prealigned by a phase arrival
    time (predicted from a model or an estimate).
:param ts: TimeSeries datum to correlate with beam
:param beam: TimeSeries defining the common signal (beam) for
correlation - the b argument to correlation.
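
    Example (a sketch; assumes `ts` and `beam` are `TimeSeries` objects in
    a common relative time standard):

        lag = _xcor_shift(ts, beam)
        ts.shift(-lag)  # a positive lag requires a negative origin shift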
"""
# note this assumed default mode of "full"
xcor = signal.correlate(ts.data, beam.data)
lags = signal.correlation_lags(ts.npts, beam.npts)
# numpy/scipy treat sample 0 as time 0
# with TimeSeries we have to correct with the t0 values to get timing right
lag_of_max_in_samples = lags[np.argmax(xcor)]
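    # a lag of k samples corresponds to a time shift of k*dt; the t0
    # difference corrects for unequal start times of the two signals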
lagtime = ts.dt * lag_of_max_in_samples + ts.t0 - beam.t0
return lagtime