# Source code for mspasspy.preprocessing.seed.ensembles

import os
from pathlib import Path
from obspy import read, UTCDateTime
import pandas as pd
import numpy as np
from mspasspy.ccore.utility import (
    Metadata,
    MsPASSError,
    ErrorSeverity,
    AtomicType,
    ProcessingHistory,
)
from mspasspy.ccore.seismic import TimeSeriesEnsemble
from mspasspy.util.converter import Trace2TimeSeries


def obspy_mseed_file_indexer(file):
    """
    Use obspy's miniseed reader to eat up a (potentially large) file and
    build an index as a table (returned) of data that can be written to a
    database.  Obspy's reader was written to accept the abomination of
    miniseed with random packets scattered through the file (or so it seems,
    since they have no foff concept in their reader).  Hence, what this
    reader does is make a table entry for each net:sta:chan:loc trace object
    their reader returns.  It does this with pandas dataframes to build the
    table.

    One required argument is the file name containing the miniseed data.
    """
    try:
        pr = Path(file)
        fullpath = pr.absolute()
        [dirself, dfileself] = os.path.split(fullpath)
        dseis = read(file, format="mseed")
        net = []
        sta = []
        chan = []
        loc = []
        stime = []
        etime = []
        samprate = []
        delta = []
        npts = []
        calib = []
        dfile = []
        dir = []
        mover = []
        tref = []
        format = []
        mover_self = "obspy_read"
        tref_self = "UTC"
        format_self = "miniseed"
        # Note obspy uses a more verbose name for net:sta:chan:loc
        # We change to mspass definition below that uses css3.0 names
        for x in dseis:
            net.append(x.stats["network"])
            sta.append(x.stats["station"])
            chan.append(x.stats["channel"])
            loc.append(x.stats["location"])
            sutc = x.stats["starttime"]
            stime.append(sutc.timestamp)
            eutc = x.stats["endtime"]
            etime.append(eutc.timestamp)
            samprate.append(x.stats["sampling_rate"])
            delta.append(x.stats["delta"])
            npts.append(x.stats["npts"])
            calib.append(x.stats["calib"])
            dfile.append(dfileself)
            dir.append(dirself)
            tref.append(tref_self)
            format.append(format_self)
            mover.append(mover_self)
        # Now convert the above to a pandas dataframe and return that.
        # There may be a better way to do this than using this
        # intermediary dict object, but this should not be a hugely
        # compute or memory intensive operation even for large files
        ddict = {
            "net": net,
            "sta": sta,
            "chan": chan,
            "loc": loc,
            "starttime": stime,
            "endtime": etime,
            "samprate": samprate,
            "delta": delta,
            "npts": npts,
            "calib": calib,
            "dfile": dfile,
            "dir": dir,
            "treftype": tref,
            "format": format,
            "mover": mover,
        }
        return pd.DataFrame(ddict)
    except FileNotFoundError as err:
        print("mseed_file_indexer: invalid file name received")
        print(err)
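
# Usage sketch (not part of the module): build an index table for a single
# miniseed file.  The file path below is hypothetical.
#
#   df = obspy_mseed_file_indexer("/data/wf/event001.mseed")
#   print(df[["net", "sta", "chan", "starttime", "endtime"]].head())
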
def dbsave_raw_index(db, pdframe, collection="import_miniseed_ensemble"):
    """
    Database save to db for a pandas data frame pdframe.  In this crude
    version the collection name is frozen as import_miniseed_ensemble.
    db is assumed to be the client root for mongodb or the mspasspy Client
    that is a child of MongoClient.

    :param db: MongoClient or mspass Client to save data desired
    :param pdframe: pandas data frame to be saved
    :param collection: collection to which the data in pdframe is to be
      saved.  Default is 'import_miniseed_ensemble'
    """
    col = db[collection]
    # records is a keyword that makes rows of the dataframe docs for mongo
    dtmp = pdframe.to_dict("records")
    col.insert_many(dtmp)
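
# Usage sketch (not part of the module): save the dataframe produced by
# obspy_mseed_file_indexer.  The database handle here is a plain pymongo
# Database; the connection parameters, database name, and file path are
# hypothetical.
#
#   from pymongo import MongoClient
#   db = MongoClient("localhost", 27017)["mydatabase"]
#   df = obspy_mseed_file_indexer("/data/wf/event001.mseed")
#   dbsave_raw_index(db, df)
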
def dbsave_seed_ensemble_file(db, file, gather_type="event", keys=None):
    """
    Indexer for SEED files that are already assembled in a "gather",
    meaning the data have some relation through one or more keys.  The
    association may be predefined by input through a keys array or left
    null for later association.  There is a large overhead in this function
    as it has to read the entire seed file to acquire the metadata it
    needs.  This version has a bigger memory bloat than required because it
    uses obspy's seed reader that always eats up the whole file and returns
    a list of Trace objects.  A less memory intensive approach would be to
    scan the seed blockettes to assemble the metadata, but that would be a
    future development.

    A KEY POINT about this function is that it ONLY builds an index for the
    data file it is given.  That index is loosely equivalent to the css3.0
    wfdisc table, but organized completely differently.

    This function writes records into an import_miniseed_ensemble
    collection.  The records written are a hierarchy expressed in json
    (bson) of how an Ensemble object is defined:  i.e. ensemble Metadata
    combined with a container of TimeSeries or Seismogram objects.  Because
    SEED data always defines something that directly maps to TimeSeries,
    this only works to create an index to build TimeSeriesEnsemble objects.

    This function was written to index ensembles that are passive array
    common event (source) gathers.  These are the passive array equivalent
    of a shot gather in reflection processing.  The format of the json
    (bson) document used for the index, however, is not limited to that
    case.  The gather type is defined by a metadata key (for this prototype
    the key is "gather_type").  The concept is that gather_type can be used
    as both a filter to select data and as a hint to readers on how to
    handle the data bundle.

    The gather (ensemble) metadata include a list of dict data that define
    a json/bson document defining the members of an ensemble.  Other
    documentation will be needed to define this concept more clearly with
    figures.

    A design constraint we impose for now is that one file generates one
    document in the import_miniseed_ensemble collection.  This means if the
    data for an ensemble is spread through several files the best approach
    is to convert all the data to TimeSeries objects and assemble them with
    a different algorithm.

    A final point about this function is that it dogmatically always
    produces a unique uuid string using the ProcessingHistory newid method.
    Be warned that that string CANNOT be converted to a MongoDB ObjectId as
    it is generated by a completely different algorithm.  The uuid is
    posted with the key "seed_file_id" to provide a unique name.  That id
    is used by the reader in this module to define a unique origin for a
    channel of data.  We note that usage is problematic for finding a
    particular waveform linked to a seed_file_id because the index stores
    that attribute in subdocuments for each ensemble.

    :param db: MongoDB database pointer - may also be a mspass Database
      class
    :param file: seed file containing the data to be indexed.
    :param gather_type: character string defining a name for a particular
      ensemble type.  Default is "event", which is the only currently
      supported format.  (Other keywords will cause an error to be thrown.)
      Anticipated alternatives are:  "common_receiver" or "station",
      "image_point", and "time_window".
    :return: ObjectId of the document inserted that is the index for the
      file processed.
    """
    try:
        his = ProcessingHistory()  # used only to create uuids
        dbh = db["import_miniseed_ensemble"]
        pr = Path(file)
        fullpath = pr.absolute()
        [dirself, dfileself] = os.path.split(fullpath)
        dseis = read(file, format="mseed")
        # This holds the ensemble metadata
        ensemblemd = {"dir": dirself}
        ensemblemd["dfile"] = dfileself
        ensemblemd["format"] = "mseed"
        # this is a placeholder not really necessary for seed data
        # as seed data by definition yield TimeSeries type data although
        # not necessarily seismic data (e.g. MT data are distributed as mseed)
        ensemblemd["member_type"] = "TimeSeries"
        ensemblemd["mover"] = "obspy_seed_ensemble_reader"
        members = []  # this list will contain one dict for each dseis Trace
        # we want to put the time range of the data into ensemblemd - we use
        # these lists for that
        stimes = []
        etimes = []
        for d in dseis:
            mddict = {}
            mddict["net"] = d.stats["network"]
            mddict["sta"] = d.stats["station"]
            mddict["chan"] = d.stats["channel"]
            mddict["loc"] = d.stats["location"]
            st = d.stats["starttime"]
            et = d.stats["endtime"]
            mddict["starttime"] = st.timestamp
            mddict["endtime"] = et.timestamp
            stimes.append(st.timestamp)
            etimes.append(et.timestamp)
            mddict["sampling_rate"] = d.stats["sampling_rate"]
            mddict["delta"] = d.stats["delta"]
            mddict["npts"] = d.stats["npts"]
            mddict["calib"] = d.stats["calib"]
            # this key name could change
            mddict["seed_file_id"] = his.newid()
            members.append(mddict)
        ensemblemd["members"] = members
        tmin = np.median(stimes)
        tmax = np.median(etimes)
        ensemblemd["starttime"] = tmin
        ensemblemd["endtime"] = tmax
        result = dbh.insert_one(ensemblemd)
        return result.inserted_id
    except:
        print("something threw an exception - this needs detailed handlers")
def _load_md(rec, keys):
    """
    Helper for load ensemble.  Extracts metadata defined by the keys list
    and posts them to a Metadata container that is returned.
    """
    # do this the simple way for now without error handlers
    md = Metadata()
    for k in keys:
        x = rec[k]
        md.put(k, x)
    return md
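
# Illustration (assumed-correct sketch): _load_md simply copies selected
# key-value pairs from a MongoDB document into a Metadata container, e.g.
#
#   doc = {"dir": "/data/wf", "dfile": "event001.mseed", "format": "mseed"}
#   md = _load_md(doc, ["dir", "dfile"])   # md now holds only dir and dfile
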
def load_one_ensemble(
    doc,
    create_history=False,
    jobname="Default job",
    jobid="99999",
    algid="99999",
    ensemble_mdkeys=[],  # default is to load nothing for ensemble
    apply_calib=False,
    verbose=False,
):
    """
    This function can be used to load a full ensemble indexed in the
    collection import_miniseed_ensemble.  It uses a large memory model that
    eats up the entire file using obspy's miniseed reader.  It contains
    some relics of early ideas of potentially having the function utilize
    the history mechanism.  Those may not work, but were retained.

    :param doc: is one record in the import_miniseed_ensemble collection
    :param create_history: if true each member of the ensemble will be
      defined in the history chain as an origin and jobname and jobid will
      be used to construct the ProcessingHistory object.
    :param jobname: as used in ProcessingHistory (default "Default job")
    :param jobid: as used in ProcessingHistory
    :param algid: as used in ProcessingHistory
    :param ensemble_mdkeys: list of keys to copy from the first member to
      ensemble Metadata (no type checking is done)
    :param apply_calib: if True tells obspy's reader to apply the
      calibration factor to convert the data to ground motion units.
      Default is False.
    :param verbose: write informational messages while processing
    """
    try:
        ensemblemd = Metadata()
        if create_history:
            his = ProcessingHistory(jobname, jobid)
        form = doc["format"]
        mover = doc["mover"]
        if form != "mseed":
            raise MsPASSError(
                "Cannot handle this ensemble - ensemble format="
                + form
                + "\nCan only be mseed for this reader"
            )
        if mover != "obspy_seed_ensemble_reader":
            raise MsPASSError(
                "Cannot handle this ensemble - ensemble mover parameter="
                + mover
                + " which is not supported"
            )
        dir = doc["dir"]
        dfile = doc["dfile"]
        fname = dir + "/" + dfile
        # Note this algorithm actually should work with any format
        # supported by obspy's read function - should generalize it for release
        dseis = read(fname, format="mseed", apply_calib=apply_calib)
        if len(ensemble_mdkeys) > 0:
            ensemblemd = _load_md(doc, ensemble_mdkeys)
        else:
            # default is to load everything except members
            members_key = "members"
            for k in doc:
                if k != members_key:
                    x = doc[k]
                    ensemblemd[k] = x
        # There is a Stream2TimeSeriesEnsemble function
        # but we don't use it here because we need some functionality
        # not found in that simple function
        nseis = len(dseis)
        result = TimeSeriesEnsemble(ensemblemd, nseis)
        # Secondary files get handled almost the same except for
        # a warning.  The warning message (hopefully) explains the
        # problem but our documentation must warn about this if this
        # prototype algorithm becomes the release version
        count = 0
        for d in dseis:
            # print('debug - working on data object number', count)
            count += 1
            dts = Trace2TimeSeries(d)
            if create_history:
                # This should just define jobname and jobid
                dts.load_history(his)
                seedid = d["seed_file_id"]
                dts.set_as_origin(
                    "load_ensemble", algid, seedid, AtomicType.TIMESERIES, True
                )
            result.member.append(dts)
        return result
    except:
        print("something threw an exception - needs more complete error handlers")
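
# Usage sketch (not part of the module): read back one ensemble indexed by
# dbsave_seed_ensemble_file.  Here db is assumed to be a pymongo Database
# handle as in the earlier sketches; collection access follows the pymongo
# API.
#
#   doc = db["import_miniseed_ensemble"].find_one()
#   ens = load_one_ensemble(doc, apply_calib=False)
#   print("loaded", len(ens.member), "TimeSeries members")
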
def load_source_data_by_id(db, mspass_object):
    """
    Prototype function to load source data to any MsPASS data object based
    on the normalization key.  That key is frozen in this version as
    "source_id" but may be changed to force constraints by the mspasspy
    schema classes.

    Handling of Ensembles and atomic objects is different conceptually but
    in fact does exactly the same thing.  That is, in all cases the
    algorithm queries the input object for the key "source_id".  If that
    fails it returns an error.  Otherwise, it finds the associated document
    in the source collection.  It then posts a frozen set of metadata to
    mspass_object.  If that is an ensemble it is posted to the ensemble
    metadata area.  If it is an atomic object it gets posted to the atomic
    object's metadata area.
    """
    dbsource = db.source
    try:
        if not "source_id" in mspass_object:
            raise MsPASSError(
                "load_source_data_by_id",
                "required attribute source_id not in ensemble metadata",
                ErrorSeverity.Invalid,
            )
        source_id = mspass_object["source_id"]
        # The way we currently do this source_id either matches one document
        # in source or none.  Hence, we can just use a find_one query
        srcrec = dbsource.find_one({"_id": source_id})
        # note find_one returns None if there is no match.  Point this out
        # because if we used find we would test the size of the return and
        # use next to get the data.  The find_one return is easier but
        # depends upon the uniqueness assumption
        if srcrec == None:
            raise MsPASSError(
                "load_source_data",
                "no match found in source collection for source_id=" + source_id,
                ErrorSeverity.Invalid,
            )
        else:
            mspass_object["source_lat"] = srcrec["lat"]
            mspass_object["source_lon"] = srcrec["lon"]
            mspass_object["source_depth"] = srcrec["depth"]
            mspass_object["source_time"] = srcrec["time"]
        return mspass_object
    except:
        print("something threw an unexpected exception")
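
# Usage sketch (not part of the module): assuming the ensemble (or atomic
# object) already carries a source_id that matches an _id in the source
# collection, source coordinates are copied into its Metadata.  The variable
# some_source_id is hypothetical and would normally be set by earlier
# processing.
#
#   ens["source_id"] = some_source_id
#   ens = load_source_data_by_id(db, ens)
#   print(ens["source_lat"], ens["source_lon"], ens["source_depth"])
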
def load_hypocenter_data_by_time(
    db=None,
    ens=None,
    dbtime_key="time",
    mdtime_key="time_P",
    event_id_key="evid",
    phase="P",
    model="iasp91",
    dt=10.0,
    t0_definition="origin_time",
    t0_offset=0.0,
    kill_null=True,
):
    """
    Loads hypocenter data (space-time coordinates) into an ensemble using
    an arrival time matching algorithm.

    This is a generalization of earlier prototypes with the objective of a
    single interface to a common concept - that is, matching arrival
    documents to ensemble data using timing based on travel times.  We
    frequently guide processing by the time of one or more seismic phases.
    Those times can be either measured times done by a human or an
    automated system or theoretical times from an earth model.  This
    function should work with either approach provided some earlier
    function created an arrival document that can be used for matching.

    The matching algorithm uses three keys:  an exact match for net, an
    exact match for sta, and a time range travel time association.  The
    algorithm is not a general associator.  It assumes we have access to an
    arrival collection that has been previously associated.  For those
    familiar with CSS3.0 the type example is the join of
    event->origin->assoc->arrival grouped by evid or orid.

    We use a staged match to the ensemble to reduce compute time.  That is,
    we first run a db find on arrival to select only arrival documents
    within the time span of the ensemble with the defined arrival name key.
    From each matching arrival we compute a theoretical origin time using
    obspy's taup calculator and a specified model and hypocenter
    coordinates in the arrival document that we assume was loaded
    previously from a css3.0 database (i.e. the
    event->origin->assoc->arrival view).  We then select and load source
    coordinates for the closest origin time match in the source collection.
    This is much simpler than general phase association (determining source
    coordinates from a random bag of arrival times) but still has one
    complication - if multiple events have arrivals within the time span of
    the ensemble the simple match described above is ambiguous.

    We resolve that with a switch defined by the argument t0_definition.
    Currently there are two options, which are the only two approaches I
    know of for time selection of downloaded segments.  (1) If set to
    'origin_time' we assume member t0 values are near the origin time of
    the event.  (2) If set to 'phase_time' we assume the member t0 values
    are relative to the phase used as a reference.  In both cases an
    optional t0_offset can be specified to offset each start time by a
    constant.  The sign of the shift is to compare the data start time to
    the computed time MINUS the specified offset.  (e.g. if the reference
    is a P phase time and we expect the data to have been windowed with a
    start time 100 s before the theoretical P time, the offset would be
    100.0)  Note if arrival windowing is selected the source information in
    arrival will not be referenced, but that data is required if using
    origin time matching.  In any case, when multiple sources are found to
    match the ensemble, the one with the smallest rms misfit for the
    windowing is chosen.  A warning message is always posted in that
    situation.

    By default any ensemble members without a match in arrival will be
    killed.  Note we use the kill method which only marks the data dead but
    does not clear the contents.  Hence, on exit most ensembles will have
    at least some members marked dead.  The function returns the number of
    members set.  The caller should test and complain if there are no
    matches.
    """
    base_error_message = "load_hypocenter_data_by_time:  "
    if db == None:
        raise MsPASSError(
            base_error_message
            + "Missing required argument db=MongoDB Database handle",
            ErrorSeverity.Fatal,
        )
    elif ens == None:
        raise MsPASSError(
            base_error_message + "Missing required argument ens=mspass Ensemble data",
            ErrorSeverity.Fatal,
        )
    # First we need to query the arrival table to find all arrivals within
    # the time range of this ensemble
    try:
        dbarrival = db.arrival
        stime = ens["starttime"]
        etime = ens["endtime"]
        query = {dbtime_key: {"$gte": stime, "$lte": etime}}
        narr = dbarrival.count_documents(query)
        if narr == 0:
            print(base_error_message, "No arrivals found in data time range:")
            print(UTCDateTime(stime), " to ", UTCDateTime(etime))
            if kill_null:
                for d in ens.member:
                    d.kill()
            return 0
        else:
            # This else block isn't essential, but makes the logic clearer.
            # First scan the data for unique events.  For now use an evid
            # test.  This could be generalized to coordinates
            arrivals = dbarrival.find(query)
            evids = dict()
            for doc in arrivals:
                evnow = doc[event_id_key]
                lat = doc["source.lat"]
                lon = doc["source.lon"]
                depth = doc["source.depth"]
                otime = doc["source.time"]
                evids[evnow] = [lat, lon, depth, otime]
            if len(evids) > 1:
                # land here if picks from multiple events are inside the data window
                # INCOMPLETE - will probably drop this function
                for k in evids.keys():
                    print(k)
            # Above block always sets lat, lon, depth, and otime to the
            # selected hypocenter data.  IMPORTANT is for the unambiguous
            # case where len(evids) is one we depend on the python property
            # that lat, lon, depth, and otime are set because they have not
            # gone out of scope
    except MsPASSError as err:
        print(err)
        raise err
def load_site_data(db, ens):
    """
    Loads site data into ens.  Similar to load_source_data but uses a
    different match:  net, sta, time matching startdate->enddate.  Mark
    members dead and post an elog message if the site coordinates are not
    found.
    """
    dbsite = db.site
    try:
        for d in ens.member:
            if d.dead():
                continue
            t0 = d["starttime"]
            net = d["net"]
            sta = d["sta"]
            query = {
                "net": {"$eq": net},
                "sta": {"$eq": sta},
                "starttime": {"$lt": t0},
                "endtime": {"$gt": t0},
            }
            n = dbsite.count_documents(query)
            if n == 0:
                d.kill()
                d.elog.log_error(
                    "load_site_data",
                    "no match found in site collection for net="
                    + net
                    + " sta="
                    + sta
                    + " for this event",
                    ErrorSeverity.Invalid,
                )
            else:
                siterec = dbsite.find_one(query)
                d["site_lat"] = siterec["lat"]
                d["site_lon"] = siterec["lon"]
                d["site_elev"] = siterec["elev"]
                d["site_id"] = siterec["site_id"]
                if n > 1:
                    message = "Multiple ({n}) matches found for net={net} and sta={sta} with reference time {t0}".format(
                        n=n, net=net, sta=sta, t0=t0
                    )
                    d.elog.log_error("load_site_data", message, ErrorSeverity.Complaint)
        return ens
    except Exception as err:
        raise MsPASSError(
            "Something threw an unexpected exception", ErrorSeverity.Invalid
        ) from err
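
# Usage sketch (not part of the module): load receiver coordinates for each
# live member; members without a site match are marked dead.
#
#   ens = load_site_data(db, ens)
#   nlive = len([d for d in ens.member if d.live])
#   print(nlive, "members have site coordinates")
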
def load_channel_data(db, ens):
    """
    Loads channel data into ens.  Similar to load_source_data but uses a
    different match:  net, sta, loc, time matching startdate->enddate.
    Mark members dead and post an elog message if required metadata are not
    found.
    """
    dbchannel = db.channel
    try:
        for d in ens.member:
            if d.dead():
                continue
            t0 = d["starttime"]
            # this is a sanity check to avoid throwing exceptions
            if (
                d.is_defined("net")
                and d.is_defined("sta")
                and d.is_defined("loc")
                and d.is_defined("chan")
            ):
                net = d["net"]
                sta = d["sta"]
                chan = d["chan"]
                loc = d["loc"]
                query = {
                    "net": {"$eq": net},
                    "sta": {"$eq": sta},
                    "chan": {"$eq": chan},
                    "loc": {"$eq": loc},
                    "starttime": {"$lt": t0},
                    "endtime": {"$gt": t0},
                }
                n = dbchannel.count_documents(query)
                if n == 0:
                    d.kill()
                    d.elog.log_error(
                        "load_channel_data",
                        "no match found in channel collection for net="
                        + net
                        + " sta="
                        + sta
                        + " chan="
                        + chan
                        + " loc="
                        + loc
                        + " for this event",
                        ErrorSeverity.Invalid,
                    )
                    continue
                if n == 1:
                    chanrec = dbchannel.find_one(query)
                else:
                    # In this case we just complain and use the first record,
                    # which is what find_one returns.  We use the count to make
                    # the error message clearer
                    chanrec = dbchannel.find_one(query)
                    message = "Multiple ({n}) matches found for net={net} and sta={sta} with reference time {t0}".format(
                        n=n, net=net, sta=sta, t0=t0
                    )
                    d.elog.log_error(
                        "load_channel_data", message, ErrorSeverity.Complaint
                    )
                d["site_lat"] = chanrec["lat"]
                d["site_lon"] = chanrec["lon"]
                d["site_elev"] = chanrec["elev"]
                d["vang"] = chanrec["vang"]
                d["hang"] = chanrec["hang"]
                d["site_id"] = chanrec["_id"]
        return ens
    except Exception as err:
        raise MsPASSError(
            "Something threw an unexpected exception", ErrorSeverity.Invalid
        ) from err
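
# Usage sketch (not part of the module): same pattern as load_site_data but
# matching at the channel level, so the hang/vang orientation angles are also
# set on each live member.
#
#   ens = load_channel_data(db, ens)
#   for d in ens.member:
#       if d.live:
#           print(d["net"], d["sta"], d["chan"], d["hang"], d["vang"])
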
def load_arrivals_by_id(
    db,
    tsens,
    phase="P",
    required_key_map={"phase": "phase", "time": "arrival_time"},
    optional_key_map={"iphase": "iphase", "deltim": "deltim"},
    verbose=False,
):
    """
    Special prototype function to load arrival times in the arrival
    collection to TimeSeries data in a TimeSeriesEnsemble.  The match is a
    fixed query that uses source_id, which is assumed set for the ensemble
    AND previously defined for all arrival documents to be matched.  The
    correct record in arrival is found by the combination of matching
    source_id, net, and sta.

    This algorithm does a lot of queries to the arrival collection so an
    index on net and sta is essential.  Probably would be good to add
    source_id to the index as well.

    :param db: MongoDB database handle or mspasspy.Database object
    :param tsens: TimeSeriesEnsemble to be processed
    :param required_key_map: a dict of pairs defining how keys in arrival
      should map to metadata keys of ensemble members.  The key of each
      entry in the dict is used to fetch the required attribute from the
      MongoDB doc found in a query.  The value of that attribute is posted
      to each member TimeSeries object with the value of the key-value pair
      associated with that entry.  e.g. for the default we fetch the data
      from the MongoDB document with the key time and write the value
      retrieved to Metadata with the key arrival_time.  If a required key
      is not found in the arrival document a MsPASSError will be logged and
      that member will be marked dead.  Users should always test for an
      empty ensemble after running this function.
    :param optional_key_map: similar to required_key_map but missing
      attributes only generate a complaint message and the data will be
      left live
    :param verbose: print some messages otherwise only posted to elog
    :return: count of the number of live members in the ensemble at
      completion
    """
    dbarrival = db.arrival
    algorithm = "load_arrivals_by_id"
    if "source_id" in tsens:
        source_id = tsens["source_id"]
        for d in tsens.member:
            if d.dead():
                continue
            if ("net" in d) and ("sta" in d):
                net = d["net"]
                sta = d["sta"]
                query = {"source_id": source_id, "net": net, "sta": sta, "phase": phase}
                n = dbarrival.count_documents(query)
                # print('debug query=', query, ' yield n=', n)
                if n == 0:
                    d.elog.log_error(
                        algorithm,
                        "No matching arrival for source_id="
                        + source_id
                        + " and net:sta ="
                        + net
                        + ":"
                        + sta,
                        ErrorSeverity.Invalid,
                    )
                    if verbose:
                        print(
                            "No matching arrival for source_id="
                            + source_id
                            + " and net:sta ="
                            + net
                            + ":"
                            + sta
                        )
                    d.kill()
                else:
                    cursor = dbarrival.find(query)
                    if n == 1:
                        rec = cursor.next()
                    elif n > 1:
                        d.elog.log_error(
                            algorithm,
                            "Multiple documents match source_id="
                            + source_id
                            + " and net:sta ="
                            + net
                            + ":"
                            + sta
                            + " Using first found",
                            ErrorSeverity.Complaint,
                        )
                        if verbose:
                            print(
                                "debug:  multiple docs match - printing full documents of all matches.  Will use first"
                            )
                            for rec in cursor:
                                print(rec)
                            cursor.rewind()
                        rec = cursor.next()
                    for k in required_key_map:
                        if k in rec:
                            x = rec[k]
                            dkey = required_key_map[k]
                            d[dkey] = x
                        else:
                            d.elog.log_error(
                                algorithm,
                                "Required attribute with key="
                                + k
                                + " not found in matching arrival document - data killed",
                                ErrorSeverity.Invalid,
                            )
                            d.kill()
                            if verbose:
                                print(
                                    "Required attribute with key="
                                    + k
                                    + " not found in matching arrival document - data killed"
                                )
                    for k in optional_key_map:
                        if k in rec:
                            x = rec[k]
                            dkey = optional_key_map[k]
                            d[dkey] = x
                        else:
                            d.elog.log_error(
                                algorithm,
                                "Optional attribute with key="
                                + k
                                + " was not found in matching arrival document and cannot be loaded",
                                ErrorSeverity.Complaint,
                            )
                            if verbose:
                                print(
                                    "Optional attribute with key="
                                    + k
                                    + " was not found in matching arrival document and cannot be loaded"
                                )
    else:
        message = "Ensemble metadata does not contain required source_id attribute - killing all data in this ensemble"
        if verbose:
            print(message)
        for d in tsens.member:
            d.elog.log_error("load_arrivals_by_id", message, ErrorSeverity.Invalid)
            d.kill()
    nlive = 0
    for d in tsens.member:
        if d.live:
            nlive += 1
    return nlive
def erase_seed_metadata(d, keys=["mseed", "_format"]):
    """
    Erase a set of debris obspy's reader adds to the stats array that gets
    copied to data.  We don't need that debris once data are converted to
    MsPASS data objects and stored in MongoDB.  Use this function when
    importing seed data with obspy's reader to reduce junk in the database.

    :param d: is the data object to be cleaned (assumed only to be a child
      of Metadata so erase will work).  Note this version assumes d is an
      ensemble and clears the keys from each member.
    :param keys: list of keys to clear.  Default should be all that is
      needed for standard use.
    """
    for mem in d.member:
        for k in keys:
            mem.erase(k)
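
# Usage sketch (not part of the module): strip the obspy bookkeeping keys from
# every member before saving the ensemble to MongoDB.  The extra "processing"
# key in the second call is hypothetical.
#
#   erase_seed_metadata(ens)   # removes the default "mseed" and "_format" keys
#   erase_seed_metadata(ens, keys=["mseed", "_format", "processing"])
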