Source code for mspasspy.algorithms.snr

import pickle
import numpy as np
from bson import ObjectId
from scipy.signal import hilbert
from obspy.geodetics import locations2degrees
from mspasspy.ccore.utility import MsPASSError, ErrorSeverity, ErrorLogger
from mspasspy.ccore.seismic import TimeSeries, Seismogram
from mspasspy.ccore.algorithms.deconvolution import MTPowerSpectrumEngine
from mspasspy.ccore.algorithms.amplitudes import (
    RMSAmplitude,
    PercAmplitude,
    MADAmplitude,
    PeakAmplitude,
    EstimateBandwidth,
    BandwidthStatistics,
)
from mspasspy.ccore.algorithms.basic import TimeWindow, Butterworth, _ExtractComponent
from mspasspy.algorithms.window import WindowData


def _window_invalid(d, win):
    """
    Small helper used internally in this module.  Tests if the TimeWindow
    defined by win has a span inside the range of the data object d.  Returns
    True if the range is invalid - somewhat reversed logic is used because
    the name choice makes the code conditionals clearer.
    """
    if d.t0 < win.start and d.endtime() > win.end:
        return False
    else:
        return True
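
# Illustrative sketch (not part of the module logic):  _window_invalid
# returns True when the window is NOT fully inside the data span.  For a
# TimeSeries d with d.t0 = 0.0 and d.endtime() = 100.0:
#
#     _window_invalid(d, TimeWindow(10.0, 50.0))    # False - window is valid
#     _window_invalid(d, TimeWindow(-10.0, 50.0))   # True - starts before d.t0
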


def _safe_snr_calculation(s, n):
    """
    Helper used in this module for all snr calculations.   snr is
    always defined as a ratio of signal amplitude divided by noise amplitude.
    An issue is that with simulation data it is very common to have a noise
    window that is pure zeros.   If computed naively snr would
    normally be returned as NaN.  NaNs can cause a lot of mysterious errors
    so we handle that differently here.  When noise amplitude is 0 we
    then test the signal amplitude.  If it is nonzero we return a large
    number defined inside this function as 999999.9 (just under 1 million).
    If both amplitudes are zero we return -1.0 which can be properly treated
    as an error or data with low snr.
    """
    if n == 0.0:
        if s > 0.0:
            return 999999.9
        else:
            return -1.0
    else:
        return s / n
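
# The special returns are easiest to see by example (illustrative only):
#
#     _safe_snr_calculation(2.0, 0.5)   # -> 4.0, the ordinary ratio
#     _safe_snr_calculation(2.0, 0.0)   # -> 999999.9, signal but zero noise
#     _safe_snr_calculation(0.0, 0.0)   # -> -1.0, flag value for zero data
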


def snr(
    data_object,
    noise_window=TimeWindow(-130.0, -5.0),
    signal_window=TimeWindow(-5.0, 120.0),
    noise_metric="mad",
    signal_metric="mad",
    perc=95.0,
):
    """
    Compute time-domain based signal-to-noise ratio with a specified metric.

    Signal-to-noise ratio is a fundamental measurement in all forms of
    seismic data processing.  There is, however, no single unified metric
    that is ideal for all types of signals one may want to analyze.  One
    class of metrics uses time-domain measures of amplitude in a signal
    and noise window cut from a single waveform segment.  A typical example
    is snr of some particular "seismic phase" (P, S, PP, ScS, etc.)
    relative to some measure of background noise.  e.g. for P phases it is
    nearly universal to try to estimate snr from a window defined by the
    arrival time of P and a noise window before the time P arrives
    (pre-event noise).

    This function provides a generic api to measure a large range of metrics
    using one of four choices for measuring the norm of the data in the
    signal and noise windows:

    1.  rms - L2 norm
    2.  mad - median absolute difference, which is essentially the median
        amplitude in this context
    3.  perc - percentage norm ala seismic unix.  perc is defined as the
        amplitude level where perc percent of the data have an amplitude
        smaller than this value.  It is computed by ranking (sorting) the
        data, computing the count of that percentage relative to the number
        of amplitude samples, and returning the amplitude of the nearest
        value to that position in the ranked data.
    4.  peak - is the peak value, which in linear algebra is the
        L infinity norm

    Note the user can specify a different norm for the signal and noise
    windows.  The perc metric requires specifying what percentage level to
    use.  It is important to recognize that ALL of these metrics are scaled
    to amplitude, not power (amplitude squared).

    This function will throw a MsPASSError exception if the window
    parameters do not define a time period inside the range of the
    data_object.  You will need a custom function if the model of windows
    inside a larger waveform segment does not match your data.

    There is one final detail about an snr calculation that we handle
    carefully.  With simulation data it is very common to have error-free
    simulations where the "noise" window one would use with real data is
    all zeros.  An snr calculated naively in that situation would return
    either inf or NaN depending on some picky details.  Neither is good, as
    either can cause downstream problems.  For that reason we trap any
    condition where the noise amplitude measure is computed as zero.  If
    the signal amplitude is also zero we return -1.0.  Otherwise we return
    a large, constant, positive number.  Neither condition will cause an
    exception to be thrown, as that condition is considered somewhat
    anticipated.

    :param data_object: MsPASS atomic data object (TimeSeries or Seismogram)
      to use for computing the snr.  Note that for Seismogram objects the
      metrics always use L2 measures of amplitude of each sample (i.e.
      vector amplitudes).  If snr for components of a Seismogram are desired
      use ExtractComponent and apply this function to each component
      separately.
    :param noise_window: TimeWindow object defining the time range to
      extract from data_object to define the part of the signal considered
      noise.  Times can be absolute or relative.  The default range of -130
      to -5 makes sense only as time relative to some phase arrival time.
    :param signal_window: TimeWindow object defining the time range to
      extract from data_object to define the part of the signal treated as
      signal for the required amplitude measure.  The default of -5 to 120
      is consistent with the default noise window (in terms of length) and
      assumes a time relative to a phase arrival time.  For absolute times
      each call to this function may need its own time window.
    :param noise_metric: string defining one of the four metrics defined
      above ('mad', 'peak', 'perc' or 'rms') to use for the noise window
      measurement.
    :param signal_metric: string defining one of the four metrics defined
      above ('mad', 'peak', 'perc' or 'rms') to use for the signal window
      measurement.
    :param perc: percentage level to use when either metric is set to
      'perc' (see above).  Ignored by the other metrics.  Default is 95.0.
    :return: estimated signal-to-noise ratio as a single float.  Note the
      special returns noted above for any situation where the noise window
      amplitude is 0.
    """
    if _window_invalid(data_object, noise_window):
        raise MsPASSError(
            "snr:  noise_window [{wstart} - {wend}] is outside input data range".format(
                wstart=noise_window.start, wend=noise_window.end
            ),
            ErrorSeverity.Invalid,
        )
    if _window_invalid(data_object, signal_window):
        raise MsPASSError(
            "snr:  signal_window [{wstart} - {wend}] is outside input data range".format(
                wstart=signal_window.start, wend=signal_window.end
            ),
            ErrorSeverity.Invalid,
        )
    n = WindowData(data_object, noise_window.start, noise_window.end)
    s = WindowData(data_object, signal_window.start, signal_window.end)
    if noise_metric == "rms":
        namp = RMSAmplitude(n)
    elif noise_metric == "mad":
        namp = MADAmplitude(n)
    elif noise_metric == "peak":
        namp = PeakAmplitude(n)
    elif noise_metric == "perc":
        namp = PercAmplitude(n, perc)
    else:
        raise MsPASSError(
            "snr:  Illegal noise_metric argument = " + noise_metric,
            ErrorSeverity.Invalid,
        )
    if signal_metric == "rms":
        samp = RMSAmplitude(s)
    elif signal_metric == "mad":
        samp = MADAmplitude(s)
    elif signal_metric == "peak":
        samp = PeakAmplitude(s)
    elif signal_metric == "perc":
        samp = PercAmplitude(s, perc)
    else:
        raise MsPASSError(
            "snr:  Illegal signal_metric argument = " + signal_metric,
            ErrorSeverity.Invalid,
        )
    return _safe_snr_calculation(samp, namp)
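
# Usage sketch for snr (illustrative; assumes d is a TimeSeries shifted to
# relative time with time 0 at a P arrival, e.g. via d.ator(ptime); the
# windows below are the function defaults written out explicitly):
#
#     from mspasspy.ccore.algorithms.basic import TimeWindow
#     from mspasspy.algorithms.snr import snr
#
#     value = snr(
#         d,
#         noise_window=TimeWindow(-130.0, -5.0),
#         signal_window=TimeWindow(-5.0, 120.0),
#         noise_metric="rms",
#         signal_metric="peak",
#     )
#     if value < 0.0:
#         print("both windows were all zeros")
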
def _reformat_mspass_error(
    mserr, prefix_message, suffix_message="Some requested metrics may not be computed"
):
    """
    Helper used below to reformat a message from ccore functions that
    throw a MsPASSError.  Needed to produce rational messages from
    different error metric calculations.

    :param mserr: MsPASSError object caught with a try: except: block
    :param prefix_message: string that becomes the first part of the revised
      message posted.
    :param suffix_message: string that becomes a third line of the revised
      message posted.  Note this string is always preceded by a newline so
      do not put a newline in this arg unless you want a blank line.

    :return: expanded message string
    """
    log_message = "FD_snr_estimator:  error in computing an snr metric"
    log_message += prefix_message
    log_message += mserr.message
    log_message += "\n"
    log_message += suffix_message
    return log_message
def FD_snr_estimator(
    data_object,
    noise_window=TimeWindow(-130.0, -5.0),
    noise_spectrum_engine=None,
    signal_window=TimeWindow(-5.0, 120.0),
    signal_spectrum_engine=None,
    band_cutoff_snr=2.0,
    signal_detection_minimum_bandwidth=6.0,
    tbp=4,
    ntapers=6,
    high_frequency_search_start=2.0,
    fix_high_edge=True,
    poles=3,
    perc=95.0,
    optional_metrics=None,
    save_spectra=False,
):
    # optional_metrics=['snr_stats','filtered_envelope','filtered_L2',
    #                   'filtered_Linf','filtered_MAD','filtered_perc']
    """
    Estimates one or more amplitude metrics of signal-to-noise from a
    TimeSeries object.  An implicit assumption is that the analysis is
    centered on a timeable "phase" like P, PP, etc.

    This is a python function that can be used to compute one or several
    signal-to-noise ratio estimates based on a bandwidth estimated with the
    C++ function EstimateBandwidth.  The function has a fair number of
    options, but the core metrics computed are the bandwidth estimates
    computed by that function.  It uses a fairly simple search algorithm
    that functions well for most earthquake sources.  For the low end the
    algorithm searches up from the first frequency distinguishable from DC
    to find the lowest frequency for which the snr exceeds a threshold
    specified by the input parameter 'band_cutoff_snr'.  It does a similar
    search down from the high end starting at a point 80% of Nyquist - a
    good choice for all modern digital data that use FIR antialias filters.
    Neither search accepts simply the first frequency that satisfies the
    snr threshold criterion.  Only when a group of frequencies spanning
    more than 2 times the time-bandwidth product exceeds the threshold is
    the band edge defined.  The actual band edge is then defined as the
    first frequency exceeding the threshold.  That more elaborate algorithm
    is used to prevent pure lines in either the signal or noise spectrum
    from corrupting the estimates.

    A set of optional metrics can be computed.  All optional metrics use
    the bandwidth estimates in one way or another.  Optional metrics are
    defined by the following keywords passed through a list (actually any
    iterable container will work) of strings defining one or more of the
    keywords.  The metrics and a brief description of each follow:

    *snr_stats* computes what are commonly plotted in box plots for the snr
    estimates within the estimated bandwidth:  minimum, maximum, 0.25 (1/4)
    point, 0.75 (3/4) point, and the median.  These are set with the
    following dict keys:  'snr_band_maximum', 'snr_band_minimum',
    'snr_band_1/4', 'srn_band_3/4', and 'snr_band_median' respectively.

    *filtered_envelope*, *filtered_L2*, *filtered_Linf*, *filtered_perc*,
    and *filtered_MAD*:  All of these optional metrics first copy the
    data_object and then filter the copy with a Butterworth bandpass filter
    with the number of poles specified by the poles argument and corners at
    the band edges estimated by the EstimateBandwidth function.  The
    metrics computed are time-domain snr estimates computed from the
    filtered data using the same amplitude measures as the time-domain snr
    function of this module.  When computed they are set in the output
    dictionary with the following keys (in the keyword order above):
    'snr_filtered_envelope_peak', 'snr_filtered_rms', 'snr_filtered_peak',
    'snr_filtered_perc', and 'snr_filtered_mad'.

    It is important to note that all the metrics this function returns are
    measures of amplitude NOT power.  You need to be particularly aware of
    this if you unpickle the spectra saved when save_spectra is set True,
    as those are power spectra.

    :param data_object: TimeSeries object to be processed.  For Seismogram
      objects the assumption is the algorithm would be used on a single
      component (e.g. longitudinal or vertical for a P phase).
    :param noise_window: defines the time window to use for computing the
      spectrum considered noise.  The time span can be either relative or
      UTC (absolute) time but we do not check for consistency.  This low
      level function assumes they are consistent.  If not, the calculations
      are nearly guaranteed to fail.  Type must be mspasspy.ccore.TimeWindow.
    :param signal_window: defines the time window to use that defines what
      you consider "the signal".  The time span can be either relative or
      UTC (absolute) time but we do not check for consistency.  This low
      level function assumes they are consistent.  If not, the calculations
      are nearly guaranteed to fail.  Type must be mspasspy.ccore.TimeWindow.
    :param noise_spectrum_engine: is expected to be either a None type or
      an instance of a ccore object called an MTPowerSpectralEngine.  When
      None an instance of MTPowerSpectralEngine is computed for each call
      to this function.  That is a convenience for small jobs or when
      called with data of mixed sample rates and/or variable length time
      windows.  It is very inefficient to use the default approach for
      processing large data sets and really for any use in a map operation
      with dask or spark.  Normal use should be for the user to predefine
      an MTPowerSpectralEngine from the expected window size for a given
      data sample rate and pass it in the function call.
    :param signal_spectrum_engine: is the comparable MTPowerSpectralEngine
      to use to compute the signal power spectrum.  Default is None with
      the same caveat as above for the noise_spectrum_engine.
    :param band_cutoff_snr: defines the signal-to-noise ratio floor used in
      the search for band edges.  See the description of the algorithm
      above and in the user's manual.  Default is 2.0.
    :param signal_detection_minimum_bandwidth: As noted above this
      algorithm first tries to estimate the bandwidth of data where the
      signal level exceeds the noise level defined by the parameter
      band_cutoff_snr.  It then computes the bandwidth of the data in dB
      (computed as 20*log10(f_high/f_low)).  For almost any application, if
      the working bandwidth falls below some threshold the data are junk to
      all intents and purposes.  A factor more relevant to this algorithm
      is that the optional metrics will all be meaningless and a waste of
      computational effort if the bandwidth is too small.  A particular
      extreme example is zero bandwidth, which happens all the time when no
      frequency band exceeds band_cutoff_snr over a range wider than the
      minimum defined by the time-bandwidth product.  The default is 6.0
      (one octave, which is roughly the width of the traditional
      short-period band), which allows optional metrics to be computed but
      may be too small for some applications.  If your application requires
      higher snr and wider bandwidth adjust this parameter and/or
      band_cutoff_snr.
    :param tbp: time-bandwidth product to use for computing the set of
      Slepian functions used for the multitaper estimator.  This parameter
      is used only if the noise_spectrum_engine or signal_spectrum_engine
      arguments are set as None.  The default is 4.0.
    :param ntapers: is the number of Slepian functions (tapers) to compute
      for the multitaper estimators.  Like tbp it is referenced only if
      noise_spectrum_engine or signal_spectrum_engine are set to None.
      Note the function will throw an exception if the ntapers parameter is
      not consistent with the time-bandwidth product.  That is, the maximum
      number of tapers is round(2*tbp).  Default is 6, which is consistent
      with the default tbp=4.0 where the maximum is 8.
    :param high_frequency_search_start: Used to specify the upper frequency
      at which the search for the upper end of the bandwidth by the
      function EstimateBandwidth begins.  Default is 2.0, which is
      reasonable for teleseismic P wave data.  It should be changed for
      usage other than analysis of teleseismic P phases or the bandwidth
      may be grossly underestimated.
    :param fix_high_edge: boolean controlling the upper search behavior.
      When set True the search from the upper frequency limit is disabled
      and the upper band edge is set to the value passed as
      high_frequency_search_start.  False enables the search.  True is most
      useful for teleseismic body waves, as many stations have a series of
      closely spaced spectral lines (presumably from electronic sources)
      that set the high edge incorrectly.  False would be more appropriate
      for most local and regional earthquake data.  The default is True.
    :param poles: defines the number of poles to use for the Butterworth
      bandpass applied for the "filtered" metrics (see above).  Default
      is 3.
    :param perc: used only if 'filtered_perc' is in the optional metrics
      list.  Specifies the perc parameter as used in seismic unix:  the
      percentage point of the sorted absolute values of all amplitudes.
      (Note perc=50.0 is identical to MAD.)  Default is 95.0, which is 2
      sigma for Gaussian noise.
    :param optional_metrics: is an iterable container containing one or
      more of the optional snr metrics discussed above.
    :param save_spectra: If set True (default is False) the function will
      pickle the computed noise and signal spectra and save the strings
      created, along with a set of related metadata defining the time
      range, to the output python dict (these will be saved in MongoDB when
      db is defined - see below).  This option should ONLY be used for spot
      checking, discovery of why an snr metric has unexpected results using
      graphics, or a research topic where the spectra would be of interest.
      It is a very bad idea to turn this option on if you are processing a
      large quantity of data and saving the results to MongoDB, as it will
      bloat the arrival collection.  Consider a different strategy if that
      is essential for your work.
    :return: python list with two components.  Component 0 is a python dict
      with the computed metrics associated with the keys defined above.
      Component 1 is a mspass.ccore.ErrorLogger object.  Any errors in
      computing any of the metrics will be posted to this logger.  Users
      should then test this object using its size() method and if the log
      is not empty (size() > 0) the caller should handle that condition.
      For normal use that means pushing any messages the log contains to
      the original data object's error log.  Component 0 will also be
      empty, with no log entry, if the estimated bandwidth falls below the
      threshold defined by the parameter
      signal_detection_minimum_bandwidth.
    """
    algname = "FD_snr_estimator"
    my_logger = ErrorLogger()
    # For this algorithm we dogmatically demand the input be a TimeSeries
    if not isinstance(data_object, TimeSeries):
        raise MsPASSError(
            "FD_snr_estimator:  Received invalid data object - arg0 data must be a TimeSeries",
            ErrorSeverity.Invalid,
        )
    # MTPowerSpectrum at the moment has an issue with how it handles
    # a user error in specifying time-bandwidth product and number of
    # tapers.  We put in an explicit trap here and abort if the user makes
    # a mistake to avoid a huge spray of error messages
    if ntapers > round(2 * tbp):
        message = (
            algname
            + "(Fatal Error):  ntapers={ntapers} inconsistent with tbp={tbp}\n".format(
                ntapers=ntapers, tbp=tbp
            )
        )
        message += "ntapers must be <= round(2*tbp)"
        raise MsPASSError(message, ErrorSeverity.Fatal)
    if data_object.dead():
        my_logger.log_error(
            algname,
            "Datum received was set dead - cannot compute anything",
            ErrorSeverity.Invalid,
        )
        return [dict(), my_logger]
    # We enclose all the main code here in a try block and catch any
    # MsPASSError exceptions; they will be posted as log messages.  Others
    # will not be handled, intentionally letting python's error mechanism
    # handle them as unexpected exceptions - MsPASSError can be anticipated
    # for data problems
    snrdata = dict()
    try:
        # First extract the required windows and compute the power spectra
        n = WindowData(data_object, noise_window.start, noise_window.end)
        s = WindowData(data_object, signal_window.start, signal_window.end)
        # WARNING:  this handler depends upon an implementation detail that
        # could be a maintenance issue.  The python code has a catch that
        # kills a datum where windowing fails.  The C++ code throws an
        # exception when that happens.  The python code posts that error
        # message to the output, which we extract here
        if n.dead() or s.dead():
            if n.dead():
                if n.elog.size() > 0:
                    my_logger += n.elog
            if s.dead():
                if s.elog.size() > 0:
                    my_logger += s.elog
            return [dict(), my_logger]
        if noise_spectrum_engine:
            nengine = noise_spectrum_engine
        else:
            nengine = MTPowerSpectrumEngine(n.npts, tbp, ntapers, n.npts * 2, n.dt)
        if signal_spectrum_engine:
            sengine = signal_spectrum_engine
        else:
            sengine = MTPowerSpectrumEngine(s.npts, tbp, ntapers, s.npts * 2, s.dt)
        N = nengine.apply(n)
        S = sengine.apply(s)
        bwd = EstimateBandwidth(
            S.df(),
            S,
            N,
            band_cutoff_snr,
            tbp,
            high_frequency_search_start,
            fix_high_edge,
        )
        # here we return an empty result if the bandwidth is too low
        if bwd.bandwidth() < signal_detection_minimum_bandwidth:
            return [dict(), my_logger]
        # These estimates are always computed and posted once we pass the
        # above test for validity
        snrdata["low_f_band_edge"] = bwd.low_edge_f
        snrdata["high_f_band_edge"] = bwd.high_edge_f
        snrdata["low_f_band_edge_snr"] = bwd.low_edge_snr
        snrdata["high_f_band_edge_snr"] = bwd.high_edge_snr
        snrdata["spectrum_frequency_range"] = bwd.f_range
        snrdata["bandwidth_fraction"] = bwd.bandwidth_fraction()
        snrdata["bandwidth"] = bwd.bandwidth()
        if save_spectra:
            snrdata["signal_spectrum"] = pickle.dumps(S)
            snrdata["noise_spectrum"] = pickle.dumps(N)
            snrdata["signal_window_start_time"] = signal_window.start
            snrdata["signal_window_end_time"] = signal_window.end
            snrdata["noise_window_start_time"] = noise_window.start
            snrdata["noise_window_end_time"] = noise_window.end
    except MsPASSError as err:
        newmessage = _reformat_mspass_error(
            err,
            "Spectrum calculation and EstimateBandwidth function section failed with the following message\n",
            "No SNR metrics can be computed for this datum",
        )
        my_logger.log_error(algname, newmessage, ErrorSeverity.Invalid)
        return [dict(), my_logger]

    # For the current implementation all the optional metrics require
    # computing a filtered version of the data.  If a new option is desired
    # that does not require filtering the data, the logic here will need to
    # be changed to create a more exclusive test
    if optional_metrics:
        # use the mspass butterworth filter for speed - the obspy
        # version requires a conversion to Trace objects
        BWfilt = Butterworth(
            False,
            True,
            True,
            poles,
            bwd.low_edge_f,
            poles,
            bwd.high_edge_f,
            data_object.dt,
        )
        filtered_data = TimeSeries(data_object)
        BWfilt.apply(filtered_data)
        nfilt = WindowData(filtered_data, noise_window.start, noise_window.end)
        sfilt = WindowData(filtered_data, signal_window.start, signal_window.end)
        # In this implementation we don't need this any longer so we
        # delete it here.  If options are added beware
        del filtered_data
        # Some minor efficiency would be possible if we avoided duplication
        # of computations when multiple optional metrics are requested, but
        # the fragility that adds to maintenance is not justified
        for metric in optional_metrics:
            if metric == "snr_stats":
                try:
                    stats = BandwidthStatistics(S, N, bwd)
                    # stats is a Metadata container - copy to snrdata
                    # but do so only if the results are marked valid
                    if stats["stats_are_valid"]:
                        for k in stats.keys():
                            snrdata[k] = stats[k]
                    else:
                        my_logger.log_error(
                            algname,
                            "BandwidthStatistics marked snr_stats data invalid",
                            ErrorSeverity.Complaint,
                        )
                except MsPASSError as err:
                    # This handler currently would never be entered but is
                    # left in place to keep the code more robust in the
                    # event of a change
                    newmessage = _reformat_mspass_error(
                        err,
                        "BandwidthStatistics threw the following error\n",
                        "Five snr_stats attributes were not computed",
                    )
                    my_logger.log_error(algname, newmessage, err.severity)
            elif metric == "filtered_envelope":
                try:
                    analytic_nfilt = hilbert(nfilt.data)
                    analytic_sfilt = hilbert(sfilt.data)
                    nampvector = np.abs(analytic_nfilt)
                    sampvector = np.abs(analytic_sfilt)
                    namp = np.median(nampvector)
                    samp = np.max(sampvector)
                    snrdata["snr_filtered_envelope_peak"] = _safe_snr_calculation(
                        samp, namp
                    )
                except Exception:
                    my_logger.log_error(
                        algname,
                        "Error computing filtered_envelope metrics:  snr_filtered_envelope_peak not computed",
                        ErrorSeverity.Complaint,
                    )
            elif metric == "filtered_L2":
                try:
                    namp = RMSAmplitude(nfilt)
                    samp = RMSAmplitude(sfilt)
                    snrvalue = _safe_snr_calculation(samp, namp)
                    snrdata["snr_filtered_rms"] = snrvalue
                except MsPASSError as err:
                    newmessage = _reformat_mspass_error(
                        err,
                        "Error computing filtered_L2 metric",
                        "snr_filtered_rms attribute was not computed",
                    )
                    my_logger.log_error(algname, newmessage, err.severity)
            elif metric == "filtered_MAD":
                try:
                    namp = MADAmplitude(nfilt)
                    samp = MADAmplitude(sfilt)
                    snrvalue = _safe_snr_calculation(samp, namp)
                    snrdata["snr_filtered_mad"] = snrvalue
                except MsPASSError as err:
                    newmessage = _reformat_mspass_error(
                        err,
                        "Error computing filtered_MAD metric",
                        "snr_filtered_mad attribute was not computed",
                    )
                    my_logger.log_error(algname, newmessage, err.severity)
            elif metric == "filtered_Linf":
                try:
                    # the C function expects a fraction - for users a
                    # percentage is clearer
                    namp = PercAmplitude(nfilt, perc / 100.0)
                    samp = PeakAmplitude(sfilt)
                    snrvalue = _safe_snr_calculation(samp, namp)
                    snrdata["snr_filtered_peak"] = snrvalue
                except MsPASSError as err:
                    newmessage = _reformat_mspass_error(
                        err,
                        "Error computing filtered_Linf metric",
                        "snr_filtered_peak attribute was not computed",
                    )
                    my_logger.log_error(algname, newmessage, err.severity)
            elif metric == "filtered_perc":
                try:
                    namp = MADAmplitude(nfilt)
                    samp = PercAmplitude(sfilt, perc / 100.0)
                    snrvalue = _safe_snr_calculation(samp, namp)
                    snrdata["snr_filtered_perc"] = snrvalue
                    snrdata["snr_perc"] = perc
                except MsPASSError as err:
                    newmessage = _reformat_mspass_error(
                        err,
                        "Error computing filtered_perc metric",
                        "snr_filtered_perc attribute was not computed",
                    )
                    my_logger.log_error(algname, newmessage, err.severity)
            else:
                message = "Illegal optional_metrics keyword=" + metric + "\n"
                message += (
                    "If that is a typo expect some metrics will be missing from output"
                )
                my_logger.log_error(algname, message, ErrorSeverity.Complaint)
    return [snrdata, my_logger]
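
# Usage sketch for FD_snr_estimator following the docstring advice to
# predefine the spectrum engines for fixed-length windows (illustrative;
# assumes 20 sps TimeSeries data, d, in relative time with 0 at the phase
# arrival):
#
#     nwin = TimeWindow(-130.0, -5.0)
#     swin = TimeWindow(-5.0, 120.0)
#     dt = 0.05
#     # number of samples spanned by each window
#     n_npts = int(round((nwin.end - nwin.start) / dt)) + 1
#     s_npts = int(round((swin.end - swin.start) / dt)) + 1
#     nengine = MTPowerSpectrumEngine(n_npts, 4, 6, n_npts * 2, dt)
#     sengine = MTPowerSpectrumEngine(s_npts, 4, 6, s_npts * 2, dt)
#     [snrdata, elog] = FD_snr_estimator(
#         d,
#         noise_window=nwin,
#         noise_spectrum_engine=nengine,
#         signal_window=swin,
#         signal_spectrum_engine=sengine,
#         optional_metrics=["snr_stats", "filtered_L2"],
#     )
#     if elog.size() > 0:
#         d.elog += elog
#     if len(snrdata) == 0:
#         pass  # no detectable signal - handle as your workflow requires
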
def arrival_snr(
    data_object,
    noise_window=TimeWindow(-130.0, -5.0),
    noise_spectrum_engine=None,
    signal_window=TimeWindow(-5.0, 120.0),
    signal_spectrum_engine=None,
    band_cutoff_snr=2.0,
    signal_detection_minimum_bandwidth=6.0,
    tbp=4.0,
    ntapers=6,
    high_frequency_search_start=2.0,
    fix_high_edge=True,
    poles=3,
    perc=95.0,
    save_spectra=False,
    phase_name="P",
    arrival_time_key="Ptime",
    metadata_output_key="Parrival",
    kill_null_signals=True,
    optional_metrics=[
        "snr_stats",
        "filtered_envelope",
        "filtered_L2",
        "filtered_Linf",
        "filtered_MAD",
        "filtered_perc",
    ],
):
    """
    Specialization of FD_snr_estimator.  A common situation where snr data
    are a critical thing to estimate is data windowed around a given
    seismic phase.  FD_snr_estimator is a bit more generic.  This function
    removes some of the options of the more generic function and has a
    frozen structure appropriate for measuring snr of a particular phase.
    In particular, it always stores the results as a subdocument (python
    dict) keyed by the name defined in the metadata_output_key argument.
    This function has a close sibling called "broadband_snr_QC" that has
    similar behavior but adds some additional functionality.  The most
    significant limitation of this function relative to broadband_snr_QC
    is that this function ONLY accepts TimeSeries data as input.

    This function is most appropriate for QC done within a workflow where
    the model is to process a large data set and winnow it down to
    separate the wheat from the chaff, to use a cliche consistent with
    "winnow".  In that situation the normal use would be to run this
    function with a map operator on atomic data and follow it with a call
    to filter to remove dead data and/or filter with tests on the computed
    metrics, as in the sketch after this docstring.  See the User's Manual
    for guidance on this topic.  Because that is the expected normal use
    of this function the kill_null_signals boolean defaults to True.

    To be more robust the function tries to handle a common error.  That
    is, if the input data has a UTC time standard then the noise and
    signal windows would need to be shifted to some reference time to make
    any sense.  Consequently, this algorithm silently handles that
    situation automatically with a simple test.  If the data are relative
    time no test of the time window range is made.  If the data are UTC,
    however, it tests if the signal time window is inside the data range.
    If not, it shifts the time windows by a time it tries to pull from the
    input with the key defined by "arrival_time_key".  If that attribute
    is not defined a message is posted to elog of the input datum and it
    is returned with no other change (i.e. the attribute normally output
    with the tag defined by metadata_output_key will not exist in the
    output).  Large data workflows need to handle this condition.

    Dead inputs are handled the standard way - returned immediately with
    no change.

    Most parameters for this function are described in detail in the
    docstring for FD_snr_estimator.  The user is referred there to see the
    usage.  The following are added for this specialization:

    :param phase_name: Name tag for the seismic phase being analyzed.
      This string is saved to the output subdocument with the key
      "phase".  The default is "P".
    :param arrival_time_key: key (string) used to fetch an arrival time
      if the data are in UTC and the time window received does not
      overlap the data range (see above).
    :param kill_null_signals: boolean controlling how null snr estimator
      returns are handled.  When True (default), if FD_snr_estimator
      returns a null result (no apparent signal) that input datum is
      killed before being returned.  In that situation no snr metrics
      will be in the output because null means FD_snr_estimator couldn't
      detect a signal and the algorithm failed.  When False the datum is
      returned silently but will have no snr data defined in a dict
      stored with the key metadata_output_key (i.e. that attribute will
      be undefined in the output).
    :param metadata_output_key: is a string used as a key under which the
      subdocument (python dict) created internally is stored.  Default is
      "Parrival".  The idea is that if multiple phases are being analyzed
      each phase should have a different key set by this argument (e.g.
      if PP were also being analyzed in the same workflow you might use a
      key like "PParrival").
    :return: a copy of data_object with the results stored under the key
      defined by the metadata_output_key argument.
    """
    if not isinstance(data_object, TimeSeries):
        raise TypeError("arrival_snr:  input arg0 must be a TimeSeries")
    if data_object.dead():
        return data_object
    # here we try to recover incorrect window usage
    # Note we always make a deep copy for internal use
    signal_window = TimeWindow(signal_window)
    noise_window = TimeWindow(noise_window)
    if data_object.time_is_UTC():
        # should work for anything but an absurd test near epoch 0, which
        # should never happen
        if signal_window.end < data_object.t0:
            if arrival_time_key in data_object:
                atime = data_object[arrival_time_key]
                signal_window = signal_window.shift(atime)
                noise_window = noise_window.shift(atime)
                # could test again here but we let FD_snr_estimator
                # handle that error
            else:
                message = (
                    "Input has UTC time standard but windows appear to be relative time\n"
                    + "Tried to recover with time set with key="
                    + arrival_time_key
                    + " but it was not defined in this datum\n"
                    + "Cannot compute snr metrics"
                )
                data_object.elog.log_error(
                    "arrival_snr", message, ErrorSeverity.Complaint
                )
                return data_object
    [snrdata, elog] = FD_snr_estimator(
        data_object,
        noise_window=noise_window,
        noise_spectrum_engine=noise_spectrum_engine,
        signal_window=signal_window,
        signal_spectrum_engine=signal_spectrum_engine,
        band_cutoff_snr=band_cutoff_snr,
        signal_detection_minimum_bandwidth=signal_detection_minimum_bandwidth,
        tbp=tbp,
        ntapers=ntapers,
        high_frequency_search_start=high_frequency_search_start,
        fix_high_edge=fix_high_edge,
        poles=poles,
        perc=perc,
        optional_metrics=optional_metrics,
        save_spectra=save_spectra,
    )
    if elog.size() > 0:
        data_object.elog += elog
    # FD_snr_estimator returns an empty dictionary if the snr calculation
    # fails or indicates no signal is present.  This block combines that
    # result with the kill_null_signals option
    if len(snrdata) > 0:
        snrdata["phase"] = phase_name
        data_object[metadata_output_key] = snrdata
    elif kill_null_signals:
        data_object.elog.log_error(
            "arrival_snr",
            "FD_snr_estimator flagged this datum as having no detectable signal",
            ErrorSeverity.Invalid,
        )
        data_object.kill()
    return data_object
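
# Workflow sketch matching the winnowing model described in the docstring
# (illustrative; assumes a dask bag of TimeSeries objects, each with a UTC
# time axis and a "Ptime" attribute holding a measured P arrival time):
#
#     mybag = mybag.map(
#         arrival_snr, arrival_time_key="Ptime", metadata_output_key="Parrival"
#     )
#     # kill_null_signals defaults True, so dead data can now be filtered out
#     mybag = mybag.filter(lambda d: not d.dead())
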
def broadband_snr_QC(
    data_object,
    component=2,
    noise_window=TimeWindow(-130.0, -5.0),
    noise_spectrum_engine=None,
    signal_window=TimeWindow(-5.0, 120.0),
    signal_spectrum_engine=None,
    band_cutoff_snr=2.0,
    signal_detection_minimum_bandwidth=6.0,
    tbp=4.0,
    ntapers=6,
    high_frequency_search_start=2.0,
    fix_high_edge=True,
    kill_null_signals=False,
    poles=3,
    perc=95.0,
    phase_name="P",
    metadata_output_key="Parrival",
    optional_metrics=[
        "snr_stats",
        "filtered_envelope",
        "filtered_L2",
        "filtered_Linf",
        "filtered_MAD",
        "filtered_perc",
    ],
    save_spectra=False,
    use_measured_arrival_time=False,
    measured_arrival_time_key="Ptime",
    taup_model=None,
    source_collection="source",
    receiver_collection=None,
):
    """
    Compute a series of metrics that can be used for quality control
    filtering of seismic phase data.

    This function is intended as a workhorse to be used for low-level,
    automated QC of broadband data when the data set is defined by signals
    linked to a timeable seismic phase.  It can be thought of as a version
    of a related function called "arrival_snr" with some additional
    features.  See the docstring for that function for what those base
    features are.  Features this function adds that are not found in
    arrival_snr are:

    1.  This function allows Seismogram inputs.  Only TimeSeries data are
        handled by arrival_snr.
    2.  This function provides an option to compute arrival times from
        source coordinates, receiver coordinates, and a handle to an obspy
        tau-p calculator.

    Otherwise it behaves the same.  Note both functions may or may not
    choose to interact with the function save_snr_arrival.  If you want to
    save the computed metrics in a form more easily fetched, your workflow
    should extract the contents of the python dictionary stored under the
    metadata_output_key tag and save the result to MongoDB with the
    save_snr_arrival function.  That option is most useful for test runs
    on a more limited data set to sort out values of the computed metrics
    that are appropriate for a secondary winnowing of your data.  See the
    User's Manual for more on this concept.

    The input of arg 0 (data_object) can be either a TimeSeries or a
    Seismogram object.  If a Seismogram object is passed the "component"
    argument is used to extract the specified single channel from the
    Seismogram object, and that component is used for processing.  That is
    necessary because all the algorithms used are single channel
    algorithms.  To use this function on all components use a loop over
    components BUT make sure you use a unique value for the argument
    "metadata_output_key" for each component.  Note this will also produce
    multiple documents per input datum.

    The type of the data_object also has a more subtle implication that
    the user must be aware of.  That is, in the MsPASS schema we store
    receiver coordinates in one of two different collections:  "channel"
    for TimeSeries data and "site" for Seismogram data.  When such data
    are loaded the generic keys like lat are always converted to names
    like channel_lat or site_lat for TimeSeries and Seismogram data
    respectively.  This function uses the data type to set that naming,
    i.e. if the input is a TimeSeries it tries to fetch the latitude data
    as channel_lat while if the input is a Seismogram it tries to fetch
    site_lat.  That is true of all coordinate data loaded by normalization
    from a source and receiver collection.

    The following args are passed directly to the function
    FD_snr_estimator:  noise_window, noise_spectrum_engine, signal_window,
    signal_spectrum_engine, band_cutoff_snr,
    signal_detection_minimum_bandwidth, tbp, ntapers,
    high_frequency_search_start, fix_high_edge, poles, perc,
    optional_metrics, and save_spectra.  Below we only describe arguments
    added by this function:

    :param data_object: An atomic MsPASS data object to which the
      algorithms requested should be applied.  Currently that means a
      TimeSeries or Seismogram object.  Any other input will result in a
      TypeError exception.  As noted above, for Seismogram input the
      component argument defines which data component is to be used for
      the snr computations.
    :param component: integer (0, 1, or 2) defining which component of a
      Seismogram object to use to compute the requested snr metrics.  This
      parameter is ignored if the input is a TimeSeries.
    :param metadata_output_key: string defining the key where the results
      are to be posted to the returned data_object.  The results are
      always posted to a python dictionary and then posted to the returned
      data_object with this key.  Default is "Parrival".
    :param use_measured_arrival_time: boolean defining the method used to
      define the time reference for the windowing used for snr
      calculations.  When True the function will attempt to fetch a phase
      arrival time with the key defined by the
      "measured_arrival_time_key" argument.  In that mode if the fetch
      fails the data_object will be killed and an error posted to elog.
      That somewhat brutal choice was intentional as the expectation is
      that if you want to use measured arrival times you don't want data
      where there are no picks.  The default is False, in which case the
      taup_model argument is required; taup_model defaults to None only
      because there is no way to default a tau-p calculator handle.
    :param measured_arrival_time_key: is the key used to fetch a measured
      arrival time.  This parameter is ignored if
      use_measured_arrival_time is False.
    :param taup_model: when use_measured_arrival_time is False this
      argument is required.  It defaults to None because there is no way
      the author knows to initialize it to anything valid.  If set it MUST
      be an instance of the obspy class TauPyModel
      (https://docs.obspy.org/packages/autogen/obspy.taup.tau.TauPyModel.html#obspy.taup.tau.TauPyModel).
      Mistakes in use of this argument can cause a MsPASSError exception
      to be thrown (not logged, thrown as a fatal error) in one of two
      ways:  (1) if use_measured_arrival_time is False this argument must
      be defined, and (2) if it is defined it MUST be an instance of
      TauPyModel.
    :param source_collection: normalization collection for source data.
      The default is the MsPASS name "source" which means the function
      will try to load the source hypocenter coordinates (when required)
      as source_lat, source_lon, source_depth, and source_time from the
      input data_object.  The id of that document is posted to the output
      dictionary stored under metadata_output_key.
    :param receiver_collection: when set this name will override the
      automatic setting of the expected normalization collection naming
      for receiver data (see above).  The default is None, which causes
      the automatic switching to be invoked.  If it is any other string
      the automatic naming will be overridden.

    :return: the data_object modified by insertion of the snr QC data in
      the object's Metadata under the key defined by metadata_output_key.
    """
    if data_object.dead():
        return data_object
    if isinstance(data_object, TimeSeries):
        # We need to make a copy of a TimeSeries object to assure the only
        # thing we change is the Metadata we add to the return
        data_to_process = TimeSeries(data_object)
        if receiver_collection:
            rcol = receiver_collection
        else:
            rcol = "channel"
    elif isinstance(data_object, Seismogram):
        if component < 0 or component > 2:
            raise MsPASSError(
                "broadband_snr_QC:  usage error.  "
                + "component parameter passed with illegal value={n}\n".format(
                    n=component
                )
                + "Must be 0, 1, or 2",
                ErrorSeverity.Fatal,
            )
        data_to_process = _ExtractComponent(data_object, component)
        if receiver_collection:
            rcol = receiver_collection
        else:
            rcol = "site"
    else:
        raise MsPASSError(
            "broadband_snr_QC:  received invalid input data\n"
            + "Input must be either TimeSeries or a Seismogram object",
            ErrorSeverity.Fatal,
        )
    if use_measured_arrival_time:
        arrival_time = data_object[measured_arrival_time_key]
    else:
        # This test is essential or python will throw a more obscure,
        # generic exception
        if taup_model is None:
            raise MsPASSError(
                "broadband_snr_QC:  usage error.  "
                + "taup_model parameter is set None but use_measured_arrival_time is False\n"
                + "This gives no way to define processing windows.  See docstring",
                ErrorSeverity.Fatal,
            )
        source_lat = data_object[source_collection + "_lat"]
        source_lon = data_object[source_collection + "_lon"]
        source_depth = data_object[source_collection + "_depth"]
        source_time = data_object[source_collection + "_time"]
        receiver_lat = data_object[rcol + "_lat"]
        receiver_lon = data_object[rcol + "_lon"]
        delta = locations2degrees(source_lat, source_lon, receiver_lat, receiver_lon)
        arrival = taup_model.get_travel_times(
            source_depth_in_km=source_depth,
            distance_in_degree=delta,
            phase_list=[phase_name],
        )
        arrival_time = source_time + arrival[0].time
        taup_arrival_phase = arrival[0].phase.name
        # not sure if this will happen but worth trapping it as a warning
        # if it does
        if phase_name != taup_arrival_phase:
            data_object.elog.log_error(
                "broadband_snr_QC",
                "Requested phase name="
                + phase_name
                + " does not match phase name tag returned by obspy taup calculator="
                + taup_arrival_phase,
                ErrorSeverity.Complaint,
            )
    if data_to_process.time_is_UTC():
        data_to_process.ator(arrival_time)
    [snrdata, elog] = FD_snr_estimator(
        data_to_process,
        noise_window=noise_window,
        noise_spectrum_engine=noise_spectrum_engine,
        signal_window=signal_window,
        signal_spectrum_engine=signal_spectrum_engine,
        band_cutoff_snr=band_cutoff_snr,
        signal_detection_minimum_bandwidth=signal_detection_minimum_bandwidth,
        tbp=tbp,
        ntapers=ntapers,
        high_frequency_search_start=high_frequency_search_start,
        fix_high_edge=fix_high_edge,
        poles=poles,
        perc=perc,
        optional_metrics=optional_metrics,
        save_spectra=save_spectra,
    )
    if elog.size() > 0:
        data_object.elog += elog
    # FD_snr_estimator returns an empty dictionary if the snr calculation
    # fails or indicates no signal is present.  This block combines that
    # result with the kill_null_signals option
    if len(snrdata) == 0:
        data_object.elog.log_error(
            "broadband_snr_QC",
            "FD_snr_estimator flagged this datum as having no detectable signal",
            ErrorSeverity.Invalid,
        )
        if kill_null_signals:
            data_object.kill()
        return data_object
    snrdata["phase"] = phase_name
    snrdata["snr_arrival_time"] = arrival_time
    snrdata["snr_signal_window_start"] = arrival_time + signal_window.start
    snrdata["snr_signal_window_end"] = arrival_time + signal_window.end
    snrdata["snr_noise_window_start"] = arrival_time + noise_window.start
    snrdata["snr_noise_window_end"] = arrival_time + noise_window.end
    # These cross-referencing keys may not always be defined when a phase
    # time is based on a pick so we add these cautiously
    scol_id_key = source_collection + "_id"
    rcol_id_key = rcol + "_id"
    if data_object.is_defined(scol_id_key):
        snrdata[scol_id_key] = data_object[scol_id_key]
    if data_object.is_defined(rcol_id_key):
        snrdata[rcol_id_key] = data_object[rcol_id_key]
    # Note we add this result to data_object NOT data_to_process because
    # they are not always the same thing - for a TimeSeries input
    # data_to_process is a copy of the original, but for a Seismogram it
    # is an extracted component
    data_object[metadata_output_key] = snrdata
    return data_object
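
# Usage sketch for model-based arrival times (illustrative; assumes a
# Seismogram, d, with source_* and site_* coordinate attributes loaded by
# normalization):
#
#     from obspy.taup import TauPyModel
#
#     model = TauPyModel(model="iasp91")
#     d = broadband_snr_QC(
#         d,
#         component=2,
#         use_measured_arrival_time=False,
#         taup_model=model,
#         phase_name="P",
#         metadata_output_key="Parrival",
#     )
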
def save_snr_arrival(
    db,
    doc_to_save,
    wfid,
    wf_collection="wf_Seismogram",
    save_collection="arrival",
    subdocument_key=None,
    use_update=False,
    update_id=None,
    validate_wfid=False,
) -> ObjectId:
    """
    This function is a companion to broadband_snr_QC.  It handles the
    situation where the workflow aims to post calculated snr metrics to an
    output database (normally in the "arrival" collection but optionally
    to a parent waveform collection).

    The alternative model, as noted in the User's Manual, is to use the
    kill option of broadband_snr_QC followed by a call to the filter
    method of a bag/rdd to remove the deadwood and reduce the size of data
    passed downstream in a large parallel workflow.  That case is better
    handled by using broadband_snr_QC directly.

    How the data are saved is controlled by four parameters:
    save_collection, use_update, update_id and subdocument_key.  They
    interact in a way that is best summarized as a set of cases that
    produce behavior you may want:

    1.  If save_collection is not the parent waveform collection, behavior
        is driven by use_update combined with subdocument_key.  When
        use_update is False (default) the contents of doc_to_save will be
        used to define a new document in save_collection with the MongoDB
        insert_one method.
    2.  When use_update is True the update_id will be assumed to be
        defined and to point to the ObjectId of an existing document in
        save_collection.  Specifically, that id will be used as the query
        clause for a call to the update_one method.  This combination is
        useful if a workflow is being driven by arrival data stored in
        save_collection created, for example, from a css3.0
        event->origin->assoc->arrival catalog of arrival picks.  A variant
        of this mode will occur if the argument subdocument_key is defined
        (default is None).  If you define subdocument_key the contents of
        doc_to_save will be stored as a subdocument in save_collection
        accessible with the key defined by subdocument_key.
    3.  If save_collection is the same as the parent waveform collection
        (defined via the input parameter wf_collection) the value of
        use_update will be ignored and only an update will be attempted.
        The reason is that if one tried to save the contents of
        doc_to_save to a waveform collection it would corrupt the database
        by creating a bunch of documents that could not be used to
        construct a valid data object (the normal use for one of the wf
        collections).

    :param db: MongoDB database handle to use for transactions that are
      the focus of this algorithm.
    :param doc_to_save: python dictionary containing data to be saved.
      Where and how this is saved is controlled by save_collection,
      use_update, and subdocument_key as described above.
    :param wfid: waveform document id of the parent datum.  It is assumed
      to be an ObjectId linking the data in doc_to_save to the parent.  It
      is ALWAYS saved in the output with the key "wfid".
    :param wf_collection: string defining the collection with which the
      data stored in doc_to_save are associated.  wfid is assumed to
      define a valid document in wf_collection.  Default is
      "wf_Seismogram".
    :param save_collection: string defining the collection name to which
      doc_to_save should be pushed.  See above for how this name interacts
      with other parameters.
    :param subdocument_key: optional key for saving doc_to_save as a
      subdocument in the save_collection.  Default is None, which means
      the contents of doc_to_save will be saved (or updated) as is.  For
      saves to the (default) arrival collection this parameter should
      normally be left None, but it is allowed.  If save_collection is the
      parent waveform collection, setting this to some sensible key is
      recommended to avoid possible name collisions with waveform Metadata
      key-value pairs.
    :param use_update: boolean controlling whether or not to use updates
      or inserts for the contents of doc_to_save.  See above for a
      description of how this interacts with other arguments to this
      function.  Default is False.
    :param update_id: ObjectId of the target document when running in
      update mode.  When save_collection is the same as wf_collection this
      parameter is ignored and the required id passed as wfid will be used
      for the update key matching.  Also ignored with the default behavior
      of inserting doc_to_save as a new document.  Required only if
      running with a different collection and updating is desired.  The
      type example noted above would be updates to existing arrival
      information created from a css3.0 database.
    :param validate_wfid: When set True the id defined by the required
      argument wfid will be validated by querying wf_collection.  In this
      mode if wfid is not found the function will silently return None.
      Callers using this mode should handle that condition.

    :return: ObjectId of the saved record.  None if something went wrong
      and nothing was saved.
    """
    dbwfcol = db[wf_collection]
    if validate_wfid:
        query = {"_id": wfid}
        ndocs = dbwfcol.count_documents(query)
        if ndocs == 0:
            return None
    dbcol = db[save_collection]
    update_mode = use_update
    # silently enforce update mode if saving back to the waveform collection
    if wf_collection == save_collection:
        update_mode = True
        upd_id = wfid
    else:
        upd_id = update_id
    # this block sets doc for the save with or without the subdoc option
    doc = dict(doc_to_save)  # this acts like a deep copy
    doc["wfid"] = wfid
    if subdocument_key:
        # The dict constructor is necessary to assure a deep copy here.
        # wfid is kept at the top level so the parent link is always
        # visible without unpacking the subdocument
        doc = {"wfid": wfid, subdocument_key: dict(doc_to_save)}
    if update_mode:
        # update mode:  upd_id points at the document to be revised
        filt = {"_id": upd_id}
        update_clause = {"$set": doc}
        dbcol.update_one(filt, update_clause)
        save_id = upd_id
    else:
        save_id = dbcol.insert_one(doc).inserted_id
    return save_id
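
# Usage sketch pairing broadband_snr_QC output with this function
# (illustrative; assumes a Database handle db and a processed Seismogram d
# carrying its parent wf document id under the standard "_id" key - the key
# names here are assumptions for the example):
#
#     if (not d.dead()) and d.is_defined("Parrival"):
#         arrival_id = save_snr_arrival(
#             db,
#             d["Parrival"],
#             d["_id"],
#             wf_collection="wf_Seismogram",
#             save_collection="arrival",
#         )
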