# Source code for mspasspy.util.converter

"""
Functions for converting to and from MsPASS data types.
"""
from typing import Collection
import numpy as np
import obspy.core
import collections
import pandas as pd

try:
    import dask.dataframe as daskdf

    __mspasspy_has_dask = True
except ImportError:
    __mspasspy_has_dask = False

from mspasspy.ccore.utility import Metadata, AntelopePf, MsPASSError, ErrorSeverity
from mspasspy.ccore.seismic import (
    _CoreSeismogram,
    Seismogram,
    TimeSeries,
    TimeSeriesEnsemble,
    SeismogramEnsemble,
    Keywords,
)
from mspasspy.ccore.algorithms.basic import _ExtractComponent



def dict2Metadata(dic):
    """
    Function to convert Python dict data to Metadata.

    pymongo returns a Python dict container from find queries to any
    collection.  Simple types in returned documents can be converted to
    Metadata that are used as headers in the C++ components of mspass.

    :param dic: Python dict to convert
    :type dic: dict
    :return: Metadata object translated from dic
    :rtype: :class:`~mspasspy.ccore.Metadata`
    """
    return Metadata(dic)

# dict.toMetadata = dict2Metadata

def Metadata2dict(md):
    """
    Converts a Metadata object to a Python dict.

    This is the inverse of dict2Metadata.  It converts a Metadata object to
    a Python dict.  Note that Metadata behaves like a dict, so this
    conversion is usually not necessary.

    :param md: Metadata object to convert.
    :type md: :class:`~mspasspy.ccore.Metadata`
    :return: Python dict equivalent to md.
    :rtype: dict
    """
    return dict(md)

Metadata.todict = Metadata2dict
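
# Hypothetical usage sketch (comment only, not executed): a document fetched
# with pymongo round-trips through Metadata and back to a plain dict.  The
# document contents below are illustrative, not part of this module.
#
#   doc = {"net": "IU", "sta": "ANMO", "delta": 0.025}   # e.g. from find_one()
#   md = dict2Metadata(doc)     # Metadata usable by the C++ code
#   d2 = Metadata2dict(md)      # back to a plain dict with the same key-values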

def AntelopePf2dict(pf):
    """
    Converts an AntelopePf object to a Python dict.

    This converts an AntelopePf object to a Python dict by recursively
    decoding the tbls.

    :param pf: AntelopePf object to convert.
    :type pf: :class:`~mspasspy.ccore.AntelopePf`
    :return: Python dict equivalent to pf.
    :rtype: dict
    """
    keys = pf.keys()
    tbl_keys = pf.tbl_keys()
    arr_keys = pf.arr_keys()
    data = collections.OrderedDict()
    for key in keys:
        val = pf.get(key)
        data[key] = val
    for key in tbl_keys:
        val = pf.get_tbl(key)
        data[key] = val
    for key in arr_keys:
        pf_branch = pf.get_branch(key)
        branch_dict = AntelopePf2dict(pf_branch)
        data[key] = branch_dict
    return data

AntelopePf.todict = AntelopePf2dict

def TimeSeries2Trace(ts):
    """
    Converts a TimeSeries object to an obspy Trace object.

    MsPASS can handle scalar data either as an obspy Trace object or with
    the mspass TimeSeries object.  The two capture nearly the same
    concepts.  The main difference is that TimeSeries supports the error
    logging and history features of mspass while obspy, which is a
    separate package, does not.  Obspy has a number of useful algorithms
    that operate on scalar data, however, so it is frequently useful to
    switch between Trace and TimeSeries formats.  The user is warned,
    however, that converting a TimeSeries to a Trace object with this
    function will result in the loss of any error log information.  For
    production runs, unless the data set is huge, we recommend saving the
    intermediate result AFTER calling this function if there is any
    possibility there are errors posted on any data.  We say after because
    some warning errors from this function may be posted in elog.  Since
    python uses call by reference, ts may thus be altered.

    :param ts: is the TimeSeries object to be converted
    :type ts: :class:`~mspasspy.ccore.TimeSeries`
    :return: an obspy Trace object from conversion of ts.  An empty Trace
      object will be returned if ts was marked dead
    :rtype: :class:`~obspy.core.trace.Trace`
    """
    dresult = obspy.core.Trace()
    dresult.dead_mspass = True
    # Silently return an empty trace object if the data are marked dead now
    if not ts.live:
        return dresult
    # We first deal with attributes in BasicTimeSeries that have to
    # be translated into an obspy stats dictionary like object
    dresult.dead_mspass = False
    dresult.stats["delta"] = ts.dt
    dresult.stats["npts"] = ts.npts
    dresult.stats["starttime"] = obspy.core.UTCDateTime(ts.t0)
    # It appears obspy computes endtime - this throws an AttributeError if
    # included.  Retained for reference to keep someone from putting this back
    # dresult.stats['endtime']=obspy.core.UTCDateTime(ts.endtime())
    # todo relative time attribute
    # These are required by obspy but optional in mspass.  Hence, we have
    # to extract them with caution.  Note defaults are identical to
    # Trace constructor
    # Check for "sampling_rate" attribute
    if ts.is_defined(Keywords.sampling_rate):
        sampling_rate = ts["sampling_rate"]
        # Check if sampling_rate is consistent with 1/dt
        if abs(sampling_rate - 1.0 / ts.dt) > 1e-6:
            # Record inconsistency in error log (elog)
            message = "sampling_rate inconsistent with 1/dt; updating to 1/dt"
            ts.elog.log_error("TimeSeries2Trace", message, ErrorSeverity.Complaint)
            # Update sampling_rate to 1/dt
            ts["sampling_rate"] = 1.0 / ts.dt
            dresult.stats["sampling_rate"] = 1.0 / ts.dt
    else:
        # Set sampling_rate to 1/dt
        ts["sampling_rate"] = 1.0 / ts.dt
        dresult.stats["sampling_rate"] = 1.0 / ts.dt
    if ts.is_defined(Keywords.net):
        dresult.stats["network"] = ts.get_string(Keywords.net)
    else:
        dresult.stats["network"] = ""
    if ts.is_defined(Keywords.sta):
        dresult.stats["station"] = ts.get_string(Keywords.sta)
    else:
        dresult.stats["station"] = ""
    if ts.is_defined(Keywords.chan):
        dresult.stats["channel"] = ts.get_string(Keywords.chan)
    else:
        dresult.stats["channel"] = ""
    if ts.is_defined(Keywords.loc):
        dresult.stats["location"] = ts.get_string(Keywords.loc)
    else:
        dresult.stats["location"] = ""
    if ts.is_defined("calib"):
        dresult.stats["calib"] = ts.get_double("calib")
    else:
        dresult.stats["calib"] = 1.0
    # We have to copy other metadata to preserve them too.  That is
    # complicated by the fact that some (notably endtime) are read only
    # and will abort the program if we just naively copy them.
    # The list below are the keys to exclude either because they
    # are computed by Trace (i.e. endtime) or are already set above
    do_not_copy = [
        "delta",
        "sampling_rate",
        "npts",
        "starttime",
        "endtime",
        "network",
        "station",
        "channel",
        "location",
        "calib",
    ]
    for k in ts.keys():
        if not (k in do_not_copy):
            dresult.stats[k] = ts[k]
    # dresult.data = np.ndarray(ts.npts)
    # for i in range(ts.npts):
    #     dresult.data[i] = ts.data[i]
    dresult.data = np.array(ts.data)
    return dresult

TimeSeries.toTrace = TimeSeries2Trace
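
# Hypothetical usage sketch: convert a live TimeSeries (assumed variable ts,
# e.g. loaded from MongoDB) to an obspy Trace so standard obspy processing can
# be applied.  Remember the error log is not carried into the Trace.
#
#   tr = TimeSeries2Trace(ts)          # or equivalently ts.toTrace()
#   tr.filter("bandpass", freqmin=0.01, freqmax=1.0)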

def Seismogram2Stream(
    sg, chanmap=["E", "N", "Z"], hang=[90.0, 0.0, 0.0], vang=[90.0, 90.0, 0.0]
):  # fixme hang and vang parameters
    """
    Convert a mspass::Seismogram object to an obspy::Stream with 3
    components split apart.

    mspass and obspy have completely incompatible approaches to handling
    three component data.  obspy uses a Stream object that is a wrapper
    around a list of Trace objects.  mspass stores 3C data bundled into a
    matrix container.  This function takes the matrix container apart and
    produces the three Trace objects obspy wants to define 3C data.  The
    caller is responsible for how they handle bundling the output.

    A very dark side of this function is that any error log entries in the
    parent mspass Seismogram object will be lost in this conversion as
    obspy does not implement that concept.  If you need to save the error
    log you will need to save the input of this function to MongoDB to
    preserve the errorlog it may contain.

    :param sg: is the Seismogram object to be converted
    :type sg: :class:`~mspasspy.ccore.Seismogram`
    :param chanmap: 3 element list of channel names to be assigned components
    :type chanmap: list
    :param hang: 3 element list of horizontal angle attributes (azimuth in
      degrees) to be set in the Stats of output for each component.
      (default is for cardinal directions)
    :type hang: list
    :param vang: 3 element list of vertical angle (theta of spherical
      coordinates) to be set in the Stats of output for each component.
      (default is for cardinal directions)
    :type vang: list
    :return: obspy Stream object containing a list of 3 Trace objects in
      mspass component order.  Presently the data are ALWAYS returned to
      cardinal directions (see above).  It will be empty if sg was marked
      dead
    :rtype: :class:`obspy.core.stream.Stream`
    """
    dresult = obspy.core.Stream()
    dresult.dead_mspass = True
    # Note this logic will silently return an empty Stream object if the
    # data are marked dead
    if sg.live:
        dresult.dead_mspass = False
        uuids = sg.id()
        logstuff = sg.elog
        for i in range(3):
            ts = _ExtractComponent(sg, i)
            ts.put_string(Keywords.chan, chanmap[i])
            ts.put_double(Keywords.channel_hang, hang[i])
            ts.put_double(Keywords.channel_vang, vang[i])
            # ts is a CoreTimeSeries but we need to add a few things to
            # make it mesh with TimeSeries2Trace
            tsex = TimeSeries(ts, uuids)
            tsex.elog = logstuff
            dobspy = TimeSeries2Trace(tsex)
            dresult.append(dobspy)
    else:
        for i in range(3):
            tc = obspy.core.Trace()
            tc.dead_mspass = True
            dresult.append(tc)
    return dresult

Seismogram.toStream = Seismogram2Stream
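
# Hypothetical usage sketch: split a live Seismogram (assumed variable seis)
# into an obspy Stream of three Trace objects with the default E,N,Z channel
# codes.
#
#   st = Seismogram2Stream(seis)       # or equivalently seis.toStream()
#   print([tr.stats.channel for tr in st])   # ['E', 'N', 'Z'] with defaults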

def Trace2TimeSeries(trace, history=None):
    """
    Convert an obspy Trace object to a TimeSeries object.

    An obspy Trace object mostly maps directly into the mspass TimeSeries
    object with the stats of Trace mapping (almost) directly to the
    TimeSeries Metadata object that is a base class to TimeSeries.  A deep
    copy of the data vector in the original Trace is made to the result.
    That copy is done in C++ for speed (we found a 100+ fold speedup using
    that mechanism instead of a simple python loop).

    There is one important type collision in copying obspy starttime and
    endtime stats fields.  obspy uses their UTCDateTime object to hold
    time but TimeSeries only supports an epoch time
    (UTCDateTime.timestamp) so the code here has to convert from the
    UTCDateTime to epoch time in the TimeSeries.  Note in a TimeSeries
    starttime is the t0 attribute.

    The biggest mismatch in Trace and TimeSeries is that Trace has no
    concept of object level history as used in mspass.  That history must
    be maintained outside obspy.  To maintain full history the user must
    pass the history maintained externally through the optional history
    parameter.  The contents of history will be loaded directly into the
    result with no sanity checks.

    :param trace: obspy trace object to convert
    :type trace: :class:`~obspy.core.trace.Trace`
    :param history: mspass ProcessingHistory object to post to result.
    :return: TimeSeries object derived from obspy input Trace object
    :rtype: :class:`~mspasspy.ccore.TimeSeries`
    """
    # The obspy trace object stats attribute only acts like a dictionary.
    # We can't use it directly but this trick simplifies the copy to
    # mesh with py::dict for pybind11 - needed in TimeSeries constructor below
    h = dict(trace.stats)
    # These tests are excessively paranoid since starttime and endtime
    # are required attributes in Trace, but better safe in case
    # someone creates one outside obspy
    if Keywords.starttime in trace.stats:
        t = h[Keywords.starttime]
        h[Keywords.starttime] = t.timestamp
    else:
        # We have to set this to something if it isn't set or
        # the TimeSeries constructor may abort
        h[Keywords.starttime] = 0.0
    # we don't require endtime in TimeSeries so ignore if it is not set
    if "endtime" in trace.stats:
        t = h["endtime"]
        h["endtime"] = t.timestamp
    #
    # these define a map of aliases to apply when we convert to mspass
    # metadata from trace - we redefined these names but others could
    # surface as obspy evolves independently from mspass
    mspass_aliases = dict()
    mspass_aliases["station"] = Keywords.sta
    mspass_aliases["network"] = Keywords.net
    mspass_aliases["location"] = Keywords.loc
    mspass_aliases["channel"] = Keywords.chan
    for k in mspass_aliases:
        if k in h:
            x = h.pop(k)
            alias_key = mspass_aliases[k]
            h[alias_key] = x
    dout = TimeSeries(h, trace.data)
    if history is not None:
        dout.load_history(history)
    dout.set_live()
    # The following dead_mspass attribute is used by our decorator API
    # to determine whether an object was dead before the conversion.
    try:
        if trace.dead_mspass:
            dout.live = False
    except AttributeError:
        pass
    return dout

obspy.core.Trace.toTimeSeries = Trace2TimeSeries
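
# Hypothetical usage sketch: convert an obspy Trace (assumed variable tr, e.g.
# from obspy.read) to a TimeSeries.  starttime becomes the epoch time t0 and
# the obspy network/station/channel/location stats are renamed to mspass keys.
#
#   ts = Trace2TimeSeries(tr)          # or equivalently tr.toTimeSeries()
#   print(ts.t0, ts["sta"])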

def Stream2Seismogram(st, master=0, cardinal=False, azimuth="azimuth", dip="dip"):
    """
    Convert obspy Stream to a Seismogram.

    Convert an obspy Stream object with 3 components to a
    mspass::Seismogram (three-component data) object.  This implementation
    actually converts each component first to a TimeSeries and then calls
    a C++ function to assemble the complete Seismogram.  This has some
    inefficiencies, but the assumption is this function is called early on
    in a processing chain to build a raw data set.

    :param st: input obspy Stream object.  The object MUST have exactly 3
      components or the function will throw an AssertionError exception.
      The program is less dogmatic about start times and number of samples
      as these are handled by the C++ function this python script calls.
      Be warned, however, that the C++ function can throw a MsPASSError
      exception that should be handled separately.
    :param master: a Seismogram is an assembly of three channels created
      from three TimeSeries/Trace objects.  Each component may have
      different metadata (e.g. orientation data) and common metadata
      (e.g. station coordinates).  To assemble a Seismogram a decision has
      to be made on which component has the definitive common metadata.
      We use a simple algorithm and clone the data from one component
      defined by this index.  Must be 0, 1, or 2 or the function will
      throw a RuntimeError.  Default is 0.
    :param cardinal: boolean used to define one of two algorithms used to
      assemble the bundle.  When true the three input components are
      assumed to be in cardinal directions (x1=positive east, x2=positive
      north, and x3=positive up) AND in a fixed order of E,N,Z.  Otherwise
      the Metadata fetched with the azimuth and dip keys are used for
      orientation.
    :param azimuth: defines the Metadata key used to fetch the azimuth
      angle used to define the orientation of each component Trace object.
      Default is 'azimuth' used by obspy.  Note azimuth=hang in css3.0.
      Cannot be aliased - must be present in obspy Stats unless cardinal
      is true.
    :param dip: defines the Metadata key used to fetch the vertical angle
      orientation of each data component.  Vertical angle (vang in css3.0)
      is exactly the same as theta in spherical coordinates.  Default is
      obspy 'dip' key.  Cannot be aliased - must be defined in obspy Stats
      unless cardinal is true.
    :raise: Can throw either an AssertionError or MsPASSError (currently
      defaulted to pybind11's default RuntimeError.  Error message can be
      obtained by calling the what method of RuntimeError).
    """
    # First make sure we have exactly 3 components
    assert len(st) == 3, "Stream length must be EXACTLY 3 for 3-components"
    assert 0 <= master < 3, "master argument must be 0, 1, or 2"
    # if all traces are dead in a stream, it should be converted to a dead seismogram
    try:
        size = len(st)
        for i in range(len(st)):
            if st[i].dead_mspass:
                size -= 1
        if size == 0:
            res = Seismogram()
            res.live = False
            return res
    except AttributeError:
        pass
    # Complicated logic here, but the point is to make sure the azimuth
    # attribute is set.  The cardinal part is to override the test if
    # we can assume the components are ENZ
    if not cardinal:
        if (
            azimuth not in st[0].stats
            or azimuth not in st[1].stats
            or azimuth not in st[2].stats
        ):
            raise RuntimeError(
                "Stream2Seismogram: Required attribute "
                + azimuth
                + " must be in mdother list"
            )
    if not cardinal:
        if dip not in st[0].stats or dip not in st[1].stats or dip not in st[2].stats:
            raise RuntimeError(
                "Stream2Seismogram: Required attribute "
                + dip
                + " must be in mdother list"
            )
    # Outer exception handler to handle range of possible errors in
    # converting each component.  Note we pass an empty list for mdother
    # and aliases except the master
    bundle = []
    for i in range(3):
        bundle.append(Trace2TimeSeries(st[i]))
    # The constructor we use below has frozen names hang for azimuth and
    # vang for what obspy calls dip.  Copy to those names - should work
    # even if hang and vang are the names, although with some inefficiency.
    # Assume that would not be normal so avoid unnecessary code
    if cardinal:
        bundle[0].put(Keywords.channel_hang, 90.0)
        bundle[1].put(Keywords.channel_hang, 0.0)
        bundle[2].put(Keywords.channel_hang, 0.0)
        bundle[0].put(Keywords.channel_vang, 90.0)
        bundle[1].put(Keywords.channel_vang, 90.0)
        bundle[2].put(Keywords.channel_vang, 0.0)
    else:
        for i in range(3):
            hang = bundle[i].get_double(azimuth)
            bundle[i].put(Keywords.channel_hang, hang)
            vang = bundle[i].get_double(dip)
            bundle[i].put(Keywords.channel_vang, vang)
    # Assume now bundle contains all the pieces we need.  This constructor
    # for _CoreSeismogram should then do the job.
    # This may throw an exception, but we require the caller to handle it.
    # All errors returned by this constructor currently leave the data
    # INVALID so the handler should discard anything with an error
    dout = _CoreSeismogram(bundle, master)
    res = Seismogram(dout, "INVALID")
    res.live = True
    return res

obspy.core.Stream.toSeismogram = Stream2Seismogram
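
# Hypothetical usage sketch: bundle a 3-component obspy Stream (assumed
# variable st3c holding exactly three traces in E,N,Z order) into a
# Seismogram.  With cardinal=True no azimuth/dip stats are required.
#
#   seis = Stream2Seismogram(st3c, master=0, cardinal=True)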

def TimeSeriesEnsemble2Stream(tse):
    """
    Convert a timeseries ensemble to a stream.

    Always copies all ensemble Metadata to tse members before conversion.
    That is necessary to avoid loss of data in the case where the only
    copy is stored in the ensemble's metadata.

    :param tse: timeseries ensemble
    :return: converted stream
    """
    res = obspy.core.Stream()
    # Save the set of keys in ensemble metadata and post the list
    # to ensemble md with a special key.  The inverse function
    # looks for that special list and uses it to restore the
    # ensemble's metadata.  That handling is similar to how we handle
    # dead on atomic data.  Note _get_ensemble_md is not a C function but
    # defined only in the pybind11 wrappers
    md = tse._get_ensemble_md()
    # Having this as a constant here is a maintenance issue.  Shouldn't be
    # a problem provided no other file uses this key.  For now it is self
    # contained in this module.
    tse["CONVERTER_ENSEMBLE_KEYS"] = md.keys()
    # This pushes all contents of ensemble md to all members.  That includes
    # the scratch list we just posted.  The inverse needs to clear the temp
    tse.sync_metadata()
    # Remove the temporary from the ensemble metadata or the return
    # will be inconsistent with the input
    tse.erase("CONVERTER_ENSEMBLE_KEYS")
    for ts in tse.member:
        res.append(TimeSeries2Trace(ts))
    return res

TimeSeriesEnsemble.toStream = TimeSeriesEnsemble2Stream
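
# Hypothetical usage sketch: round trip a TimeSeriesEnsemble (assumed variable
# tse) through obspy.  Ensemble metadata are pushed to the members on the way
# out and restored by Stream2TimeSeriesEnsemble (defined below) on the way
# back.
#
#   st = TimeSeriesEnsemble2Stream(tse)
#   st.detrend("demean")               # any obspy Stream processing
#   tse2 = Stream2TimeSeriesEnsemble(st)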

def list2Ensemble(l, keys=None):
    """
    Convert a list of TimeSeries or Seismograms to the corresponding type
    of Ensemble.

    This function will make copies of all the data to create a new
    Ensemble.  Note that the Ensemble's Metadata will always be copied
    from the first member.  If the keys argument is specified, only the
    specified keys will be copied.  If a key does not exist in the first
    member, it will be skipped and a complaint will be left in the error
    log of the ensemble.

    :param l: a list of TimeSeries or Seismograms
    :param keys: a list of keys to be copied from the first object to the
      Ensemble's Metadata
    :return: converted TimeSeriesEnsemble or SeismogramEnsemble
    """
    if isinstance(l[0], TimeSeries):
        res = TimeSeriesEnsemble()
    elif isinstance(l[0], Seismogram):
        res = SeismogramEnsemble()
    else:
        raise MsPASSError(
            "list2Ensemble: The elements of the list need to be TimeSeries or Seismogram but '{}' is given".format(
                type(l[0])
            ),
            "Fatal",
        )
    for d in l:
        res.member.append(d)
    if keys:
        md = Metadata()
        for k in keys:
            try:
                md[k] = res.member[0][k]
            except:
                res.elog.log_error(
                    "list2SeismogramEnsemble",
                    "key: '{}' not found in the first element".format(k),
                    ErrorSeverity.Complaint,
                )
        res.update_metadata(md)
    else:
        res.update_metadata(res.member[0])
    return res

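
# Hypothetical usage sketch: assemble an ensemble from a python list of
# TimeSeries objects (assumed variable ts_list), copying only the
# illustrative key "source_id" from the first member to the ensemble
# Metadata.
#
#   ens = list2Ensemble(ts_list, keys=["source_id"])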

def Pf2AttributeNameTbl(pf, tag="attributes"):
    """
    This function will parse a pf file to extract a tbl with a specific
    key and return a data structure that defines the names and types of
    each column in the input file.

    The structure returned is a tuple with three components:

    1 (index 0) python array of attribute names in the original tbl order.
      This is used to parse the text file so the order matters a lot.

    2 (index 1) parallel array of type names for each attribute.  These
      are actual python type objects that can be used as the second arg of
      isinstance.

    3 (index 2) python dictionary keyed by name field that defines what a
      null value is for this attribute.

    :param pf: AntelopePf object to be parsed
    :param tag: &Tbl tag for section of pf to be parsed.
    """
    tbl = pf.get_tbl(tag)
    names = []
    dtypes = []
    nullvalues = []
    i = 0
    for line in tbl:
        temp = line.split()
        names.append(temp[0])
        typenamein = temp[1].lower()  # allows upper or lower case in names
        if typenamein == "string" or typenamein == "str":
            typ = type("foobar")
            nullval = temp[2]
        elif typenamein == "integer" or typenamein == "int" or typenamein == "long":
            typ = type(1)
            nullval = int(temp[2])
        elif (
            typenamein == "float"
            or typenamein == "double"
            or typenamein == "real"
            or typenamein == "epochtime"
        ):
            typ = type(1.0)
            nullval = float(temp[2])
        elif typenamein == "bool" or typenamein == "boolean":
            typ = type(True)
            nullval = bool(temp[2])
        else:
            raise MsPASSError(
                "parse_attribute_name_tbl: unsupported data type file=" + typenamein,
                "Fatal",
            )
        dtypes.append(typ)
        nullvalues.append(nullval)
        i += 1
    nulls = dict()
    i = 0
    for k in names:
        nulls[k] = nullvalues[i]
        i += 1
    return tuple([names, dtypes, nulls])

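
# Hypothetical usage sketch: parse an attributes &Tbl from a pf file.  Each
# tbl line is expected to have the form "name type nullvalue", e.g.
# "sta string -".  The file name pf_file.pf is illustrative only.
#
#   pf = AntelopePf("pf_file.pf")
#   names, dtypes, nulls = Pf2AttributeNameTbl(pf, tag="attributes")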

def Textfile2Dataframe(
    filename,
    separator="\s+",
    type_dict=None,
    header_line=0,
    attribute_names=None,
    rename_attributes=None,
    attributes_to_use=None,
    one_to_one=True,
    parallel=False,
    insert_column=None,
):
    """
    Import a text file representation of a table and store its
    representation as a pandas dataframe.  Note that even in the parallel
    environment, a dask dataframe will be transferred back to a pandas
    dataframe for consistency.

    :param filename: path to text file that is to be read to create the
      table object that is to be processed (internally we use pandas or
      dask dataframes)
    :param separator: The delimiter used for separating fields.  The
      default is "\s+", which is the regular expression for "one or more
      spaces".  For a csv file, its value should be set to ','.  This
      parameter will be passed to pandas.read_csv or
      dask.dataframe.read_csv.  To learn more details about the usage,
      check the following links:
      https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html
      https://docs.dask.org/en/latest/generated/dask.dataframe.read_csv.html
    :param type_dict: pairs of each attribute and its type, used to
      validate the type of each input item
    :param header_line: defines the line to be used as the attribute names
      for columns; if < 0, an attribute_names list is required.  Please
      note that if attribute_names is provided, the attributes defined in
      header_line will always be overridden.
    :param attribute_names: This argument must be a list of (unique)
      string names to define the attribute name tags for each column of
      the input table.  The length of the array must match the number of
      columns in the input table or this function will throw a MsPASSError
      exception.  This argument is None by default which means the
      function will assume the line specified by the "header_line"
      argument as column headers defining the attribute names.  If
      header_line is less than 0 this argument will be required.  When
      header_line is >= 0 and this argument (attribute_names) is defined
      all the names in this list will override those stored in the file at
      the specified line number.
    :param rename_attributes: This is expected to be a python dict keyed
      by names matching those defined in the file or attribute_names array
      (i.e. the panda/dataframe column index names) and values defining
      strings to use to override the original names.  That usage, of
      course, is most common to override names in a file.  If you want to
      change all the names use a custom attribute_names array as noted
      above.  This argument is mostly to rename a small number of
      anomalous names.
    :param attributes_to_use: If used this argument must define a list of
      attribute names that define the subset of the dataframe attributes
      that are to be saved.  For relational db users this is effectively a
      "select" list of attribute names.  The default is None which is
      taken to mean no selection is to be done.
    :param one_to_one: is an important boolean used to control whether the
      output is or is not filtered by rows.  The default is True which
      means every tuple in the input file will create a single row in the
      dataframe.  (Useful, for example, to construct a wf_miniseed
      collection from css3.0 attributes.)  If False the (normally reduced)
      set of attributes defined by attributes_to_use will be filtered with
      the panda/dask dataframe drop_duplicates method.  That approach is
      important, for example, to filter things like Antelope "site" or
      "sitechan" attributes created by a join to something like wfdisc and
      saved as a text file to be processed by this function.
    :param parallel: When true we use the dask dataframe operation.  The
      default is false meaning the simpler, identical api pandas operators
      are used.
    :param insert_column: a dictionary of new columns to add, and their
      value(s).  If the content is a single value, it can be passed to
      define a constant value for the entire column of data.  The content
      can also be a list; in that case, the list should contain the values
      that are to be set, and it must be the same length as the number of
      tuples in the table.
    """
    if (
        attribute_names is not None and len(attribute_names) > 0
    ):  # If given attribute_names, header_line is overridden
        header_line = None
    if (
        header_line is None or header_line < 0
    ):  # header_line not given, using attribute_names
        if parallel and __mspasspy_has_dask:
            df = daskdf.read_csv(filename, sep=separator, names=attribute_names)
        else:
            if parallel:
                print("WARNING: No dask detected. Running pandas dataframe")
            df = pd.read_csv(filename, sep=separator, names=attribute_names)
    else:  # header_line is given and attribute_names is not given
        if parallel and __mspasspy_has_dask:
            df = daskdf.read_csv(filename, sep=separator, header=header_line)
        else:
            if parallel:
                print("WARNING: No dask detected. Running pandas dataframe")
            df = pd.read_csv(filename, sep=separator, header=header_line)
    # Convert data in each column to the type given in type_dict.
    # Note astype returns a new object so the result must be assigned back
    if type_dict is not None:
        for field, dtype in type_dict.items():
            if field in df:
                df[field] = df[field].astype(dtype)
    if attributes_to_use is not None:
        df = df[attributes_to_use]
    if not one_to_one:
        df = df.drop_duplicates()
    # Intentionally left to last as the above can reduce the size of df
    if rename_attributes is not None:
        df = df.rename(columns=rename_attributes)
    # Add new columns to the dataframe
    if insert_column is not None:
        for key, val in insert_column.items():
            df[key] = val
    # Transfer dask dataframe back to pandas dataframe to keep consistency
    # in the internal representation
    if parallel and __mspasspy_has_dask:
        df = df.compute()
    return df

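
# Hypothetical usage sketch: load a whitespace-delimited table dumped from
# Antelope (illustrative file name site.txt) into a pandas dataframe, keeping
# only a subset of columns and dropping duplicate rows.  All names below are
# assumptions, not part of this module.
#
#   df = Textfile2Dataframe(
#       "site.txt",
#       attribute_names=["sta", "ondate", "offdate", "lat", "lon", "elev"],
#       attributes_to_use=["sta", "lat", "lon", "elev"],
#       one_to_one=False,
#   )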

def _converter_get_ensemble_keys(ens):
    """
    Small helper for converting from a Stream to either of the mspass
    ensemble objects.  Returns a list of keys from the first live member
    using the internal key CONVERTER_ENSEMBLE_KEYS.  Normally should
    return a list of any ensemble keys.  If the special key is not found
    it returns an empty list.
    """
    for d in ens.member:
        if d.live:
            if d.is_defined("CONVERTER_ENSEMBLE_KEYS"):
                return d["CONVERTER_ENSEMBLE_KEYS"]
            else:
                return list()
    # Fall through when there are no live members; return an empty list so
    # callers can safely take len() of the result
    return list()


def Stream2TimeSeriesEnsemble(stream):
    """
    Convert a stream to a timeseries ensemble.

    :param stream: stream input
    :return: converted timeseries ensemble
    """
    size = len(stream)
    tse = TimeSeriesEnsemble()
    for i in range(size):
        tse.member.append(Trace2TimeSeries(stream[i]))
    # potential dead loss problem is resolved by saving the info in converted objects
    # Handle the ensemble metadata.  The little helper we call here
    # gets the list set with CONVERTER_ENSEMBLE_KEYS.
    enskeys = _converter_get_ensemble_keys(tse)
    if len(enskeys) > 0:
        post_ensemble_metadata(tse, enskeys)
        for d in tse.member:
            if d.is_defined("CONVERTER_ENSEMBLE_KEYS"):
                d.erase("CONVERTER_ENSEMBLE_KEYS")
    return tse

obspy.core.Stream.toTimeSeriesEnsemble = Stream2TimeSeriesEnsemble

def SeismogramEnsemble2Stream(sge):
    """
    Convert a seismogram ensemble to a stream.

    :param sge: seismogram ensemble input
    :return: stream
    """
    # This uses the same approach as TimeSeriesEnsemble2Stream to handle
    # ensemble metadata.  See comments there for potential maintenance issues
    md = sge._get_ensemble_md()
    sge["CONVERTER_ENSEMBLE_KEYS"] = md.keys()
    sge.sync_metadata()
    # as above remove this temporary from sge or it alters the
    # input - python gives us a pointer to this thing so it is mutable
    sge.erase("CONVERTER_ENSEMBLE_KEYS")
    res = obspy.core.Stream()
    for sg in sge.member:
        res += Seismogram2Stream(sg)
    return res

SeismogramEnsemble.toStream = SeismogramEnsemble2Stream

def Stream2SeismogramEnsemble(stream):
    """
    Convert a stream to a seismogram ensemble.

    :param stream: stream input
    :return: converted seismogram ensemble
    """
    size = len(stream)
    res = SeismogramEnsemble()
    for i in range(int(size / 3)):
        res.member.append(
            Stream2Seismogram(stream[i * 3 : i * 3 + 3], cardinal=True)
        )  # fixme cardinal
    # Handle the ensemble metadata.  The little helper we call here
    # gets the list set with CONVERTER_ENSEMBLE_KEYS.  The
    # code here is identical to that for the TimeSeriesEnsemble version
    # because the ensemble containers have parallel symbols and the atomic
    # members are close relatives
    enskeys = _converter_get_ensemble_keys(res)
    if len(enskeys) > 0:
        post_ensemble_metadata(res, enskeys)
        # By default the above leaves copies of the ensemble md in each member.
        # Treat that as ok, but we do need to clear the temporary we posted
        for d in res.member:
            if d.is_defined("CONVERTER_ENSEMBLE_KEYS"):
                d.erase("CONVERTER_ENSEMBLE_KEYS")
    return res


obspy.core.Stream.toSeismogramEnsemble = Stream2SeismogramEnsemble


def _all_members_match(ens, key):
    """
    This is a helper function for below.  It scans ens to assure all
    members of the ensemble have the same value for the requested key.  It
    uses the python operator == for testing.  That can fail for a variety
    of reasons; the "match" may be overly restrictive for some types of
    data linked to key.

    :param ens: ensemble data to scan.  Function will throw a MsPASSError
      if the data this symbol is associated with is not a mspass ensemble
      object.
    :param key: key whose values are to be tested for all members of ens.
    :return: True if all members match, False if there are any
      differences.  Note if a key is not defined in a live member the
      result will be False.  Dead data are ignored.
    """
    if isinstance(ens, TimeSeriesEnsemble) or isinstance(ens, SeismogramEnsemble):
        nlive = 0
        for d in ens.member:
            if d.live:
                if nlive == 0:
                    val0 = d[key]
                    nlive += 1
                else:
                    if not key in d:
                        return False
                    val = d[key]
                    if val0 != val:
                        return False
                    nlive += 1
        return True
    else:
        raise MsPASSError(
            "_all_members_match: input is not a mspass ensemble object", "Invalid"
        )


def post_ensemble_metadata(ens, keys=[], check_all_members=False, clean_members=False):
    """
    It may be necessary to call this function after conversion from an
    obspy Stream to one of the mspass Ensemble classes.  This function is
    necessary because a mspass Ensemble has a concept not part of the
    obspy Stream object.  That is, mspass ensembles have a global Metadata
    container.  That container is expected to contain Metadata common to
    all members of the ensemble.  For example, for data from a single
    earthquake it would be sensible to post the source location
    information in the ensemble metadata container rather than having
    duplicates in each member.

    Two different approaches can be used to do this copy.  The faster, but
    least reliable method is to simply copy the values from the first
    member of the ensemble.  That approach is enabled by default.  It is
    completely reliable when used after a conversion from an obspy Stream
    but ONLY if the data began life as a mspass ensemble with exactly the
    same keys set as global.  The type example of that is after an obspy
    algorithm is applied to a mspass ensemble via the mspass decorators.

    A more cautious algorithm can be enabled by setting check_all_members
    True.  In that mode the list of keys received is tested with a not
    equal test against each member.  Note we do not do anything fancy with
    floating point data to allow for finite precision.  The reason is
    Metadata float values are normally expected to be constant data.  In
    that case an != test will yield false when the comparison is between
    two copies.  The not equal test may fail, however, if used with
    computed floating point numbers.  An example where that is possible
    would be spatial gathers like PP data assembled by midpoint
    coordinates.  If you need to build gathers in such a context we
    recommend you use an integer image point tied to a specialized
    document collection in MongoDB that defines the geometry of that
    point.  There may be other examples, but the point is don't trust
    computed floating point values to work.  It will also not work if the
    values of a key-value pair don't support an != comparison.  That could
    be common if the value requested for copy was a python object.

    :param ens: ensemble data to be processed.  The function will throw a
      MsPASSError exception if ens is not either a TimeSeriesEnsemble or a
      SeismogramEnsemble.
    :param keys: is expected to be a list of metadata keys (required to be
      strings) that are to be copied from member metadata to ensemble
      metadata.
    :param check_all_members: switch controlling method used to extract
      metadata that is to be copied (see above for details).  Default is
      False.
    :param clean_members: when true data copied to ensemble metadata will
      be removed from all members.  This option is only allowed if
      check_all_members is set True.  It will be silently ignored if
      check_all_members is False.
    """
    alg = "post_ensemble_metadata"
    if isinstance(ens, TimeSeriesEnsemble) or isinstance(ens, SeismogramEnsemble):
        md = Metadata()
        for d in ens.member:
            if d.live:
                for k in keys:
                    if not k in d:
                        raise MsPASSError(
                            alg
                            + ": no data matching requested key="
                            + k
                            + " Cannot post to ensemble",
                            "Invalid",
                        )
                    md[k] = d[k]
        if check_all_members:
            for d in ens.member:
                for k in keys:
                    if not _all_members_match(ens, k):
                        raise MsPASSError(
                            alg
                            + ": Data mismatch data members with key="
                            + k
                            + "\n In check_all_members mode all values associated with this key must match",
                            "Invalid",
                        )
            if clean_members:
                for d in ens.member:
                    for k in keys:
                        d.erase(k)
        ens.update_metadata(md)
    else:
        raise MsPASSError(
            alg
            + ": Illegal data received. This function runs only on mspass ensemble objects",
            "Invalid",
        )