Source code for mspasspy.algorithms.resample

from abc import ABC, abstractmethod
from mspasspy.util.decorators import mspass_func_wrapper
from mspasspy.ccore.utility import MsPASSError, ErrorSeverity, dmatrix
from mspasspy.ccore.seismic import (
    TimeSeries,
    Seismogram,
    TimeSeriesEnsemble,
    SeismogramEnsemble,
    DoubleVector,
)
from mspasspy.util.converter import (
    Trace2TimeSeries,
    TimeSeries2Trace,
    Seismogram2Stream,
    Stream2Seismogram,
    SeismogramEnsemble2Stream,
    Stream2SeismogramEnsemble,
)

import numpy as np
from scipy import signal


[docs]class BasicResampler(ABC): """ Base class for family of resampling operators. All this class really does is define the interface and standardize the target of the operator. A key concept of this family of operator is they are intended to be used in a map operator to regularize the sample rate to a constant. Hence, the base class defines that constant output sample rate (alternatively the sample interval, dt). ALL implementations must recognize a couple fundamental concepts: 1. This is intended to ONLY be used on waveform segments. The problem of resampling continuous data requires different algorithms. The reason is boundary effects. All implementations are subject to edge transients. How the implementation does or does not handle that issue is viewed as an implementation detail, 2. Downsampling ALWAYS requires some method to avoid aliasing of the output. How that is done is considered an implementation detail. This is a sketch of an algorithm is pseudopython code showing how a typical instance of this class (in the example ScipyResampler) would be used in parallel workflow sketch: .. rubric:: Example resamp_op = ScipyResampler(10.0) # target sample rate of 10 sps cursor = db.TimeSeries.find({}) bag = read_distributed_data(cursor,collection="wf_TimeSeries") bag = bag.map(resamp_op.resample) bag.map(db.save_data()) bag.compute() A point to emphasize is the model is to generate the operator through a constructor (ScipResampler in the example) that defines the target sample rate. All data passed through that operator through the map operator call will be returned to create a bag/rdd with a uniform sample rate. All implementations should also follow the MsPASS rule for parallel algorithms to kill data that cannot be handled and not throw exceptions unless the whole usage is wrong. """ def __init__(self, dt=None, sampling_rate=None): if dt and sampling_rate: raise MsPASSError( "BasicResample: usage error. Specify either dt or sampling_rate. You defined both", ErrorSeverity.Fatal, ) if dt: self.dt = dt self.samprate = sampling_rate elif sampling_rate: self.dt = 1.0 / sampling_rate self.samprate = sampling_rate
[docs] def target_samprate(self): return self.samprate
[docs] def target_dt(self): return self.dt
[docs] def dec_factor(self, d): """ All implementations of decimation algorithms should use this method to test if resampling by decimation is feasible. A decimation operator only works for downsampling with the restriction that the ouptut sample interval is an integer multiple of the input sample interval. The function returns a decimation factor to use for atomic data d being tested. The algorithm uses the numpy "isclose" function to establish if the sample interval is feasible. If so it returns the decimation factor as an integer that should be used on d. Not implementations should handle a return of 1 specially for efficiency. A return of 1 means no resampling is needed. A return of 0 or -1 is used for two slightly different cases that indicate a decimation operation is not feasible. A return of 0 should be taken as a signal that the data requires upsampling to match the target sampling rate (interval). A return of -1 means the data require downsampling but the a decimator operator is not feasible. (e.g. needing to create 10 sps data from 25 sps input.) :param d: input mspass data object to be tested. All that is actually required is d have a "dt" attribute (i.e. d.dt is defined) that is the sample interval for that datum. :type d: assumed to be a MsPASS atomic data object for which the dt attribute is defined. This method has no safeties to test input type. It will throw an exception if d.dt does not resolve. """ # internal use guarantees this can only be TimeSeries or Seismogram # so this resolves d_dt = d.dt float_dfac = self.dt / d_dt int_dfac = int(float_dfac) # This perhaps should use a softer constraint than default if np.isclose(float_dfac, float(int_dfac)): return int_dfac elif int_dfac == 0: return 0 else: return -1
[docs] @abstractmethod def resample(self, mspass_object): """ Main operator a concrete class must implement. It should accept any mspass data object and return a clone that has been resampled to the sample interface defined by this base class. """ pass
[docs]class ScipyResampler(BasicResampler): """ This class is a wrapper for the scipy resample algorithm. Obspy users should note that the Trace and Stream method called "resample" is only a light wrapper to apply the scipy resample function. The algorithm and its limitations are described in the scipy documentation you can easily find with a web search. A key point about this algorithm is that unlike decimate it allows resampling to something not an integer multiple or division from the input OR if you need to upsample data to match the rest of the data set (Note that is not usually a good idea unless the upsampling is followed by a decimator to get all data to a lower, uniform sample rate.) A type example where that is essential is some old OBS data from Scripps instruments that had a sample rate that was a multiple of one of the more standard rates like 20 or 100. Such data can be downsampled immediately too something like 10 sps with this operator or upsampled to something like 50 and then downsampled to something like 10 with a factor of 5 decimator. We emphasize a nice feature of the scipy implementation is that it automatically applies a rational antialiasing filter when downsampling, If, however, you need to do something like regularize a data set with irregular sample rates but preserve a common upper frequency response controlled at the high frequency end by digizer antialias filters (e.g. LH channels from Q330 data) you will need to crack the scipy documentation on setting up a custom antialias filter using FIR filters defined through the window argument. All common digitizer FIR filter coefficients can be found in appropriate response files. That should, in principle, be feasible but mspass developers have not tested that hypothesis. The concept of the window argument in this constructor is idential to that described in the documentation for scipy.signal.resample. The value passed, in fact, is used as the argument whenever the scipy function is called. Note the other optional arguments to scipy resample are always defaulted because the current default apply to all cases we handle. Be careful if resample changes. The primary method of this class is a concrete implementation of the resample method. """ def __init__(self, sampling_rate, window="hann"): """ """ super().__init__(sampling_rate=sampling_rate) self.window = window
[docs] def resample(self, mspass_object): """ Applies the scipy.signal.resample function to all data held in a mspass container passed through arg0 (mspass_object). This method will accept all supported MsPASS datat objects: TimeSeries, Seismogram, TimeSeriesEnsemble, and SeismogramEnsemble. For Ensembles the method is called recursively on each of the members. The method returns mspass_object with the sample data altered by the operator defined by a particular instance, which is defined exclusively by the target sample rate for the output. All metadata will be clone without checking. If the metadata have attributes linked to the sample interval the metadata of the result may not match the data. If the input is marked dead it will be returned immediately with no change. """ # We do this test at the top to avoid having returns testing for # a dead datum in each of the if conditional blocks below if isinstance( mspass_object, (TimeSeries, Seismogram, TimeSeriesEnsemble, SeismogramEnsemble), ): if mspass_object.dead(): return mspass_object else: message = "ScipyResampler.resample: received unsupported data type=" + str( type(mspass_object) ) raise TypeError(message) if isinstance(mspass_object, TimeSeries): data_time_span = ( mspass_object.endtime() - mspass_object.t0 + mspass_object.dt ) n_resampled = int(data_time_span * self.samprate) rsdata = signal.resample( mspass_object.data, n_resampled, window=self.window ) mspass_object.set_npts(n_resampled) mspass_object.dt = self.dt mspass_object["sampling_rate"] = self.samprate # We have to go through this conversion to avoid TypeError exceptions # i.e we can't just copy the entire vector rsdata to the data vector dv = DoubleVector(rsdata) mspass_object.data = dv elif isinstance(mspass_object, Seismogram): data_time_span = ( mspass_object.endtime() - mspass_object.t0 + mspass_object.dt ) n_resampled = int(data_time_span * self.samprate) rsdata = signal.resample( mspass_object.data, n_resampled, window=self.window, axis=1 ) mspass_object.set_npts(n_resampled) mspass_object.dt = self.dt mspass_object["sampling_rate"] = self.samprate # We have to go through this conversion to avoid TypeError exceptions # i.e we can't just copy the entire vector rsdata to the data vector dm = dmatrix(rsdata) mspass_object.data = dm else: # The else above is equivalent to the following: # elif isinstance(mspass_object,(TimeSeriesEnsemble,SeismogramEnsemble)): # Change if additional data object support is added for d in mspass_object.member: self.resample(d) return mspass_object
[docs]class ScipyDecimator(BasicResampler): """ This class defines a generic operator where a decimator can be used to downsample any input data to a common sample rate. A decimator requires the ratio of the input to output sample rate to be an integer (equivalently the ratio of the output sample interval to the input sample interval). The operator will fail on any data it receives that are irregular in that sense. For example, 10 sps data can be created by downsampling 40 sps data by a factor of 4. In constract, 10 sps can not be created by decimation of 25 sps data because the ratio is 2.5 (not an integer). This operator satisfies the concept defined in BasicResampler. That is, a particular concrete instance once constructed will define an operator that will resample any input data to a common sample rate/interval. Because of the requirement of this algorithm that the sample intervals/rates are related by integers the operator has to be able to handle irregular sample rate data. The algorithm is brutal and will kill any datum for which the integer test fails and post an elog message. This operator is really little more than a wrapper around a scipy function with a similar same name (scipy.signal.decimate). The resample method handles any supported MsPASS data object type but will fail with a TypeError if it receives any other data type. Be aware decimators all have unavoidable edge effects. The anitialias filter that has to be applied (you can get garbage otherwise) will always produce an edge transient. A key to success with any downsampling operator is to always have a pad zone if possible. That is, you start with a longer time window than you will need for final processing and discard the pad zone when you enter the final stage. Note that is actually true of ALL filtering. The constructor has almost the same arguments defined in the documentation page for scipy.signal.decimate. The only exception is the axis argument. It needs to be defined internally. For scalar data we pass 0 while for three component data we sent it 1 which is means the decimator is applied per channel. """ def __init__(self, sampling_rate, n=None, ftype="iir", zero_phase=True): """ """ super().__init__(sampling_rate=sampling_rate) self.ftype = ftype self.zero_phase = zero_phase self.order = n def _make_illegal_decimator_message(self, error_code, data_dt): """ Private method to format a common message if the data's sample interval, data_dt, is not feasible to produce by decimation. The error message is the return """ if error_code == 0: message = "Data sample interval={ddt} is smaller than target dt={sdt}. This operator can only downsample".format( ddt=data_dt, sdt=self.dt ) else: message = ( "Data sample interval={ddt} is not an integer multiple of {sdt}".format( ddt=data_dt, sdt=self.dt ) ) return message
[docs] def resample(self, mspass_object): """ Implementation of required abstract method for this operator. The only argument is mspass_object. The operator will downsample the contents of the sample data container for any valid input. If the input is not a mspass data object (i.e. atomic TimeSeries or Seismogram) or one of the enemble objects it will throw a TypeError exception. Note for ensembles the algorithm simply applies this method in a loop over all the members of the ensemble. Be aware that any members of the ensemble cannot be resampled to the target sampling frequency (interval) they will be killed. For example, if you are downsampling to 20 sps and you have 25 sps data in the ensemble the 25 sps data will be killed on output with an elog message posted. Returns an edited clone of the input with revised sample data but no changes to any Metadata. """ # We do this test at the top to avoid having returns testing for # a dead datum in each of the if conditional blocks below if isinstance( mspass_object, (TimeSeries, Seismogram, TimeSeriesEnsemble, SeismogramEnsemble), ): if mspass_object.dead(): return mspass_object else: message = "ScipyDecimator.resample: received unsupported data type=" + str( type(mspass_object) ) raise TypeError(message) if isinstance(mspass_object, TimeSeries): decfac = self.dec_factor(mspass_object) if decfac <= 0: mspass_object.kill() message = self._make_illegal_decimator_message(decfac, mspass_object.dt) mspass_object.elog.log_error( "ScipyDecimator.resample", message, ErrorSeverity.Invalid ) else: dsdata = signal.decimate( mspass_object.data, decfac, n=self.order, ftype=self.ftype, zero_phase=self.zero_phase, ) dsdata_npts = len(dsdata) mspass_object.set_npts(dsdata_npts) mspass_object.dt = self.dt mspass_object["sampling_rate"] = self.samprate # We have to go through this conversion to avoid TypeError exceptions # i.e we can't just copy the entire vector rsdata to the data vector mspass_object.data = DoubleVector(dsdata) elif isinstance(mspass_object, Seismogram): decfac = self.dec_factor(mspass_object) if decfac <= 0: mspass_object.kill() message = self._make_illegal_decimator_message(decfac, mspass_object.dt) mspass_object.elog.log_error( "ScipyDecimator.resample", message, ErrorSeverity.Invalid ) else: # note axis=1 means apply the decimator along the column # index - that means by channel. dsdata = signal.decimate( mspass_object.data, decfac, axis=1, n=self.order, ftype=self.ftype, zero_phase=self.zero_phase, ) # Seismogram stores data as a 3xnpts matrix. numpy # uses the shape attribute to hold rowsxcolumns msize = dsdata.shape dsdata_npts = msize[1] mspass_object.set_npts(dsdata_npts) mspass_object.dt = self.dt mspass_object["sampling_rate"] = self.samprate # We have to go through this conversion to avoid TypeError exceptions # i.e we can't just copy the entire vector rsdata to the data vector mspass_object.data = dmatrix(dsdata) else: # else here is equivalent to this: # elif isinstance(mspass_object,(TimeSeriesEnsemble,SeismogramEnsemble)): # Change if we add support for additional data objects like gather # version of ensemble currently under construction for d in mspass_object.member: d = self.resample(d) return mspass_object
[docs]@mspass_func_wrapper def resample( mspass_object, decimator, resampler, verify_operators=True, object_history=False, alg_name="resample", alg_id=None, dryrun=False, inplace_return=False, function_return_key=None, ): """ Resample any valid data object to a common sample rate (sample interval). This function is a wrapper that automates handling of resampling. Its main use is in a dask/spark map operator where the input can be a set of irregularly sampled data and the output is required to be at a common sample rate (interval). The problem has some complexity because decimation is normally the preferred method of resampling when possible due to speed and more predictable behavior. The problem is that downsampling by decimation is only possible if the output sampling interval is an integer multiple of the input sample interval. With modern seismology data that is usually possible, but there are some common exceptions. For example, 10 sps cannot be created from 25 sps by decimation. The algorithm tests the data sample rate and if decimation is possible it applies a decimation operator passed as the argument "decimator". If not, it calls the operator "resampler" that is assumed to be capable of handling any sample rate change. The two operators must have been constructed with the same output target sampling frequency (interval). Both must also be a subclass of BasicResampler to match the api requirements. The parameters object_history, alg_name, alg_id, dryrun, inplace_return, and function_return_key are handled by the decorator called mspass_func_wrapper used by this function. See the docstring for mspass_func_wrapper for the generic use of those parameters. :param mspass_object: mspass datum to be resampled :type mspass_object: Must a TimeSeries, Seismogram, TimeSeriesEnsemble, or SeismogramEnsemble object. :param decimator: decimation operator. :type decimator: Must be a subclass of BasicResampler :param resampler: resampling operator :type resampler: Must be a subclass of BasicResampler :param verify_operators: boolean controlling whether safety checks are applied to inputs. When True (default) the contents of decimator and resampler are verified as subclasses of BasicResampler and the function tests if the target output sampling frequency (interval) of both operators are the same. The function will throw an exception if any of the verify tests fail. Standard practice should be to verify the operators and valid before running a large workflow and running production with this arg set False for efficiency. That should acceptable in any case I can conceive as once the operators are defined in a parallel workflow they should be invariant for the entire application in a map operator. """ if verify_operators: if not isinstance(decimator, BasicResampler): raise TypeError( "resample: decimator operator (arg1) must be subclass of BasicResampler" ) if not isinstance(resampler, BasicResampler): raise TypeError( "resample: resampler operator (arg2) must be subclass of BasicResampler" ) if not np.isclose(decimator.target_dt(), resampler.target_dt()): raise MsPASSError( "resample: decimator and resampler must have the same target sampling rate", ErrorSeverity.Fatal, ) if isinstance(mspass_object, (TimeSeries, Seismogram)): # This method returns -1 if decimation is not possible because # sampling frequency is not an integer multiple of the target # df defined in decimator. Use that as switch to enable # resampling decfac = decimator.dec_factor(mspass_object) if decfac == 1: return mspass_object elif decfac > 0: return decimator.resample(mspass_object) else: return resampler.resample(mspass_object) elif isinstance(mspass_object, (TimeSeriesEnsemble, SeismogramEnsemble)): if mspass_object.dead(): return mspass_object # I tried to do this loop with recursion but spyder kept # flagging it as an error - I'm not sure it would be. # The logic for atomic data is simple anyway and this # might actually be faster avoiding the function calls nmembers = len(mspass_object.member) for i in range(nmembers): d = mspass_object.member[i] decfac = decimator.dec_factor(d) # Note when decfac is 1 the current member is not altered if decfac > 1: mspass_object.member[i] = decimator.resample(d) elif decfac <= 0: mspass_object.member[i] = resampler.resample(d) return mspass_object elif mspass_object is None: # Handle this automatically for robustness. Just silently return return mspass_object else: # It should be an exception if we get anything else raise TypeError("resample: arg0 must be a MsPASS data object")