# Source code for mspasspy.db.normalize

import copy
from abc import ABC, abstractmethod

import dask
import dask.dataframe
import numpy as np
import pandas as pd
import pymongo
from bson import ObjectId
from obspy import UTCDateTime

from mspasspy.ccore.utility import MsPASSError, ErrorSeverity, Metadata
from mspasspy.util.error_logger import PyErrorLogger
from mspasspy.ccore.seismic import (
    TimeSeries,
    Seismogram,
    TimeSeriesEnsemble,
    SeismogramEnsemble,
)
from mspasspy.db.database import Database
from mspasspy.util.decorators import mspass_func_wrapper

# DataFrame classes accepted as input by the cache matchers (see the
# isinstance test in DictionaryCacheMatcher.__init__).  Use the public
# names: pd.DataFrame is the same class as pd.core.frame.DataFrame, and
# dask.dataframe.DataFrame is the public dask collection class (the private
# dask.dataframe.core path is not the collection type in dask releases
# built on dask-expr).  dask.dataframe must be imported explicitly; a bare
# "import dask" does not load the submodule.
type_pdd = pd.DataFrame
type_ddd = dask.dataframe.DataFrame


class BasicMatcher(ABC):
    """
    Base class defining the API for generic matchers used for MongoDB
    normalization.

    The class is mostly a skeleton: it declares the required abstract
    methods (find and find_one) and initializes a set of attributes every
    matcher needs.  It cannot be instantiated directly.

    Matching is defined as one of two things:

    1. A one-to-one match where each search is guaranteed to yield either
       exactly one match or none.  That is defined through a find_one
       method, following the same concept in MongoDB.
    2. A match that is not unique and may yield more than one document.
       For that case use the find method.  Unlike the MongoDB find method,
       find in this context returns a list of Metadata containers holding
       the set of attributes requested in the lists defined on construction.

    Another way of viewing this interface is as an abstraction of the find
    and find_one methods of MongoDB to a wider class of algorithms that may
    or may not use MongoDB directly.  In particular, intermediate classes
    that implement different cache data structures allow the data to be
    matched to be loaded either from a MongoDB collection or from a pandas
    DataFrame.  That makes it possible to match data against tabular data
    loaded into pandas by any of its many read methods, e.g. tables from an
    SQL database or Antelope tables/views saved as text files.
    """

    def __init__(
        self,
        attributes_to_load=None,
        load_if_defined=None,
        aliases=None,
    ):
        """
        Base class constructor.  Sets only the attributes considered
        necessary for all subclasses.  Most if not all subclasses will want
        to call this constructor with arguments passed to the subclass
        constructor.

        :param attributes_to_load: list of keys (strings) to be loaded from
          the normalizing table/collection.  Default for this base class is
          None (stored as an empty list), but subclasses should normally
          define a default list.  Subclasses should treat this list as the
          set of REQUIRED attributes; optional values belong in
          load_if_defined.  The list is copied, so later changes to the
          caller's list do not affect this object.
        :param load_if_defined: secondary list of keys (strings) that should
          be loaded only if they are defined.  Any key that also appears in
          attributes_to_load is dropped from this list, because
          attributes_to_load supersedes it.  Default is None (stored as an
          empty list); subclasses should set their own defaults.
        :param aliases: python dictionary defining alternative keys used to
          access a data object's Metadata for an attribute defined in the
          collection/table being matched.  Note carefully that each dict key
          is the collection/table attribute name and the associated value is
          the alias to use to fetch Metadata.  Keys missing from aliases
          should be treated by matchers as having identical names in the
          collection/table and in Metadata.  Default is None, which creates
          an empty dictionary (no aliases).
        :type aliases: python dictionary
        :raises TypeError: if aliases is neither None nor a dict.
        """
        if attributes_to_load is None:
            self.attributes_to_load = []
        else:
            # Copy so the matcher does not share (and possibly mutate) the
            # caller's list, e.g. a module-level default list.
            self.attributes_to_load = list(attributes_to_load)
        if load_if_defined is None:
            self.load_if_defined = []
        else:
            # attributes_to_load supersedes a duplicate in load_if_defined,
            # so the two lists never contain the same key.
            self.load_if_defined = [
                key for key in load_if_defined if key not in self.attributes_to_load
            ]
        if aliases is None:
            self.aliases = dict()
        elif isinstance(aliases, dict):
            self.aliases = aliases
        else:
            raise TypeError(
                "BasicMatcher constructor: aliases argument must be either None or define a python dictionary"
            )

    def __call__(self, d, *args, **kwargs):
        """
        Shorthand for find_one using the python call operator.

        For example, if matcher is a concrete instance of a subclass, then
        md = matcher(d) is equivalent to md = matcher.find_one(d).  Every
        subclass inherits this method; any extra positional or keyword
        arguments are passed through to find_one unchanged.
        """
        return self.find_one(d, *args, **kwargs)

    @abstractmethod
    def find(self, mspass_object, *args, **kwargs) -> tuple:
        """
        Abstraction of the MongoDB database find method, with the matching
        criteria defined when a concrete instance is constructed.

        Like the MongoDB method, implementations should return the set of
        records matching the keys found in mspass_object.  A key difference
        is that instead of a MongoDB cursor the result is a python list of
        Metadata containers.  In some instances that is a direct translation
        of a MongoDB cursor to a list of Metadata objects.  The abstraction
        lets small collections be accessed faster through a generic cache
        algorithm and lets tables of data be loaded through file-based
        subclasses, all through a common interface.

        WE STRESS STRONGLY that the abstraction assumes returns are always
        small enough not to cause a memory bloating problem.  If you need
        the big-memory model of a cursor, use the cursor directly.

        All subclasses must implement this method to be concrete.  If the
        implemented algorithm is always expected to be a unique one-to-one
        match, applications may want this method to throw an exception as
        a usage error and direct callers to find_one.

        Implementations should return a pair (2-component list or tuple).
        Component 0 holds a list of Metadata containers and component 1
        holds either None or a PyErrorLogger object.  The PyErrorLogger is a
        convenient way to pass error messages back to the caller in a form
        that is easier to handle with the MsPASS error system than an
        exception.  Callers should handle four possible returns ([] means
        an empty list and [...] a list with data; the implementations in
        this module use None rather than [] for component 0 when nothing is
        returned):

        1. [] None - no match found.
        2. [] ErrorLog - failure with an informational message in the
           ErrorLog that should be preserved.  The presence of an error
           implies something went wrong, not simply a null result.
        3. [...] None - all is good with no detected errors.
        4. [...] ErrorLog - valid data returned, but a warning or
           informational message was posted.  Handlers may want to examine
           the ErrorSeverity of the log entries and treat different levels
           differently (e.g. Fatal and Informational should always be
           treated differently).
        """

    @abstractmethod
    def find_one(self, mspass_object, *args, **kwargs) -> tuple:
        """
        Abstraction of the MongoDB database find_one method, with the
        matching criteria defined when a concrete instance is constructed.

        Like the MongoDB method, implementations should return the unique
        record that the keys in mspass_object are expected to define.  A
        type example of an always-unique match is an ObjectId.  When a
        match is found the result should be returned in a Metadata
        container.  The attributes returned are normally a subset of the
        document, defined by the attributes "attributes_to_load" and
        "load_if_defined".  For database implementations this is little
        more than copying the desired attributes from the matching document
        returned by MongoDB, but other implementations may do more (e.g. a
        cached algorithm holding the collection, loaded from MongoDB or a
        pandas DataFrame, in memory).

        Implementations should return a pair (2-component list or tuple).
        Component 0 holds the Metadata container yielded by the match, or
        None if there is no match.  Component 1 holds either None or a
        PyErrorLogger object.  Callers should handle four possible returns:

        1. None None - no match found.
        2. None ErrorLog - failure with an informational message in the
           ErrorLog that the caller may want to preserve or convert to an
           exception.
        3. Metadata None - all is good with no detected errors.
        4. Metadata ErrorLog - valid data returned, but a warning or
           informational message was posted.  Handlers may want to examine
           the ErrorSeverity of the log entries and treat different levels
           differently (e.g. Fatal and Informational should always be
           treated differently).
        """
        pass

    def find_doc(self, doc) -> dict:
        """
        Find a unique match using a python dictionary as input.

        The bulk_normalize function requires an implementation of a method
        with this name.  It is conceptually similar to find_one, but takes
        a python dictionary (doc) instead of a MsPASS seismic data object
        and returns a python dict (not a Metadata container) on success.

        This method is a thin wrapper around find_one.  It depends only on
        the near equivalence of a python dict and a MsPASS Metadata
        container, which is why it lives in the base class.  If find_one
        returns a non-empty error log, only the worst errors are examined:

        - "Invalid": this method silently returns None.  That is
          indistinguishable from the no-match return, but keeps the API
          simple.
        - "Fatal": a MsPASSError is raised containing all of the worst
          messages.
        - anything less severe is ignored and dropped.

        Subclasses may override this method if this approach is
        inappropriate.

        :param doc: python dict of attributes used to build the Metadata
          container passed to find_one.
        :return: python dict with the content of the matching Metadata
          container, or None if there is no match or the worst logged
          error is Invalid.
        :raises MsPASSError: if the worst error posted by find_one is Fatal.
        """
        md2test = Metadata(doc)
        md, elog = self.find_one(md2test)
        # Only inspect logs that actually hold entries; worst_errors() on an
        # empty log has no element 0.
        if elog is not None and elog.size() > 0:
            elist = elog.worst_errors()
            worst = elist[0].badness
            if worst == ErrorSeverity.Invalid:
                return None
            if worst == ErrorSeverity.Fatal:
                message = "find_doc method failed. Messages posted:\n"
                message += "".join(e.message + "\n" for e in elist)
                raise MsPASSError(message, ErrorSeverity.Fatal)
        # No match (possibly with only a Complaint-level message logged).
        # dict(None) would raise TypeError here.
        if md is None:
            return None
        return dict(md)
class DatabaseMatcher(BasicMatcher):
    """
    Matcher using direct database queries to MongoDB.

    Each call to the find method of this class constructs a query, calls
    the MongoDB find method with that query, and extracts the desired
    attributes from each returned document into a Metadata container.  The
    query construction is abstracted by the virtual method query_generator.

    This is an intermediate class that cannot be instantiated directly
    because it contains a virtual method.  Users should consult the
    constructor docstrings of its subclasses.
    """

    def __init__(
        self,
        db,
        collection,
        attributes_to_load=None,
        load_if_defined=None,
        aliases=None,
        require_unique_match=False,
        prepend_collection_name=False,
    ):
        """
        Constructor for this intermediate class.  It should only be called
        by subclasses, as this class is not concrete.

        :param db: pymongo Database handle (an instance of
          pymongo.database.Database, which includes subclasses).
        :param collection: name of the collection to query.
        :param attributes_to_load: see BasicMatcher.
        :param load_if_defined: see BasicMatcher.
        :param aliases: see BasicMatcher.
        :param require_unique_match: if True find_one raises a Fatal
          MsPASSError when a query matches more than one document; if False
          (default) find_one logs a Complaint and uses the first match.
        :param prepend_collection_name: passed through to
          _extractData2Metadata; presumably controls whether loaded keys
          are prefixed with the collection name (confirm there).
        :raises TypeError: if collection is not a string or db is not a
          pymongo Database.
        """
        super().__init__(
            attributes_to_load=attributes_to_load,
            load_if_defined=load_if_defined,
            aliases=aliases,
        )
        if not isinstance(collection, str):
            raise TypeError(
                "{} constructor: required arg1 must be a collection name - received invalid type".format(
                    self.__class__.__name__
                )
            )
        self.collection = collection
        if isinstance(db, pymongo.database.Database):
            self.dbhandle = db[collection]
        else:
            message = "DatabaseMatcher constructor: db argument is not a valid Database handle\n"
            message += "Actual type of db arg={}".format(str(type(db)))
            raise TypeError(message)
        self.require_unique_match = require_unique_match
        self.prepend_collection_name = prepend_collection_name

    @abstractmethod
    def query_generator(self, mspass_object) -> dict:
        """
        Build the MongoDB query for mspass_object.  Subclasses MUST
        implement this method.

        It should extract content from mspass_object and use it to build a
        query passed directly to the find method of the collection handle
        stored in self.dbhandle during construction.  Because pymongo
        queries are python dicts, it must return a valid query dict.

        Implementations should return None if no query could be generated,
        e.g. when a key required to build the query is missing from
        mspass_object.
        """
        pass

    def find(self, mspass_object):
        """
        Generic database implementation of the find method.

        Builds a query with query_generator, runs it against self.dbhandle,
        and converts every matching document to a Metadata container that
        holds the attributes_to_load and (when present) load_if_defined
        attributes.  The set of documents matched depends on the subclass
        query_generator.

        :param mspass_object: datum used to generate the query.  It must
          pass isinstance(mspass_object, Metadata); MsPASS data objects
          are expected to satisfy that.
        :return: 2-element list [mdlist, elog] following the BasicMatcher
          API:

          - [None, elog] with an Invalid message if mspass_object is not a
            Metadata container or query_generator returned None.
          - [None, None] if mspass_object is marked dead.
          - [None, elog] with a Complaint message if the query matched no
            documents.
          - [list of Metadata, None] on success.
        :raises MsPASSError: if extracting attributes from any matching
          document raises MsPASSError (e.g. a required attribute is
          missing).  The message is prefixed with "DatabaseMatcher.find: "
          and the original severity is kept.
        """
        if not isinstance(mspass_object, Metadata):
            elog = PyErrorLogger()
            message = "received invalid data. Arg0 must be a valid MsPASS data object"
            elog.log_error(message, ErrorSeverity.Invalid)
            # Return now: continuing would hand invalid data to
            # query_generator and discard this log.
            return [None, elog]
        if hasattr(mspass_object, "dead") and mspass_object.dead():
            return [None, None]
        query = self.query_generator(mspass_object)
        if query is None:
            elog = PyErrorLogger()
            message = "query_generator method failed to generate a valid query - required attributes are probably missing"
            elog.log_error(message, ErrorSeverity.Invalid)
            return [None, elog]
        # Iterate the cursor directly; an empty result is detected afterward
        # instead of issuing a separate count_documents round trip.
        metadata_list = []
        for doc in self.dbhandle.find(query):
            try:
                md = _extractData2Metadata(
                    doc,
                    self.attributes_to_load,
                    self.aliases,
                    self.prepend_collection_name,
                    self.collection,
                    self.load_if_defined,
                )
            except MsPASSError as e:
                raise MsPASSError(
                    "DatabaseMatcher.find: " + e.message, e.severity
                ) from e
            metadata_list.append(md)
        if not metadata_list:
            elog = PyErrorLogger()
            message = "query = " + str(query) + " yielded no documents"
            elog.log_error(message, ErrorSeverity.Complaint)
            return [None, elog]
        return [metadata_list, None]

    def find_one(self, mspass_object):
        """
        Generic database implementation of the find_one method.

        The tacit assumption is that a caller of find_one expects a unique
        match.  This method is a wrapper around find that handles the
        uniqueness issue according to self.require_unique_match:

        - True: a nonunique match raises a Fatal MsPASSError.  Subclasses
          that need dogmatic uniqueness (e.g. ObjectId matching) should set
          this.
        - False (default): a Complaint is posted to the error log and the
          first element of the list returned by find is used.

        :param mspass_object: datum passed unchanged to find.
        :return: 2-element list [Metadata or None, elog or None] following
          the BasicMatcher find_one API.
        :raises MsPASSError: on a nonunique match when require_unique_match
          is True.
        """
        mdlist, elog = self.find(mspass_object)
        # Treat an empty list (possible from a subclass override of find)
        # the same as None: there is nothing to return.
        if not mdlist:
            return [None, elog]
        mdlist_length = len(mdlist)
        if mdlist_length == 1:
            return [mdlist[0], elog]
        if self.require_unique_match:
            message = "query of collection {col} did not yield a unique match. Found {n} matching documents. Aborting".format(
                col=self.collection, n=mdlist_length
            )
            raise MsPASSError(
                "DatabaseMatcher.find_one: " + message, ErrorSeverity.Fatal
            )
        # Nonunique but tolerated: preserve any log from find and add a
        # Complaint describing the choice made.
        if elog is None:
            elog = PyErrorLogger()
        message = "query of collection {col} did not yield a unique match. Found {n} matching documents. Using first one in list".format(
            col=self.collection, n=mdlist_length
        )
        elog.log_error("DatabaseMatcher.find_one", message, ErrorSeverity.Complaint)
        return [mdlist[0], elog]
[docs]class DictionaryCacheMatcher(BasicMatcher): """ Matcher implementing a caching method based on a python dictionary. This is an intermediate class for instances where the database collection to be matched is small enough that the in-memory model is appropriate. It should also only be used if the matching algorithm can be reduced to a single string that can serve as a unique id for each tuple. The class defines a generic dictionary cache with a string key. The way that key is define is abstracted through two virtual methods: (1) The cache_id method creates a match key from a mspass data object. That is normally from the Metadata container but it is not restricted to that. e.g. start time for TimeSeries or Seismogram objects can be obtained from the t0 attribute directly. (2) The db_make_cache_id is called by the internal method of this intermediate class (method name is _load_normalization_cache) to build the cache index from MongoDB documents scanned to construct the cache. Two different methods to define the cache index are necessary as a generic way to implement aliases. A type example is the mspass use of names like "channel_id" to refer to the ObjectId of a specific document in the channel collection. When loading channel the name key is "_id" but data objects would normally have that same data defined with the key "channel_id". Similarly, if data have had aliases applied a key in the data may not match the name in a collection to be matched. The dark side of this is it is very easy when running subclasses of this to get null results with all members of a dataset. As always testing with a subset of data is strongly recommended before running versions of this on a large dataset. This class cannot be instantiated because it is not concrete (has abstract - virtual - methods that must be defined by subclasses) See implementations for constructor argument definitions. 
""" def __init__( self, db_or_df, collection, query=None, attributes_to_load=None, aliases=None, load_if_defined=None, require_unique_match=False, prepend_collection_name=False, use_dataframe_index_as_cache_id=False, ): """ Constructor for this intermediate class. It should not be used except by subclasses as this intermediate class is not concrete. It calls the base class constructor and then loads two internal attributes: query and collection. It then creates the normalization python dict that applies the abstract cache_id method. Note that only works for concrete subclasses of this intermediate class. """ super().__init__( attributes_to_load=attributes_to_load, load_if_defined=load_if_defined, aliases=aliases, ) if not isinstance(collection, str): raise TypeError( "{} constructor: required arg1 must be a collection name - received invalid type".format( self.__class__.__name__ ) ) if query == None: self.query = dict() elif isinstance(query, dict): self.query = query else: raise TypeError( "{} constructor: query argument must be a python dict container".format( self.__class__.__name__ ) ) self.collection = collection self.require_unique_match = require_unique_match self.prepend_collection_name = prepend_collection_name self.use_dataframe_index_as_cache_id = use_dataframe_index_as_cache_id # This is a redundant initialization but a minor cost for stability self.normcache = dict() # Reference the base class to avoid a type error # This seems to be an oddity from using the same name Database # in mspass as pymongo if isinstance(db_or_df, pymongo.database.Database): self._db_load_normalization_cache(db_or_df, collection) elif isinstance(db_or_df, (type_ddd, type_pdd)): self._df_load_normalization_cache(db_or_df, collection) else: message = "{} constructor: required arg0 must be a Database handle or panda Dataframe\n".format( self.__class__.__name__ ) message += "Actual type = {}".format(str(type(db_or_df))) raise TypeError(message)
[docs] @abstractmethod def cache_id(self, mspass_object) -> str: """ Concrete implementations must implement this method to define how a mspass data object, mspass_object, is to be used to construct the key to the cache dict container. It is distinct from db_make_cache_id to allow differences in naming or even the algorithm used to construct the key from a datum relative to the database. This complicates the interface but makes it more generic. :param mspass_object: is expected to be a MsPASS object. Any type restrictions should be implemented in subclasses that implement the method. :return: should always return a valid string and never throw an exception. If the algorithm fails the implementation should return a None. """ pass
[docs] @abstractmethod def db_make_cache_id(self, doc) -> str: """ Concrete implementation must implement this method to define how the cache index is to be created from database documents passed through the arg doc, which pymongo always returns as a python dict. It is distinct from cache_id to allow differences in naming or the algorithm for loading the cache compared to accessing it using attributes of a data object. If the id string cannot be created from doc an implementation should return None. The generic loaders in this class, db_load_normalization_cache and df_load_normalization_class, handle that situation cleanly but if a subclass overrides the load methods they should handle such errors. "cleanly" in this case means they throw an exception which is appropriate since they are run during construction and any invalid key is not acceptable in that situation. """ pass
[docs] def find_one(self, mspass_object) -> tuple: """ Implementation of find for generic cached method. It uses the cache_id method to create the indexing string from mspass_object and then returns a match to the cache stored in self. Only subclasses of this intermediate class can work because the cache_id method is defined as a pure virtual method in this intermediate class. That construct is used to simplify writing additional matcher classes. All extensions need to do is define the cache_id and db_make_cache_id algorithms to build that index. :param mspass_object: Any valid MsPASS data object. That means TimeSeries, Seismogram, TimeSeriesEnsemble, or SeismogramEnsemble. This datum is passed to the (abstract) cache_id method to create an index string and the result is used to fetch the Metadata container matching that key. What is required of the input is dependent on the subclass implementation of cache_id. :return: 2-component tuple following API specification in BasicMatcher. Only two possible results are possible from this implementation: None ErrorLog - failure with an error message that can be passed on if desired or printed Metadata None - all is good with no detected errors. The Metadata container holds all attributes_to_load and any defined load_if_defined values. 
""" find_output = self.find(mspass_object) if find_output[0] is None: return find_output elif len(find_output[0]) == 1: return [find_output[0][0], find_output[1]] else: # as with the database version we use require_unique_match # to define if we should be dogmatic or not if self.require_unique_match: message = "query does not yield a unique match and require_unique_match is set true" raise MsPASSError( "DictionaryCacheMatcher.find: " + message, ErrorSeverity.Fatal ) else: message = "encountered a nonunique match calling find_one - returning contents of first matching document found" if find_output[1] is None: elog = PyErrorLogger() else: elog = find_output[1] elog.log_error( "DictionaryCacheMatcher.find: ", message, ErrorSeverity.Complaint ) return [find_output[0][0], elog]
[docs] def find(self, mspass_object) -> tuple: """ Generic implementation of find method for cached tables/collections. This method is a generalization of the MongoDB database find method. It differs in two ways. First, it creates the "query" directly from a MsPASS data object (pymongo find requires a dict as input). Second, the result is return as a python list of Metadata containers containing what is (usually) a subset of the data stored in the original collection (table). In contrast pymongo database find returns a database "Cursor" object which is their implementation of a large list that may exceed the size of memory. A key point is the model here makes sense only if the original table itself is small enough to not cause a memory problem. Further, find calls that yield long list may cause efficiency problems as subclasses that build on this usually will need to do a linear search through the list if they need to find a particular instance (e.g. call to find_one). :param mspass_object: data object to match against data in cache (i.e. query). :type mspass_object: must be a valid MsPASS data object. currently that means TimeSeries, Seismogram, TimeSeriesEnsemble, or SeismogramEnsemble. If it is anything else (e.g. None) this method will return a tuple [None, elog] with elog being a PyErrorLogger with a posted message. :return: tuple with two elements. 0 is either a list of valid Metadata container(s) or None and 1 is either None or an PyErrorLogger object. There are only two possible returns from this method: [None, elog] - find failed. See/save elog for why it failed. [ [md1, md2, ..., mdn], None] - success with 0 a list of Metadata containing attributes_to_load and load_if_defined (if appropriate) in each component. 
""" if not isinstance(mspass_object, Metadata): elog = PyErrorLogger() elog.log_error( "Received datum that was not a valid MsPASS data object", ErrorSeverity.Invalid, ) return [None, elog] if hasattr(mspass_object, "dead"): if mspass_object.dead(): return [None, None] thisid = self.cache_id(mspass_object) # this should perhaps generate two different messages as the # they imply slightly different things - the current message # is accurate though if (thisid == None) or (thisid not in self.normcache): error_message = "cache_id method found no match for this datum" elog = PyErrorLogger() elog.log_error(error_message, ErrorSeverity.Invalid) return [None, elog] else: return [self.normcache[thisid], None]
def _db_load_normalization_cache(self, db, collection):
    """
    Load a cached version of a normalizing MongoDB collection.

    It creates a python dict stored internally with the name
    self.normcache.  The dict is keyed by the value returned by the
    virtual method db_make_cache_id for each document.  The value
    associated with each key is a python list of Metadata containers, one
    per document yielding that key, appended in cursor order.  The
    Metadata containers normally hold only a subset of the original
    attributes in the collection.

    The list attributes_to_load is treated as required.  Use
    load_if_defined for attributes that are not required for your
    workflow.

    This method raises a fatal MsPASSError in three situations:
      1. The collection following the (optional) query self.query is empty.
      2. db_make_cache_id returns None for any document.
      3. Building the Metadata container for a document fails (this is
         presumably where a missing attributes_to_load key is detected; the
         original error is re-raised with the document number appended).

    Side effect: sets self.collection_size to the number of documents
    matching self.query.

    :param db: MsPASS Database class MongoDB database handle.  It can also
      be the pymongo base class because no MsPASS extension methods are
      used here.
    :param collection: string defining the MongoDB collection that is to
      be loaded and indexed - the normalizing collection target.
    """
    caller = "DictionaryCacheMatcher._db_load_normalization_cache: "
    dbhandle = db[collection]
    self.collection_size = dbhandle.count_documents(self.query)
    if self.collection_size <= 0:
        message = "Query={} of collection {} yielded no documents\n".format(
            str(self.query), collection
        )
        message += "Cannot construct a zero length object"
        raise MsPASSError(caller + message, ErrorSeverity.Fatal)
    self.normcache = dict()
    for count, doc in enumerate(dbhandle.find(self.query)):
        cache_key = self.db_make_cache_id(doc)
        # The api allows a None return so it must be trapped here.
        if cache_key is None:
            raise MsPASSError(
                caller
                + "db_make_cache_id failed - coding problem or major problem with collection="
                + collection,
                ErrorSeverity.Fatal,
            )
        try:
            md = _extractData2Metadata(
                doc,
                self.attributes_to_load,
                self.aliases,
                self.prepend_collection_name,
                collection,
                self.load_if_defined,
            )
        except MsPASSError as e:
            raise MsPASSError(
                e.message
                + " in document number {n} of collection {col}".format(
                    n=count, col=collection
                ),
                e.severity,
            ) from e
        self.normcache.setdefault(cache_key, []).append(md)


def _df_load_normalization_cache(self, df, collection):
    """
    DataFrame counterpart of _db_load_normalization_cache.

    Builds self.normcache from the rows of a pandas/dask DataFrame
    instead of a MongoDB collection.  If self.query is a non-empty dict it
    is applied as a set of column == value equality tests combined with a
    logical AND.  MongoDB operator syntax such as {"$gte": ...} is not
    supported here.

    The cache key for each row is the DataFrame index value when
    self.use_dataframe_index_as_cache_id is True; otherwise it is the
    value returned by db_make_cache_id(row).  (NOTE(review): that boolean
    is assumed to be set by the subclass constructor; confirm.)

    :param df: a pandas/dask dataframe where we load data from.
    :param collection: name used for prepending attribute names (when
      prepend_collection_name is True) and in error messages.
    :raises MsPASSError: (Fatal) if the query selects no rows or
      db_make_cache_id returns None.  A MsPASSError from building a
      Metadata container is re-raised with the row number appended.
    """
    caller = "DictionaryCacheMatcher._df_load_normalization_cache: "
    query_result = df
    if self.query is not None and len(self.query) > 0:
        # There are multiple ways of querying in a dataframe.  According to
        # the experiments in https://stackoverflow.com/a/46165056/11138718
        # comparing the raw column arrays is the fastest approach.
        sub_conds = [df[key].values == val for key, val in self.query.items()]
        cond = np.logical_and.reduce(sub_conds)
        query_result = df[cond]
    if len(query_result.index) <= 0:
        message = (
            "Query={query_str} of dataframe={dataframe_str}"
            " yielded 0 documents - cannot construct this object".format(
                query_str=str(self.query), dataframe_str=str(df)
            )
        )
        raise MsPASSError(caller + message, ErrorSeverity.Fatal)
    self.normcache = dict()
    for count, (index, doc) in enumerate(query_result.iterrows()):
        if self.use_dataframe_index_as_cache_id:
            cache_key = index
        else:
            cache_key = self.db_make_cache_id(doc)
        # The api allows a None return so it must be trapped here.
        if cache_key is None:
            raise MsPASSError(
                caller
                + "db_make_cache_id failed - coding problem or major problem with collection="
                + collection,
                ErrorSeverity.Fatal,
            )
        try:
            md = _extractData2Metadata(
                doc,
                self.attributes_to_load,
                self.aliases,
                self.prepend_collection_name,
                collection,
                self.load_if_defined,
            )
        except MsPASSError as e:
            raise MsPASSError(
                e.message
                + " in document number {n} of collection {col}".format(
                    n=count, col=collection
                ),
                e.severity,
            ) from e
        self.normcache.setdefault(cache_key, []).append(md)
class DataFrameCacheMatcher(BasicMatcher):
    """
    Matcher implementing a caching method based on a Pandas DataFrame.

    This is an intermediate class for instances where the database
    collection to be matched is small enough that the in-memory model is
    appropriate.  It should be used when the matching algorithm is readily
    cast into the subsetting api of a pandas DataFrame.

    The constructor of this intermediate class first calls the BasicMatcher
    (base class) constructor to initialize some common attributes including
    the critical lists of attributes to be loaded.  It then creates the
    internal DataFrame cache by one of two methods.  If arg0 is a MongoDB
    database handle it loads every document of the named collection into a
    DataFrame.  If the input is already a DataFrame it is used directly.
    In both cases the cache keeps only the columns listed in
    attributes_to_load and load_if_defined.  Listed columns missing from
    the input are created and filled with nulls.

    There is also an optional parameter, custom_null_values, that is a
    python dictionary defining values in a field that should be treated as
    a definition of a Null for that field.  The constructor converts such
    values to a standard pandas null value (numpy.nan).

    This class implements generic find and find_one methods.  Subclasses
    of this class must implement a "subset" method to be concrete.  A
    subset method is the abstract algorithm that defines a match for that
    instance expressed as a pandas subset operation.  (For most algorithms
    there are multiple ways to skin that cat or is it a panda?)  See
    concrete subclasses for examples.

    This class cannot be instantiated because it is not concrete (it has
    abstract - virtual - methods that must be defined by subclasses).
    See implementations for constructor argument definitions.
    """

    def __init__(
        self,
        db_or_df,
        collection=None,
        attributes_to_load=None,
        load_if_defined=None,
        aliases=None,
        require_unique_match=False,
        prepend_collection_name=False,
        custom_null_values=None,
    ):
        """
        Constructor for this intermediate class.

        It should not be used except by subclasses as this intermediate
        class is not concrete.

        :param db_or_df: pandas or dask DataFrame holding the normalizing
          table, or a MongoDB database handle from which the collection
          named by collection is loaded (all documents, no query).
        :param collection: name of the normalizing collection (required).
        :param attributes_to_load: required column names.  None means use
          every column of the input table.
        :param load_if_defined: optional column names.  Must be None when
          attributes_to_load is None.
        :param aliases: passed to the BasicMatcher constructor.
        :param require_unique_match: when True find_one raises an exception
          if more than one row matches.
        :param prepend_collection_name: when True keys posted by find are
          prefixed with the collection name.
        :param custom_null_values: dict of column name -> value that should
          be treated as null in that column.
        :raises TypeError: if collection is None or not a string, or if
          db_or_df is not a pandas/dask DataFrame or a database handle.
        :raises MsPASSError: if attributes_to_load is None (load all
          columns) while load_if_defined is also defined.
        """
        self.prepend_collection_name = prepend_collection_name
        self.require_unique_match = require_unique_match
        self.custom_null_values = custom_null_values
        # this is a necessary sanity check
        if collection is None:
            raise TypeError(
                "DataFrameCacheMatcher constructor: collection name must be defined"
            )
        if not isinstance(collection, str):
            raise TypeError(
                "DataFrameCacheMatcher constructor: collection argument must be a string type"
            )
        self.collection = collection
        # We have to reference the base class pymongo.database.Database here
        # because the MsPASS subclass name is also Database.  That makes this
        # conditional fail in some uses.  Using the base class is totally
        # appropriate here anyway as no MsPASS extension methods are used.
        if not isinstance(db_or_df, (type_pdd, type_ddd, pymongo.database.Database)):
            raise TypeError(
                "DataFrameCacheMatcher constructor: required arg0 must be either a pandas, dask Dataframe, or database handle"
            )
        # Loading every column is incompatible with an explicit optional list.
        if attributes_to_load is None and load_if_defined is not None:
            raise MsPASSError(
                "DataFrameCacheMatcher constructor: usage error. Cannot use default of attributes_to_load (triggers loading all columns) and define a list of names for argument load_if_defined",
                ErrorSeverity.Fatal,
            )
        # Materialize the table first so the "all columns" default works for
        # database input too (a pymongo Database has no columns attribute).
        if isinstance(db_or_df, (type_pdd, type_ddd)):
            df = db_or_df
        else:
            df = pd.DataFrame(list(db_or_df[self.collection].find({})))
        if attributes_to_load is None:
            aload = list(df.columns)
        else:
            aload = attributes_to_load
        super().__init__(
            attributes_to_load=aload,
            load_if_defined=load_if_defined,
            aliases=aliases,
        )
        self._load_dataframe_cache(df)

    def _output_key(self, k):
        """Return the Metadata key used to post the value of cache column k."""
        key = self.aliases[k] if k in self.aliases else k
        if not self.prepend_collection_name:
            return key
        # "_id" becomes e.g. "channel_id", not "channel__id".
        if key == "_id":
            return self.collection + key
        return self.collection + "_" + key

    def find(self, mspass_object) -> tuple:
        """
        DataFrame generic implementation of find method.

        This method uses content in any part of the mspass_object (data
        object) to subset the internal DataFrame cache via the abstract
        (virtual) method subset.  It then copies entries in
        attributes_to_load and, when not null, load_if_defined into one
        Metadata container for each row of the subset DataFrame.

        :param mspass_object: datum to match.  Anything that is not a
          Metadata subclass is rejected.
        :return: two element list.  [None, elog] on failure (invalid input,
          no rows matched, or a null value in a required column).
          [[md1, ..., mdn], None] on success.
        """
        if not isinstance(mspass_object, Metadata):
            elog = PyErrorLogger()
            elog.log_error(
                "Received datum that was not a valid MsPASS data object",
                ErrorSeverity.Invalid,
            )
            return [None, elog]
        subset_df = self.subset(mspass_object)
        # assume all implementations will return a 0 length dataframe
        # if the subset failed.
        if len(subset_df) <= 0:
            elog = PyErrorLogger()
            elog.log_error(
                "subset method found no match for this datum",
                ErrorSeverity.Invalid,
            )
            return [None, elog]
        mdlist = list()
        for _, row in subset_df.iterrows():
            md = Metadata()
            notnulltest = row.notnull()
            for k in self.attributes_to_load:
                if not notnulltest[k]:
                    elog = PyErrorLogger()
                    error_message = "Encountered Null value for required attribute {key} - repairs of the input DataFrame are required".format(
                        key=k
                    )
                    elog.log_error(error_message, ErrorSeverity.Invalid)
                    return [None, elog]
                # Read the column that was null-tested; the alias only
                # affects the output key.
                md[self._output_key(k)] = row[k]
            for k in self.load_if_defined:
                if notnulltest[k]:
                    md[self._output_key(k)] = row[k]
            mdlist.append(md)
        return [mdlist, None]

    def find_one(self, mspass_object) -> tuple:
        """
        DataFrame implementation of the find_one method.

        This method is mostly a wrapper around the find method.  If find
        fails its return is passed through unchanged.  A unique match
        returns [md, elog_from_find].  If the match is ambiguous, behavior
        depends on self.require_unique_match.  When True a MsPASSError
        (Invalid) is raised.  When False a Complaint is posted to a
        PyErrorLogger and the first container in the list returned by find
        is returned as component 0 of the pair.

        :raises MsPASSError: on an ambiguous match when
          require_unique_match is True, or (Fatal) if find returned an
          empty list, which only an overridden find could do.
        """
        findreturn = self.find(mspass_object)
        mdlist = findreturn[0]
        if mdlist is None:
            return findreturn
        elif len(mdlist) == 1:
            return [mdlist[0], findreturn[1]]
        elif len(mdlist) > 1:
            if self.require_unique_match:
                raise MsPASSError(
                    "DataFrameCacheMatcher.find_one: found {n} matches when require_unique_match was set true".format(
                        n=len(mdlist)
                    ),
                    ErrorSeverity.Invalid,
                )
            elog = PyErrorLogger() if findreturn[1] is None else findreturn[1]
            # This maybe should be only posted with a verbose option????
            error_message = "found {n} matches. Returned first one found. You should use find instead of find_one if the match is not unique".format(
                n=len(mdlist)
            )
            elog.log_error(error_message, ErrorSeverity.Complaint)
            return [mdlist[0], elog]
        else:
            # Safety purely for code maintenance: find returns None or a
            # non-empty list, so an empty list means a custom override of
            # find broke that contract.
            raise MsPASSError(
                "DataFrameCacheMatcher.find_one: find returned an empty list. Can only happen if custom matcher has overridden find. Find should return None if the match fails",
                ErrorSeverity.Fatal,
            )

    @abstractmethod
    def subset(self, mspass_object) -> pd.DataFrame:
        """
        Required method defining how the internal DataFrame cache is to be
        subsetted using the contents of the data object mspass_object.

        Concrete subclasses must implement this method.  The point of this
        abstract method is that the way one defines how to get the
        information needed to define a match with the cache is application
        dependent.  An implementation can use Metadata attributes, data
        object attributes (e.g. TimeSeries t0 attribute), or even sample
        data to compute a value to use in a DataFrame subset condition.
        This simplifies writing a custom matcher to implementing only this
        method, as find and find_one use it.

        Implementations should return a zero length DataFrame if the
        subset condition yields a null result, i.e. len(return_result)
        should return 0 if the subset produced no rows.
        """
        pass

    def _load_dataframe_cache(self, df):
        """
        Store in self.cache the columns of df named in attributes_to_load
        followed by load_if_defined.

        Columns absent from df are created filled with nulls by reindex.
        Values listed in self.custom_null_values are replaced with
        numpy.nan.
        """
        # This is a bit error prone.  It assumes the BasicMatcher
        # constructor initializes a None default to an empty list.
        fulllist = self.attributes_to_load + self.load_if_defined
        self.cache = df.reindex(columns=fulllist)[fulllist]
        if self.custom_null_values is not None:
            for col, nul_val in self.custom_null_values.items():
                self.cache[col] = self.cache[col].replace(nul_val, np.nan)
class ObjectIdDBMatcher(DatabaseMatcher):
    """
    Implementation of DatabaseMatcher for ObjectIds.

    In this class the virtual method query_generator uses the mspass
    convention of using the collection name and the magic string "_id" as
    the data object key (e.g. channel_id), but runs the query using the
    "_id" magic string used in MongoDB for the ObjectId of each document.

    Users should normally use the find_one method of this class.  Because
    "_id" is unique in MongoDB, find can return at most one document, so
    find adds nothing beyond find_one.  (This class does not override
    find.)

    :param db: MongoDB database handle (positional - no default).
    :type db: normally a MsPASS Database class, but with this algorithm it
      can be the superclass from which Database is derived.
    :param collection: Name of MongoDB collection that is to be queried
      (default "channel").
    :type collection: string
    :param attributes_to_load: list of keys of required attributes that
      will be returned in the output of the find method.  The keys listed
      must ALL have defined values for all documents in the collection or
      some calls to find_one will fail.
      Default ["_id","lat","lon","elev","hang","vang"].
    :type attributes_to_load: list of strings defining keys in collection
      documents.
    :param load_if_defined: list of keys of optional attributes to be
      extracted by the find method.  Any data attached to these keys will
      only be posted in the find return if they are defined in the
      database document retrieved in the query.
    :type load_if_defined: list of strings defining collection keys
    :param aliases: python dictionary defining alias names to apply when
      fetching from a data object's Metadata container.  The key sense of
      the mapping is important to keep straight.  The key of this
      dictionary should match one of the attributes in attributes_to_load
      or load_if_defined.  The value the key defines should be the alias
      used to fetch the comparable attribute from the data.
    :type aliases: python dictionary
    :param prepend_collection_name: when True attributes returned in
      Metadata containers by the find and find_one methods will all have
      the collection name prepended with a (fixed) separator.  For example,
      if the collection name is "channel" the "lat" attribute in the
      channel document would be returned as "channel_lat".
    :type prepend_collection_name: boolean
    """

    def __init__(
        self,
        db,
        collection="channel",
        attributes_to_load=["_id", "lat", "lon", "elev", "hang", "vang"],
        load_if_defined=None,
        aliases=None,
        prepend_collection_name=True,
    ):
        """
        Class Constructor.

        Calls the superclass constructor with require_unique_match set True
        to dogmatically enforce uniqueness.
        """
        # Copy so the shared default list can never be altered through
        # the instance.
        if attributes_to_load is not None:
            attributes_to_load = list(attributes_to_load)
        super().__init__(
            db,
            collection,
            attributes_to_load=attributes_to_load,
            load_if_defined=load_if_defined,
            aliases=aliases,
            require_unique_match=True,
            prepend_collection_name=prepend_collection_name,
        )

    def query_generator(self, mspass_object) -> dict:
        """
        Build the MongoDB query {"_id": ObjectId(...)} from the datum.

        The id is fetched from the datum with the key collection + "_id"
        (e.g. "channel_id").  It may be an ObjectId or its string
        representation.

        :return: query dict, or None if the key is not defined or its value
          cannot be converted to an ObjectId.
        """
        from bson.errors import InvalidId

        data_object_key = self.collection + "_id"
        if not mspass_object.is_defined(data_object_key):
            return None
        query_id = mspass_object[data_object_key]
        # ObjectId() accepts either a 24 character hex string or an existing
        # ObjectId (copy).  Anything else raises; treat it like a missing id
        # so the failure is reported through the normal None path.
        try:
            testid = ObjectId(query_id)
        except (InvalidId, TypeError):
            return None
        return {"_id": testid}
class ObjectIdMatcher(DictionaryCacheMatcher):
    """
    Implement an ObjectId match with caching.

    Most of the code for this class is derived from the superclass
    DictionaryCacheMatcher.  It adds only concrete implementations of the
    cache_id and db_make_cache_id methods used to construct keys for the
    cache, which is a python dict (self.normcache).  In this case the cache
    key is simply the string representation of the ObjectId of each
    document in the collection defined in construction.  The cache itself
    is created by the superclass generic loading methods.

    :param db: MongoDB database handle (positional - no default).
    :type db: normally a MsPASS Database class, but with this algorithm it
      can be the superclass from which Database is derived.
    :param collection: Name of MongoDB collection that is to be loaded and
      cached to memory inside this object (default "channel").
    :type collection: string
    :param query: optional query to apply to the collection before loading
      document attributes into the cache.  A typical example would be a
      time range limit for the channel or site collection to avoid loading
      instruments not operational during the time span of a data set.
      Default is None, which causes the entire collection to be parsed.
    :type query: python dict with content that defines a valid query when
      passed to the MongoDB find method.  If query is a type other than
      None or dict the constructor will throw a TypeError.
    :param attributes_to_load: list of keys of required attributes that
      will be returned in the output of the find method.  The keys listed
      must ALL have defined values for all documents in the collection or
      some calls to find will fail.
      Default ["_id","lat","lon","elev","hang","vang"].
    :type attributes_to_load: list of strings defining keys in collection
      documents.
    :param load_if_defined: list of keys of optional attributes to be
      extracted by the find method.  Any data attached to these keys will
      only be posted in the find return if they are defined in the
      database document retrieved in the query.
    :type load_if_defined: list of strings defining collection keys
    :param aliases: python dictionary defining alias names to apply when
      fetching from a data object's Metadata container.  The key sense of
      the mapping is important to keep straight.  The key of this
      dictionary should match one of the attributes in attributes_to_load
      or load_if_defined.  The value the key defines should be the alias
      used to fetch the comparable attribute from the data.
    :type aliases: python dictionary
    :param prepend_collection_name: when set True all attributes loaded
      from the normalizing collection will have the collection name
      prepended.  That is essential if the collection contains generic
      names like "lat" or "depth" that would produce ambiguous keys if
      used directly.  (e.g. lat is used for source, channel, and site
      collections in the default schema.)
    :type prepend_collection_name: boolean
    """

    def __init__(
        self,
        db,
        collection="channel",
        query=None,
        attributes_to_load=["_id", "lat", "lon", "elev", "hang", "vang"],
        load_if_defined=None,
        aliases=None,
        prepend_collection_name=True,
    ):
        """
        Class Constructor.

        Validates query, then calls the superclass constructor with
        require_unique_match set True, which causes the find_one method to
        be dogmatic in enforcing unique matches.

        :raises TypeError: if query is neither None nor a dict.
        """
        # Validate before the superclass constructor runs, since
        # (presumably) that is where the cache is loaded with the query.
        if query is None:
            normalized_query = dict()
        elif isinstance(query, dict):
            normalized_query = query
        else:
            raise TypeError(
                "ObjectIdMatcher constructor: optional query argument must be either None or a python dict"
            )
        # Copy so the shared default list can never be altered through
        # the instance.
        if attributes_to_load is not None:
            attributes_to_load = list(attributes_to_load)
        # require_unique_match is always set True here as that is
        # pretty much by definition.
        super().__init__(
            db,
            collection,
            query=query,
            attributes_to_load=attributes_to_load,
            load_if_defined=load_if_defined,
            aliases=aliases,
            require_unique_match=True,
            prepend_collection_name=prepend_collection_name,
        )
        self.query = normalized_query

    def cache_id(self, mspass_object) -> str:
        """
        Return the cache key for a datum: str() of its collection_id value.

        It implements the MsPASS approach of defining the key for a
        normalizing collection as the collection name plus the magic string
        "_id" (e.g. channel_id or site_id), using self.collection.  Note
        self.prepend_collection_name is independent of this: the id
        fetched ALWAYS uses the collection name as a prefix.  That boolean
        only controls the names of attributes returned by find/find_one.

        :param mspass_object: key-value container holding the id.  Normally
          a mspass data object, but a python dict or Metadata with the
          required key works too.
        :return: string form of the (assumed) ObjectId, or None if the key
          is not defined.
        """
        testid = self.collection + "_id"
        if testid in mspass_object:
            return str(mspass_object[testid])
        return None

    def db_make_cache_id(self, doc) -> str:
        """
        Return the cache key for a collection document: str(doc["_id"]).

        :param doc: python dict (normally a pymongo cursor component).
          Only the "_id" value is used.
        :return: string form of the document ObjectId, or None if "_id" is
          missing.
        """
        if "_id" in doc:
            return str(doc["_id"])
        return None
class MiniseedDBMatcher(DatabaseMatcher):
    """
    Database implementation of matcher for miniseed data using SEED keys
    net:sta:chan(channel only):loc and a time interval test.

    Miniseed data uses the excessively complex key that combines four
    unique string names (net, sta, chan, and loc) and a time interval of
    operation to define a unique set of station metadata for each channel.
    In mspass we also create the site collection without the chan
    attribute.  This implementation works for both channel and site under
    control of the collection argument.  This case is the complete opposite
    of something like the ObjectId matcher above, as the match query we
    need to generate is excessively long, requiring up to 6 fields.

    The default collection name is channel, which is the only correct use
    if applied to data created through readers applied to wf_miniseed.  The
    implementation can also work on Seismogram data if and only if the
    collection argument is set to "site".  The difference is that for
    Seismogram data "chan" is an undefined concept.  In both cases the keys
    and content assume the mspass schema for the channel or site
    collections.  The constructor will throw a MsPASSError exception if the
    collection argument is anything but "channel" or "site" to enforce
    this limitation.  If you use a schema other than the mspass schema the
    methods in this class will fail if you change any of the following
    keys: net, sta, chan, loc, starttime, endtime, hang, vang.

    Users should only call the find_one method for this application.  The
    query requires matching net:sta:chan(channel only):loc(optional) plus
    the time interval test described in query_generator.  That combination
    should yield either 1 or no matches if the channel collection is clean.
    However, there are known issues with station metadata that can cause
    multiple matches in unusual cases (most notably overlapping time
    intervals defined for the same channel).  The find_one method handles
    that case by returning the first one found and posting an error message
    that should be handled by the caller.

    :param db: MongoDB database handle (positional - no default).
    :type db: normally a MsPASS Database class, but with this algorithm it
      can be the superclass from which Database is derived.
    :param collection: Name of MongoDB collection that is to be queried.
      The default is "channel".  Use "site" for Seismogram data.  Any other
      value raises a MsPASSError.
    :type collection: string
    :param attributes_to_load: list of keys of required attributes that
      will be returned in the output of the find method.  The keys listed
      must ALL have defined values for all documents in the collection or
      some calls to find will fail.  Default
      ["starttime","endtime","lat","lon","elev","_id"].  When collection
      is "channel", "hang" and "vang" are appended if not already present.
      The list passed is copied, never modified.
    :type attributes_to_load: list of strings defining keys in collection
      documents.
    :param load_if_defined: list of keys of optional attributes to be
      extracted by the find method.  Any data attached to these keys will
      only be posted in the find return if they are defined in the
      database document retrieved in the query.  A common addition here
      may be response data (see schema definition for keys).
    :type load_if_defined: list of strings defining collection keys
    :param aliases: python dictionary defining alias names to apply when
      fetching from a data object's Metadata container.  The key sense of
      the mapping is important to keep straight.  The key of this
      dictionary should match one of the attributes in attributes_to_load
      or load_if_defined.  The value the key defines should be the alias
      used to fetch the comparable attribute from the data.
    :type aliases: python dictionary
    :param prepend_collection_name: when True attributes returned in
      Metadata containers by the find and find_one methods will all have
      the collection name prepended with a (fixed) separator.  For example,
      if the collection name is "channel" the "lat" attribute in the
      channel document would be returned as "channel_lat".
    :type prepend_collection_name: boolean
    """

    def __init__(
        self,
        db,
        collection="channel",
        attributes_to_load=["starttime", "endtime", "lat", "lon", "elev", "_id"],
        load_if_defined=None,
        aliases=None,
        prepend_collection_name=True,
    ):
        """
        Class Constructor.

        Validates collection, adds hang and vang for channel, and calls the
        superclass constructor with require_unique_match False.

        :raises MsPASSError: (Fatal) if collection is not "channel" or
          "site".
        """
        # Work on a copy: appending to the argument would permanently
        # alter the shared default list (and any caller's list), leaking
        # hang/vang into later "site" matchers.
        if attributes_to_load is None:
            aload_tmp = attributes_to_load
        else:
            aload_tmp = list(attributes_to_load)
        if collection == "channel":
            if "hang" not in aload_tmp:
                aload_tmp.append("hang")
            if "vang" not in aload_tmp:
                aload_tmp.append("vang")
        elif collection != "site":
            raise MsPASSError(
                "MiniseedDBMatcher: "
                + "Illegal collection argument="
                + collection
                + " Must be either channel or site",
                ErrorSeverity.Fatal,
            )
        super().__init__(
            db,
            collection,
            attributes_to_load=aload_tmp,
            load_if_defined=load_if_defined,
            aliases=aliases,
            require_unique_match=False,
            prepend_collection_name=prepend_collection_name,
        )

    def query_generator(self, mspass_object):
        """
        Concrete implementation of the (required) abstract method defined in
        superclass DatabaseMatcher.

        It generates the query for matching net, sta, chan (channel only),
        and optional loc along with a time interval test against the
        document "starttime" and "endtime" attributes.

        The station name keys are fetched with _get_with_readonly_recovery.
        Per this module's convention, that handles data saved to
        wf_TimeSeries and read back, where the default schema renames the
        keys with a "READONLY_" prefix (e.g. READONLY_net).

        The time test, in MongoDB syntax, requires:
            doc["starttime"] <= mspass_object.t0
            doc["endtime"]   >= mspass_object.endtime()
        i.e. the whole data time span must lie within the document's
        interval.  NOTE(review): earlier documentation described a t0-only
        test; data spanning an epoch boundary will not match - confirm this
        is intended.

        This method does not validate mspass_object.  Accessing t0 and
        endtime() will fail unless it is an atomic data object
        (TimeSeries or Seismogram); find_one guarantees that.

        :return: query dict, or None if net, sta, or (channel only) chan
          cannot be found.  Caller should handle a None return.
        """
        query = dict()
        net = _get_with_readonly_recovery(mspass_object, "net")
        if net is None:
            return None
        query["net"] = net
        sta = _get_with_readonly_recovery(mspass_object, "sta")
        if sta is None:
            return None
        query["sta"] = sta
        if self.collection == "channel":
            chan = _get_with_readonly_recovery(mspass_object, "chan")
            if chan is None:
                return None
            query["chan"] = chan
        loc = _get_with_readonly_recovery(mspass_object, "loc")
        # loc being null is not unusual so it is only added when defined
        if loc is not None:
            query["loc"] = loc
        query["starttime"] = {"$lte": mspass_object.t0}
        query["endtime"] = {"$gte": mspass_object.endtime()}
        return query

    def find_one(self, mspass_object):
        """
        Return the unique (or first) channel/site document matching a datum.

        Most of the work is done by query_generator.  This method wraps
        find and translates its output into the form required by find_one.
        It adds these safeties:
          1. Raises TypeError unless mspass_object is an atomic TimeSeries
             or Seismogram, since find_one makes sense only for those.
          2. Dead data are not matched; a Complaint is logged instead.
          3. Data with a relative time standard are rejected, because the
             interval test needs UTC times.
        Failures of query_generator (missing net/sta/chan) are assumed to
        be handled by find.

        :return: [md, elog] with md the first matching Metadata (elog None
          or holding a Complaint about multiple matches), or [None, elog]
          when no match could be made.
        :raises TypeError: if mspass_object is not atomic.
        """
        # Be dogmatic and demand mspass_object is a TimeSeries or Seismogram
        if not _input_is_atomic(mspass_object):
            raise TypeError(
                "MiniseedDBMatcher.find_one: this class method can only be applied to TimeSeries or Seismogram objects"
            )
        if not mspass_object.live:
            # logged as complaint because by definition if it was
            # already killed it is Invalid
            elog = PyErrorLogger()
            elog.log_error(
                "Received a datum marked dead - will not attempt match",
                ErrorSeverity.Complaint,
            )
            return [None, elog]
        # trap this unlikely but possible condition as it could produce
        # mysterious behavior
        if mspass_object.time_is_relative():
            elog = PyErrorLogger()
            message = "Usage error: input has a relative time standard but miniseed matching requires a UTC time stamp"
            elog.log_error(message, ErrorSeverity.Invalid)
            return [None, elog]
        find_output = self.find(mspass_object)
        mdlist = find_output[0]
        elog = find_output[1]
        if mdlist is None:
            # Pass find's error log back to the caller.
            return [None, elog]
        number_matches = len(mdlist)
        if number_matches == 1:
            return [mdlist[0], elog]
        if elog is None:
            elog = PyErrorLogger()
        if number_matches == 0:
            # find should return None rather than an empty list, but guard
            # against it instead of raising IndexError below.
            elog.log_error(
                "No channel record matched net:sta:chan:loc:time_interval query for this datum",
                ErrorSeverity.Invalid,
            )
            return [None, elog]
        message = "{n} channel recorded match net:sta:chan:loc:time_interval query for this datume\n".format(
            n=number_matches
        )
        message += "Using first document in list returned by find method"
        elog.log_error(message, ErrorSeverity.Complaint)
        return [mdlist[0], elog]
class MiniseedMatcher(DictionaryCacheMatcher):
    """
    Cached version of matcher for miniseed station/channel Metadata.

    Miniseed data require 6 keys to uniquely define a single channel of
    data (5 at the Seismogram level where the channels are merged).  A
    further complication for using the DictionaryCacheMatcher interface
    is that part of the definition is a UTC time interval defining when
    the metadata is valid.  We handle that in this implementation with a
    two stage search algorithm in the find_one method.

    First, the cache index is defined by a unique string created from the
    four string keys of miniseed that the MsPASS default schema refers to
    with the keywords net, sta, chan, and loc.  At this point in time we
    know of no examples of a seismic instrument where the number of
    distinct time intervals with different Metadata is huge.  The
    secondary search is therefore a simple linear search through the
    python list returned by the generic find method, which uses only the
    net, sta, chan, and loc keys as the index.

    The default collection name is channel, which is the only correct use
    if applied to data created through readers applied to wf_miniseed.
    The implementation can also work on Seismogram data if and only if
    the collection argument is then set to "site".  The difference is
    that for Seismogram data "chan" is an undefined concept.  In both
    cases the keys and content assume the mspass schema for the channel
    or site collections.  The constructor throws a MsPASSError exception
    if the collection argument is anything but "channel" or "site" to
    enforce this limitation.  If you use a schema other than the mspass
    schema, the methods in this class will fail if you change any of the
    following keys:  net, sta, chan, loc, starttime, endtime, hang, vang

    Users should only call the find_one method for this application.
    The find_one method here overrides the generic find_one in the
    superclass DictionaryCacheMatcher.  It implements the linear search
    for a matching time interval noted above.

    Note also this class does not support Ensembles directly.  Matching
    instrument data is by definition what we call atomic.  If you are
    processing ensembles you will need to write a small wrapper function
    that runs find_one and handles the looping over each member of the
    ensemble.

    :param db:  MongoDB database handle containing collection to be loaded.
    :type db: mspass Database handle(mspasspy.db.database.Database).
    :param collection:  Name of MongoDB collection that is to be queried.
      The default is "channel".  Use "site" for Seismogram data.  Any
      other value raises a MsPASSError.
    :type collection: string
    :param query:  optional query to apply to collection before loading
      data from the database.  A common use would be to reduce the size
      of the cache by using a time range limit on station metadata to
      only load records relevant to the dataset being processed.  It is
      passed unaltered to the superclass constructor.
    :type query:  python dictionary.
    :param attributes_to_load:  list of keys of required attributes that
      will be returned in the output of the find method.  The keys listed
      must ALL have defined values for all documents in the collection or
      some calls to find will fail.  When None (default) the list
      ["starttime","endtime","lat","lon","elev","_id"] is used.
      When collection is "channel", "hang" and "vang" are appended if not
      already present.  The list MUST contain "starttime" and "endtime"
      because the linear search step always uses those two fields.
      Be careful that endtime resolves to an epoch time in the distant
      future and not some null database attribute.  The list received is
      copied, so the caller's list is never modified.  There is also an
      implicit assumption that the keys "net" and "sta" are always
      defined, and "chan" must be defined if the collection name is
      "channel".  "loc" is optional; a missing loc is replaced by an
      empty string when the cache index is built.
    :type attributes_to_load: list of string defining keys in collection
      documents
    :param load_if_defined: list of keys of optional attributes to be
      extracted by the find method.  Data attached to these keys is only
      posted in the find return if it is defined in the database document
      retrieved.  Default is None.  A common addition here may be
      response data (see schema definition for keys).
    :type load_if_defined: list of strings defining collection keys
    :param aliases: python dictionary defining alias names to apply when
      fetching from a data object's Metadata container.  The key of this
      dictionary should match one of the attributes in attributes_to_load
      or load_if_defined.  The value the key defines should be the alias
      used to fetch the comparable attribute from the data.
    :type aliases: python dictionary
    :param prepend_collection_name:  when True (default), attributes
      returned in Metadata containers by the find and find_one methods
      all have the collection name prepended with a (fixed) separator.
      For example, if the collection name is "channel", the "lat"
      attribute in the channel document would be returned as
      "channel_lat".
    :type prepend_collection_name: boolean
    :raises MsPASSError:  (Fatal) if collection is neither "channel" nor
      "site", or if attributes_to_load lacks "starttime" or "endtime".
    """

    def __init__(
        self,
        db,
        collection="channel",
        query=None,
        attributes_to_load=None,
        load_if_defined=None,
        aliases=None,
        prepend_collection_name=True,
    ):
        # Work on a private copy.  The previous implementation appended
        # hang/vang directly to the (mutable, shared) default list and to
        # the caller's list, so a later "site" matcher inherited them.
        if attributes_to_load is None:
            aload_tmp = ["starttime", "endtime", "lat", "lon", "elev", "_id"]
        else:
            aload_tmp = list(attributes_to_load)
        if collection == "channel":
            # forcing this may be a bit too dogmatic but hang and vang
            # are pretty essential metadata for any channel
            for required_key in ("hang", "vang"):
                if required_key not in aload_tmp:
                    aload_tmp.append(required_key)
        elif collection != "site":
            raise MsPASSError(
                "MiniseedMatcher: "
                + "Illegal collection argument="
                + collection
                + " Must be either channel or site",
                ErrorSeverity.Fatal,
            )
        if "starttime" not in aload_tmp or "endtime" not in aload_tmp:
            raise MsPASSError(
                "MiniseedMatcher: "
                + "Error in attribute_to_load list - List must contain starttime and endtime keys",
                ErrorSeverity.Fatal,
            )
        super().__init__(
            db,
            collection,
            query=query,
            attributes_to_load=aload_tmp,
            load_if_defined=load_if_defined,
            aliases=aliases,
            require_unique_match=True,
            prepend_collection_name=prepend_collection_name,
        )

    def _time_keys(self):
        """Return the (starttime, endtime) keys used in cached Metadata containers."""
        if self.prepend_collection_name:
            return self.collection + "_starttime", self.collection + "_endtime"
        return "starttime", "endtime"

    def _describe_codes(self, mspass_object) -> str:
        """Return a "net=..., sta=..." summary of the seed codes defined in mspass_object."""
        parts = []
        for key in ("net", "sta", "chan", "loc"):
            # tolerate missing keys (e.g. chan for Seismogram data) and
            # READONLYERROR_ renamed keys, as cache_id does
            value = _get_with_readonly_recovery(mspass_object, key)
            if value is not None:
                parts.append(key + "=" + str(value))
        return ", ".join(parts)

    def cache_id(self, mspass_object) -> str:
        """
        Concrete implementation of this required method.

        The cache_id in this algorithm is a composite key made from net,
        sta, chan, and loc with a fixed separator string of "_".  A typical
        example is IU_AAK_00_BHZ_ (chan is placed last).

        An added feature to mesh with MsPASS conventions is a safety for
        attributes that are automatically renamed when saved because they
        are marked readonly in the schema.  Such attributes have a
        prepended tag (at this time "READONLYERROR_").  If one of the
        required keys for the index is missing (e.g. "net") the function
        then tries to fetch the modified name (e.g. "READONLYERROR_net").
        If that also fails it returns None as specified by the API.

        :param mspass_object:  mspass object to be matched with cache.
          Must contain net and sta for site matching and net, sta, and
          chan for the channel collection.  If loc is not defined an empty
          string is used and the key has two consecutive separator
          characters (e.g. IU_AAK__BHZ_).
        :return:  normally a string that can be used as an index string.
          If any of the required keys is missing it returns None.
        """
        # we make this a const
        SEPARATOR = "_"
        net = _get_with_readonly_recovery(mspass_object, "net")
        if net is None:
            return None
        sta = _get_with_readonly_recovery(mspass_object, "sta")
        if sta is None:
            return None
        if self.collection == "channel":
            chan = _get_with_readonly_recovery(mspass_object, "chan")
            if chan is None:
                return None
        loc = _get_with_readonly_recovery(mspass_object, "loc")
        if loc is None:
            loc = ""
        idstring = net + SEPARATOR + sta + SEPARATOR + loc + SEPARATOR
        # a bit nonstandard to put chan at end but this isn't for human
        # consumption anyway
        if self.collection == "channel":
            idstring += chan + SEPARATOR
        return idstring

    def db_make_cache_id(self, doc) -> str:
        """
        Concrete implementation of this required method.

        Builds the same composite key as cache_id (net, sta, loc, and, for
        the channel collection, chan joined by "_") but from a MongoDB
        document passed as a python dictionary.  Unlike cache_id this
        function has no safety for readonly errors because it is designed
        to be used only while loading the cache from site or channel
        documents.

        :param doc:  python dict containing a site or channel document.
          Must contain net and sta for site matching and net, sta, and
          chan for the channel collection.  If loc is not defined an empty
          string is used in its place.
        :return:  normally a string that can be used as an index string.
          If any of the required keys is missing it returns None.
        """
        # we make this a const
        SEPARATOR = "_"
        # superclass must handle None returns for invalid document
        if "net" not in doc or "sta" not in doc:
            return None
        net = doc["net"]
        sta = doc["sta"]
        if self.collection == "channel":
            if "chan" not in doc:
                return None
            chan = doc["chan"]
        loc = doc["loc"] if "loc" in doc else ""
        idstring = net + SEPARATOR + sta + SEPARATOR + loc + SEPARATOR
        # a bit nonstandard to put chan at end but this isn't for human
        # consumption anyway
        if self.collection == "channel":
            idstring += chan + SEPARATOR
        return idstring

    def find_one(self, mspass_object):
        """
        Return the cached record whose time interval contains the data start time.

        The algorithm does a linear search for the first time interval for
        which the start time of mspass_object is within the
        starttime <= t0 <= endtime range of a record stored in the cache.
        This works only if starttime and endtime are in the set of
        attributes loaded, so the constructor of this class enforces that
        restriction.  The times in starttime and endtime are assumed to be
        epoch times because the algorithm uses a simple numeric test of
        the data start time with the two times.  Watch out for endtime not
        being set to a valid distant time but to some null field that
        doesn't work in a numerical test.

        :param mspass_object:  data to be used for matching against the
          cache.  It must contain the required keys or the matching will
          fail.  If the datum is marked dead the algorithm returns
          immediately with a None match and an error log.
        :type mspass_object:  TimeSeries or Seismogram with t0 defined as
          an epoch time computed from a UTC time stamp.
        :return:  two element list: [matching Metadata or None, error log
          or None].
        :raises TypeError:  if mspass_object is not a TimeSeries or
          Seismogram.
        """
        # Be dogmatic and demand mspass_object is atomic
        if not isinstance(mspass_object, (TimeSeries, Seismogram)):
            raise TypeError(
                "MiniseedMatcher.find_one: this class method can only be applied to TimeSeries or Seismogram objects"
            )
        if not mspass_object.live:
            elog = PyErrorLogger()
            elog.log_error(
                "Received a datum marked dead - will not attempt match",
                ErrorSeverity.Invalid,
            )
            return [None, elog]
        # trap this unlikely but possible condition as it could
        # produce mysterious behavior
        if mspass_object.time_is_relative():
            elog = PyErrorLogger()
            message = "Usage error: input has a relative time standard but miniseed matching requires a UTC time stamp"
            elog.log_error(message, ErrorSeverity.Invalid)
            return [None, elog]

        find_output = self.find(mspass_object)
        if find_output[0] is None:
            # find posts its own message in find_output[1]
            return [None, find_output[1]]

        stime_key, etime_key = self._time_keys()
        t0 = mspass_object.t0
        for md in find_output[0]:
            stime = md[stime_key]
            etime = md[etime_key]
            if stime <= t0 <= etime:
                return [md, find_output[1]]

        # we land here if the linear search failed and no
        # time interval contains t0
        elog = PyErrorLogger() if find_output[1] is None else find_output[1]
        message = (
            "No matching records found in " + self.collection + " collection for:\n"
        )
        # built defensively: chan is undefined for Seismogram data and any
        # key may have been renamed with the READONLYERROR_ tag
        message += self._describe_codes(mspass_object)
        message += ", time=" + str(UTCDateTime(t0))
        message += "\nFound a match for station codes but no channel time range contains that time "
        elog.log_error(message, ErrorSeverity.Invalid)
        return [None, elog]

    def find_doc(self, doc, wfdoc_starttime_key="starttime"):
        """
        Match a waveform document (python dict) against the cache.

        Supports bulk_normalize to set channel_id or site_id.  Acts like
        find_one but without support for readonly recovery, and accepts a
        python dict retrieved in a cursor loop instead of a data object.
        Uses the same linear time interval search as find_one, with the
        waveform start time extracted from doc by wfdoc_starttime_key.
        This method overrides the generic version in BasicMatcher due to
        some special peculiarities of miniseed.

        :param doc:  document (pretty much assumed to be from wf_miniseed)
          to be matched with channel or site.
        :type doc:  python dictionary - document from MongoDB
        :param wfdoc_starttime_key:  key used to fetch the start time of
          waveform data from doc.  Default is "starttime".
        :type wfdoc_starttime_key:  string
        :return:  matching Metadata container if successful.  None if
          matching fails for any reason.
        :raises MsPASSError:  (Fatal) if wfdoc_starttime_key is not in doc.
        """
        # always abort if the starttime key is not defined.  wf_miniseed
        # documents always have that key, so a Fatal exception is
        # appropriate here.
        if wfdoc_starttime_key not in doc:
            raise MsPASSError(
                "MiniseedMatcher: "
                + "Required key defining waveform start time="
                + wfdoc_starttime_key
                + " is missing from document received",
                ErrorSeverity.Fatal,
            )
        # this version of the key generator works with a dict, BUT it
        # doesn't allow the READONLYERROR recovery
        testid = self.db_make_cache_id(doc)
        if testid is None:
            return None
        if testid not in self.normcache:
            return None
        matches = self.normcache[testid]

        stkey, etkey = self._time_keys()
        dt0 = doc[wfdoc_starttime_key]
        # linear search similar to that in find_one
        for md in matches:
            # let this throw an exception if fetch fails.  Constructor
            # guarantees these two attributes are loaded
            stime = md[stkey]
            etime = md[etkey]
            if stime <= dt0 <= etime:
                return md
        return None
class EqualityMatcher(DataFrameCacheMatcher):
    """
    Match data to a cached DataFrame by equality of one or more keys.

    Every key listed in match_keys must compare equal between the datum's
    Metadata and the corresponding DataFrame column for a row to match.
    Together the keys should provide a unique match.  The keys are applied
    one at a time, each step shrinking the candidate DataFrame, so putting
    the most selective key first may improve performance on large tables.

    A special feature is what is best thought of as reverse aliasing.  The
    base class attribute self.aliases maps collection (column) names to
    data object names.  match_keys runs the other way:  its key is the
    data object key and its value is the DataFrame column to compare
    against.  The constructor verifies the two dictionaries agree and
    raises an exception if they do not.  Hence, if match_keys uses a real
    alias (key and value differ) you must define aliases with the same
    pair reversed (e.g. match_keys={"KSTA":"sta"} requires
    aliases={"sta":"KSTA"}).

    :param db_or_df:  MongoDB database handle or a pandas DataFrame.  With
      a database handle the collection argument selects what is loaded
      into the cache.  With a DataFrame the collection name is only a tag
      and the DataFrame must have columns for at least the attributes
      defined in attributes_to_load.
    :type db_or_df:  MongoDB database handle or pandas DataFrame.
    :param collection:  MongoDB collection to load when using database
      input; only a tag (relevant when prepend_collection_name is True)
      for DataFrame input.  Required; no default.
    :type collection:  string
    :param match_keys:  dictionary whose keys are data (mspass_object)
      Metadata keys and whose values are the database/DataFrame keys they
      must equal (e.g. "source_lat" in the data matching "lat" in a
      source collection).
    :type match_keys:  python dictionary
    :param attributes_to_load:  list of required keys returned by the find
      method.  They must ALL be defined in every document/row or some
      calls to find will fail.  Required; no default.
    :type attributes_to_load:  list of strings
    :param query:  optional query intended to limit what is loaded from
      the database.  NOTE(review): this constructor accepts query but does
      not forward it to the superclass constructor, so as written it has
      no effect - confirm against DataFrameCacheMatcher.
    :type query:  python dictionary
    :param load_if_defined:  list of optional keys posted in the find
      return only when defined in the source document.  Passed to the
      superclass constructor.
    :type load_if_defined:  list of strings
    :param aliases:  dictionary mapping collection attribute names (keys)
      to the alias used to fetch the same attribute from a data object's
      Metadata (values).  Must be consistent with match_keys (see above).
    :type aliases:  python dictionary
    :param require_unique_match:  when True (default) an ambiguous match
      is treated as an error by find_one; when False find_one returns the
      first match and logs a complaint.
    :type require_unique_match:  boolean
    :param prepend_collection_name:  when True, attributes returned by
      find and find_one have the collection name prepended with a fixed
      separator.  Default is False.
    :type prepend_collection_name:  boolean
    :param custom_null_values:  passed unaltered to the superclass
      constructor.
    :raises TypeError:  if match_keys is not a dict.
    :raises MsPASSError:  (Fatal) if match_keys and aliases disagree.
    """

    def __init__(
        self,
        db_or_df,
        collection,
        match_keys,
        attributes_to_load,
        query=None,
        load_if_defined=None,
        aliases=None,
        require_unique_match=True,
        prepend_collection_name=False,
        custom_null_values=None,
    ):
        super().__init__(
            db_or_df,
            collection,
            attributes_to_load=attributes_to_load,
            load_if_defined=load_if_defined,
            aliases=aliases,
            require_unique_match=require_unique_match,
            prepend_collection_name=prepend_collection_name,
            custom_null_values=custom_null_values,
        )
        if not isinstance(match_keys, dict):
            raise TypeError(
                "EqualityMatcher Constructor: required argument 2 (matchkeys) must be a python dictionary"
            )
        self.match_keys = match_keys
        for data_key, column in match_keys.items():
            # identical names means no aliasing is involved for this key
            if column == data_key:
                continue
            if column not in self.aliases:
                raise MsPASSError(
                    "EqualityMatcher constructor: "
                    + "match_keys and aliases are inconsistent.\n"
                    + "match_key defines key="
                    + data_key
                    + " to have alias name="
                    + column
                    + " but alias name is not defined by aliases parameter",
                    ErrorSeverity.Fatal,
                )
            # aliases must map the column back to the same data key
            if self.aliases[column] != data_key:
                raise MsPASSError(
                    "EqualityMatcher constructor: "
                    + "match_keys and aliases are inconsistent.\n"
                    + "match_keys="
                    + str(match_keys)
                    + " aliases="
                    + str(self.aliases),
                    ErrorSeverity.Fatal,
                )

    def subset(self, mspass_object) -> pd.DataFrame:
        """
        Return the rows of the cache matching every key in self.match_keys.

        Concrete implementation of the DataFrameCacheMatcher virtual
        method.  Rows are filtered one key at a time in the iteration order
        of self.match_keys, comparing the datum's value for each data key
        with the DataFrame column named by the corresponding value.
        (A single combined boolean mask might perform better on large
        tables.)

        :param mspass_object:  any mspass data object with a Metadata
          container holding all the match keys.
        :type mspass_object:  TimeSeries, Seismogram, TimeSeriesEnsemble or
          SeismogramEnsemble
        :return:  DataFrame of the matching rows.  An empty DataFrame is
          returned when nothing matches, when the input is marked dead, or
          when any match key is missing from mspass_object.
        """
        if hasattr(mspass_object, "dead") and mspass_object.dead():
            return pd.DataFrame()
        # starts as a reference to the cache; each step rebinds it to a
        # smaller filtered frame
        remaining = self.cache
        for data_key, column in self.match_keys.items():
            if not mspass_object.is_defined(data_key):
                return pd.DataFrame()
            wanted = mspass_object[data_key]
            remaining = remaining[remaining[column] == wanted]
        return remaining
class EqualityDBMatcher(DatabaseMatcher):
    """
    Database-query counterpart of EqualityMatcher.

    Builds a MongoDB query requiring equality between values taken from
    the datum's Metadata and fields of the target collection.

    :param db:  MongoDB database handle (positional - no default).
      Normally a MsPASS Database, but the superclass from which Database
      is derived also works with this algorithm.
    :param collection:  name of the MongoDB collection to query.
      Required; no default.
    :type collection:  string
    :param match_keys:  dictionary whose keys are data (mspass_object)
      Metadata keys and whose values are the database field names they
      must equal (e.g. "source_lat" in the data matching "lat" in a
      source collection).
    :type match_keys:  python dictionary
    :param attributes_to_load:  list of required keys returned by the find
      method.  They must ALL be defined in every document of the
      collection or some calls to find will fail.  Required; no default.
    :type attributes_to_load:  list of strings
    :param load_if_defined:  list of optional keys posted in the find
      return only when defined in the retrieved document.  Default None.
    :type load_if_defined:  list of strings
    :param aliases:  dictionary mapping collection attribute names (keys)
      to the alias used to fetch the same attribute from a data object's
      Metadata (values).
    :type aliases:  python dictionary
    :param require_unique_match:  when True find_one treats a non-unique
      match as an error; when False (default) find_one returns the first
      document found and logs a complaint.
    :type require_unique_match:  boolean
    :param prepend_collection_name:  when True, attributes returned by
      find and find_one have the collection name prepended with a fixed
      separator (e.g. "lat" from channel becomes "channel_lat").
      Default is False.
    :type prepend_collection_name:  boolean
    :raises TypeError:  if match_keys is not a dict.
    """

    def __init__(
        self,
        db,
        collection,
        match_keys,
        attributes_to_load,
        load_if_defined=None,
        aliases=None,
        require_unique_match=False,
        prepend_collection_name=False,
    ):
        super().__init__(
            db,
            collection,
            attributes_to_load=attributes_to_load,
            load_if_defined=load_if_defined,
            aliases=aliases,
            require_unique_match=require_unique_match,
            prepend_collection_name=prepend_collection_name,
        )
        if not isinstance(match_keys, dict):
            raise TypeError(
                "{} constructor: required arg2 must be a python dictionary - received invalid type. See docstring".format(
                    self.__class__.__name__
                )
            )
        self.match_keys = match_keys

    def query_generator(self, mspass_object) -> dict:
        """
        Build an equality query from the datum's Metadata.

        Each entry of self.match_keys contributes one term:
        {database_field: value of the data key in mspass_object}.

        :return:  the query dictionary, or None (as the API requires for a
          failed generator) when any data key is missing from mspass_object.
        """
        query = {}
        for data_key, db_field in self.match_keys.items():
            if data_key not in mspass_object:
                return None
            query[db_field] = mspass_object[data_key]
        return query
[docs]class OriginTimeDBMatcher(DatabaseMatcher): """ Generic class to match data by comparing a time defined in data to an origin time using a database query algorithm. The default behavior of this matcher class is to match data to source documents based on origin time with an optional time offset. Conceptually the data model for this matching is identical to conventional multichannel shot gathers where the start time is usually the origin time. It is also a common model for downloaded source oriented waveform segments from FDSN web services with obspy. Obspy has an example in their documentation for how to download data defined exactly this way. In that mode we match each source document that matches a projected origin time within a specified tolerance. Specifically, let t0 be the start time extracted from the data. We then compute the projected, test origin time as test_otime = t0 - t0offset. Note the sign convention that a positive offset means the time t0 is after the event origin time. We then select all source records for which the time field satisifies: source.time - tolerance <= test_time <= source.time + tolerance The test_time value for matching from a datum can come through one of two methods driven by the constructor argument "time_key". When time_key is a None (default) the algorithm assumes all input are mspass atomic data objects that have the start time defined by the attribute "t0" (mspass_object.t0). If time_key is a string it is assumed to be a Metadata key used to fetch an epoch time to use for the test. The most likely use of that feature would be for ensemble processing where test_time is set as a field in the ensemble Metadata. Note that form of associating source data to ensembles that are common source gathers can be much faster than the atomic version because only one query is needed per ensemble. 
:param db: MongoDB database handle (positional - no default) :type db: normally a MsPASS Database class but with this algorithm it can be the superclass from which Database is derived. :param collection: Name of MongoDB collection that is to be queried (default "source"). :type collection: string :param t0offset: constant offset from data start time that is expected as origin time. A positive t0offset means the origin time is before the data start time. Units are always assumed to be seconds. :type t0offset: float :param tolerance: time tolerance to test for match of origin time. (see formula above for exact use) If the source estimates are exactly the same as the ones used to define data start time this number can be a few samples. Otherwise a few seconds is safter for teleseismic data and less for local/regional events. i.e. the choice depends up on how the source estimates relate to the data. :type tolerance: float :param attributes_to_load: list of keys of required attributes that will be returned in the output of the find method. The keys listed must ALL have defined values for all documents in the collection or some calls to find_one will fail. Default is ["lat","lon","depth","time"] :type attributes_to_load: list of string defining keys in collection documents :param load_if_defined: list of keys of optional attributes to be extracted by find method. Any data attached to these keys will only be posted in the find return if they are defined in the database document retrieved in the query. Default is ["magnitude"] :param type: list of strings defining collection keys :param aliases: python dictionary defining alias names to apply when fetching from a data object's Metadata container. The key sense of the mapping is important to keep straight. The key of this dictionary should match one of the attributes in attributes_to_load or load_if_defined. The value the key defines should be the alias used to fetch the comparable attribute from the data. 
:type aliaes: python dictionary :param prepend_collection_name: when True attributes returned in Metadata containers by the find and find_one method will all have the collection name prepended with a (fixed) separator. For example, if the collection name is "channel" the "lat" attribute in the channel document would be returned as "channel_lat". :type prepend_collection_name: boolean :param require_unique_match: boolean handling of ambiguous matches. When True find_one will throw an error if an entry is tries to match is not unique. When False find_one returns the first document found and logs a complaint message. (default is False) :type require_unique_match: boolean """ def __init__( self, db, collection="source", t0offset=0.0, tolerance=4.0, query=None, attributes_to_load=["_id", "lat", "lon", "depth", "time"], load_if_defined=["magnitude"], aliases=None, require_unique_match=False, prepend_collection_name=True, data_time_key=None, source_time_key=None, ): super().__init__( db, collection, attributes_to_load=attributes_to_load, load_if_defined=load_if_defined, aliases=aliases, require_unique_match=require_unique_match, prepend_collection_name=prepend_collection_name, ) self.t0offset = t0offset self.tolerance = tolerance self.data_time_key = data_time_key self.source_time_key = source_time_key if query is None: query = {} if isinstance(query, dict): self.query = query else: raise TypeError( "{} constructor: query argument must define a python dictionary or a None: received invalid type. See docstring".format( self.__class__.__name__ ) )
def query_generator(self, mspass_object) -> dict:
    """
    Concrete implementation of this required method for a subclass of
    DatabaseMatcher.

    Builds a MongoDB query that selects documents whose origin time lies
    within +/- self.tolerance of a test time projected from the datum.
    The test time is obtained as follows:

    - When self.data_time_key is None the datum's t0 attribute is used.
    - Otherwise self.data_time_key is used as a key to fetch the time
      from the object's Metadata container.

    In both cases self.t0offset is subtracted before the interval test.
    The interval is applied to the collection key stored in
    self.source_time_key; when that attribute is None (the constructor
    default) the key "time" is used.  The interval is added to a deep
    copy of self.query, so any predicate given to the constructor is
    preserved and the stored query is never modified.

    :param mspass_object: MsPASS defined data object that contains data
      to be used for this match (t0 attribute or content of
      self.data_time_key).
    :type mspass_object: Any valid MsPASS data object (any subclass of
      Metadata).
    :return: query python dictionary on success.  Returns None if a
      query could not be constructed.  That happens two ways:
      (1) the input is not a Metadata subclass or is marked dead, or
      (2) self.data_time_key is set but is not defined in the input.
    """
    # Returning None instead of raising makes the code more stable: a
    # parallel job could otherwise abort if, for example, one of the
    # components of a bag/rdd got set to None.
    if not isinstance(mspass_object, Metadata):
        return None
    if hasattr(mspass_object, "dead") and mspass_object.dead():
        return None

    if self.data_time_key is None:
        # this maybe should have a test to assure UTC time standard
        # but will defer for now
        test_time = mspass_object.t0
    elif mspass_object.is_defined(self.data_time_key):
        test_time = mspass_object[self.data_time_key]
    else:
        return None
    test_time -= self.t0offset

    # The constructor stores source_time_key (default None).  Honor it
    # here instead of hard-coding the key; None means "time".
    source_key = getattr(self, "source_time_key", None)
    if source_key is None:
        source_key = "time"

    # depends upon self.query being initialized by constructor
    # as python dictionary
    query = copy.deepcopy(self.query)
    query[source_key] = {
        "$gte": test_time - self.tolerance,
        "$lte": test_time + self.tolerance,
    }
    return query
class OriginTimeMatcher(DataFrameCacheMatcher):
    """
    Generic class to match data by comparing a time defined in data to an
    origin time using a cached DataFrame.

    The default behavior of this matcher class is to match data to source
    documents based on origin time with an optional time offset.
    Conceptually the data model for this matching is identical to
    conventional multichannel shot gathers where the start time is usually
    the origin time.  It is also a common model for source oriented
    waveform segments downloaded from FDSN web services with obspy.  Obspy
    has an example in their documentation for how to download data defined
    exactly this way.  In that mode we match each source document that
    matches a projected origin time within a specified tolerance.

    Specifically, let t0 be the start time extracted from the data.  We then
    compute the projected, test origin time as

        test_time = t0 - t0offset

    Note the sign convention that a positive offset means the time t0 is
    after the event origin time.  We then select all source records for
    which the time field satisfies:

        source.time - tolerance <= test_time <= source.time + tolerance

    The test_time value for matching from a datum can come through one of
    two methods driven by the constructor argument "data_time_key".  When
    data_time_key is None (default) the algorithm assumes all input are
    mspass atomic data objects that have the start time defined by the
    attribute "t0" (mspass_object.t0).  If data_time_key is a string it is
    assumed to be a Metadata key used to fetch an epoch time to use for the
    test.  The most likely use of that feature would be for ensemble
    processing where test_time is set as a field in the ensemble Metadata.
    Note that form of associating source data to ensembles that are common
    source gathers can be much faster than the atomic version because only
    one query is needed per ensemble.

    When more than one source falls inside the tolerance window, find_one
    and find_doc resolve the ambiguity by returning the source whose
    origin time is closest to test_time.

    This implementation should be used only if the catalog of events is
    reasonably small.  If the catalog is huge the database version may be
    more appropriate.

    :param db_or_df: source of the data cached by this matcher.  It is
      passed unaltered to the DataFrameCacheMatcher constructor, which
      accepts either a MongoDB database handle (normally a MsPASS Database)
      or a pandas DataFrame.  Positional - no default.
    :param collection: Name of MongoDB collection that is to be queried
      (default "source").  Also used as the prefix for returned
      attribute names when prepend_collection_name is True.
    :type collection: string
    :param t0offset: constant offset from data start time that is expected
      as origin time.  A positive t0offset means the origin time is before
      the data start time.  Units are always assumed to be seconds.
    :type t0offset: float
    :param tolerance: time tolerance to test for match of origin time
      (see formula above for exact use).  If the source estimates are
      exactly the same as the ones used to define data start time this
      number can be a few samples.  Otherwise a few seconds is safer for
      teleseismic data and less for local/regional events.  i.e. the choice
      depends upon how the source estimates relate to the data.
    :type tolerance: float
    :param attributes_to_load: list of keys of required attributes that
      will be returned in the output of the find method.  The keys listed
      must ALL have defined values for all documents in the collection or
      some calls to find_one will fail.  Default is
      ["_id","lat","lon","depth","time"].  Note if constructing from a
      DataFrame created from something like a Datascope origin table this
      list will need to be changed to remove _id as in that context no
      ObjectId would normally be defined.  Be warned, however, that if used
      with a normalize function the _id may be required to match a
      "source_id" cross reference in a seismic data object.  Also note that
      the list must contain the key defined by the related argument
      "source_time_key" as that is used to match times in the source data
      with data start times.
    :type attributes_to_load: list of string defining keys in collection
      documents
    :param load_if_defined: list of keys of optional attributes to be
      extracted by find method.  Any data attached to these keys will only
      be posted in the find return if they are defined in the database
      document retrieved in the query.  Default is ["magnitude"]
    :type load_if_defined: list of strings defining collection keys
    :param aliases: python dictionary defining alias names to apply when
      fetching from a data object's Metadata container.  The key sense of
      the mapping is important to keep straight.  The key of this
      dictionary should match one of the attributes in attributes_to_load
      or load_if_defined.  The value the key defines should be the alias
      used to fetch the comparable attribute from the data.
    :type aliases: python dictionary
    :param require_unique_match: boolean handling of ambiguous matches.
      When True find_one will throw an error if an entry it tries to match
      is not unique.  When False find_one returns the first document found
      and logs a complaint message.  (default is False)
    :type require_unique_match: boolean
    :param prepend_collection_name: when True attributes returned in
      Metadata containers by the find and find_one method will all have
      the collection name prepended with a (fixed) separator.  For example,
      if the collection name is "channel" the "lat" attribute in the
      channel document would be returned as "channel_lat".
    :type prepend_collection_name: boolean
    :param data_time_key: data object Metadata key used to fetch time for
      testing as alternative to data start time.  If set None (default) the
      test will use the start time (t0) of an atomic data object for the
      time test.  Otherwise it is assumed to be a string used to fetch a
      time from the data's Metadata container.  That is the best way to
      run this matcher on Ensembles.
    :type data_time_key: string
    :param source_time_key: dataframe column name to use as source origin
      time field.  Default is "time".  This key must match a key in the
      attributes_to_load list or the constructor will throw an exception.
      Note this should match the key defining origin time in the
      collection, not the name commonly used when it is stored with data.
      i.e. normal usage is "time" not "source_time".  A None is also
      accepted and causes the internal value to be set to "time".
    :type source_time_key: string
    :param custom_null_values: passed unaltered to the
      DataFrameCacheMatcher constructor.

    :raises MsPASSError: (Fatal) if source_time_key is not in
      attributes_to_load.
    """

    def __init__(
        self,
        db_or_df,
        collection="source",
        t0offset=0.0,
        tolerance=4.0,
        attributes_to_load=["_id", "lat", "lon", "depth", "time"],
        load_if_defined=["magnitude"],
        aliases=None,
        require_unique_match=False,
        prepend_collection_name=True,
        data_time_key=None,
        source_time_key="time",
        custom_null_values=None,
    ):
        super().__init__(
            db_or_df,
            collection,
            attributes_to_load=attributes_to_load,
            load_if_defined=load_if_defined,
            aliases=aliases,
            require_unique_match=require_unique_match,
            prepend_collection_name=prepend_collection_name,
            custom_null_values=custom_null_values,
        )
        self.t0offset = t0offset
        self.tolerance = tolerance
        self.data_time_key = data_time_key
        if source_time_key is None:
            self.source_time_key = "time"
        else:
            self.source_time_key = source_time_key
        if self.source_time_key not in attributes_to_load:
            message = "OriginTimeMatcher constructor: "
            message += "key for fetching origin time=" + self.source_time_key
            message += " is not in attributes_to_load list\n"
            message += "Required for matching with waveform start times"
            raise MsPASSError(message, ErrorSeverity.Fatal)

    def _interval_subset(self, test_time) -> pd.DataFrame:
        """
        Return the rows of the cache whose origin time (column
        self.source_time_key) lies in the closed interval
        [test_time - self.tolerance, test_time + self.tolerance].

        test_time is expected to already include the t0offset correction.
        A boolean mask is used rather than DataFrame.query so the result
        does not depend on the column name being a valid python identifier
        or on string formatting of the interval limits.
        """
        tmin = test_time - self.tolerance
        tmax = test_time + self.tolerance
        # For this matcher we dogmatically use the <= equivalent in the
        # between construct (inclusive="both").
        in_window = self.cache[self.source_time_key].between(
            tmin, tmax, inclusive="both"
        )
        return self.cache[in_window]

    def _attribute_keys(self, k):
        """
        Return (row_key, output_key) for attribute k.

        row_key is the alias of k when one is defined, otherwise k.
        output_key is row_key with the collection name prepended when
        self.prepend_collection_name is True ("_id" is joined with no
        separator, e.g. "source_id"; other keys use "_").
        """
        aliases = self.aliases if self.aliases is not None else {}
        if k in aliases:
            key = aliases[k]
        else:
            key = k
        if self.prepend_collection_name:
            if key == "_id":
                return key, self.collection + key
            return key, self.collection + "_" + key
        return key, key

    def subset(self, mspass_object) -> pd.DataFrame:
        """
        Implementation of subset method required by inheritance from
        DataFrameCacheMatcher.

        Returns the subset of the cache DataFrame with source origin times
        inside the tolerance window centered on the test time projected
        from mspass_object (see class docstring).  If self.data_time_key is
        defined the time is fetched from the Metadata container of
        mspass_object; otherwise (self.data_time_key is None) the t0
        attribute of mspass_object is used.

        An empty DataFrame is returned if mspass_object is not a Metadata
        subclass, is marked dead, or lacks self.data_time_key when that
        key is set.
        """
        if not isinstance(mspass_object, Metadata):
            return pd.DataFrame()
        if hasattr(mspass_object, "dead") and mspass_object.dead():
            return pd.DataFrame()
        if self.data_time_key is None:
            # this maybe should have a test to assure UTC time standard
            # but will defer for now
            test_time = mspass_object.t0
        elif mspass_object.is_defined(self.data_time_key):
            test_time = mspass_object[self.data_time_key]
        else:
            return pd.DataFrame()
        return self._interval_subset(test_time - self.t0offset)

    def find_one(self, mspass_object) -> tuple:
        """
        Override of find_one method of DataFrameCacheMatcher.

        The override is necessary to handle the ambiguity of a time
        interval match for source origin times.  That is, there is a finite
        probability that two earthquakes can occur within the interval of
        this matcher defined by the time projected from the waveform start
        time (starttime - self.t0offset) + or - self.tolerance.  When
        multiple matches are found this method handles that ambiguity by
        finding the source where the origin time is closest to the
        projected test time.

        Note this method normally expects input to be an atomic seismic
        object.  It also, however, accepts any object that is a subclass of
        Metadata.  The most important example of that is
        `TimeSeriesEnsemble` and `SeismogramEnsemble` objects.  For that to
        work, however, you MUST define a key to use to fetch a reference
        time in the constructor to this object via the `data_time_key`
        argument.  If you then load the appropriate reference time in the
        ensemble's Metadata container you can normalize a common source
        gather's ensemble container with a workflow.  Here is a code
        fragment illustrating the idea:

        ```
          source_matcher = OriginTimeMatcher(db,data_time_key="origin_time")
          e = db.read_data(cursor, ... read args...)  # read ensemble e
          # assume we got this time (otime) some other way above
          e['origin_time'] = otime
          e = normalize(e,source_matcher)
        ```

        If the match succeeds the attributes defined in the DataFrame cache
        will be loaded into the Metadata container of e.  That is the
        definition of a common source gather.

        :param mspass_object: seismic data object to be matched.  With
          data_time_key None the match is made against the datum's t0 value,
          so there is an implicit assumption t0 is a UTC epoch time.  If a
          data set in relative time is passed through this operator all
          will fail.  The function intentionally avoids that test for
          efficiency.  A plain Metadata container can be passed through
          mspass_object if and only if it contains a value associated with
          the key defined by the data_time_key attribute.

        :return: a pair consistent with the BasicMatcher API definition
          (i.e. [Metadata, ErrorLogger]).  The first element is None when
          nothing matched.
        """
        findreturn = self.find(mspass_object)
        mdlist = findreturn[0]
        if mdlist is None:
            return findreturn
        if len(mdlist) == 0:
            # Previously this case left md2use unbound and raised
            # UnboundLocalError; report it as "no match" instead.
            return [None, findreturn[1]]
        if len(mdlist) == 1:
            md2use = mdlist[0]
        else:
            md2use = self._nearest_time_source(mspass_object, mdlist)
        return [md2use, findreturn[1]]

    def _nearest_time_source(self, mspass_object, mdlist):
        """
        Private method defining the algorithm used to resolve an ambiguity
        when multiple sources are returned by find.

        Returns the Metadata container from mdlist whose origin time most
        closely matches the test time defined by mspass_object.  The test
        time is obtained exactly as in subset (data_time_key when set,
        otherwise t0) and corrected by self.t0offset.  Raises if the
        required time is not available from mspass_object.
        """
        if self.prepend_collection_name:
            # find returns modified names when prepend_collection_name is
            # True.  The actual DataFrame uses the names without the
            # prepend string.
            time_key = self.collection + "_" + self.source_time_key
        else:
            time_key = self.source_time_key

        # Use the same reference time rule as subset so the tie-break is
        # made relative to the time that produced the candidate list.
        if self.data_time_key is None:
            test_time = mspass_object.t0
        else:
            test_time = mspass_object[self.data_time_key]
        test_time = test_time - self.t0offset

        source_times = np.array([md[time_key] for md in mdlist], dtype=float)
        component_to_use = int(np.argmin(np.abs(source_times - test_time)))
        return mdlist[component_to_use]

    def find_doc(self, doc, starttime_key="starttime") -> dict:
        """
        Override of the find_doc method of BasicMatcher.

        This method acts like find_one but the inputs and outputs are
        different.  The input is a python dictionary that is normally
        expected to be a MongoDB document.  The output is also a python
        dictionary holding the (normally) reduced set of attributes defined
        by self.attributes_to_load and self.load_if_defined.  We need to
        override the base class version of this method because the base
        class version by default requires an atomic seismic data object
        (TimeSeries or Seismogram).

        The algorithm is a variant of that in the subset method of this
        class.  This method also differs from find_one in that it has no
        mechanism to log errors.  find_one returns a Metadata container and
        an ErrorLogger container used to post messages.  This method returns
        None if there are errors that cause it to fail.  That can be
        ambiguous because a None return also indicates failure to match
        anything.  The primary use of this method is normalizing an entire
        data set with the ObjectIds of source documents with the
        `bulk_normalize` function.  In that case additional forensic work is
        possible with MongoDB to uncover why a given document match failed.

        Because the interval match relative to a waveform start time can be
        ambiguous for global events (although rare, earthquakes can easily
        occur within + or - self.tolerance time), when multiple rows of the
        dataframe match the interval test the row returned is the one whose
        origin time is closest to the time projected from the waveform
        start time (using the self.t0offset value).

        :param doc: wf document (i.e. a document used to construct an
          atomic datum) to be matched with content of this object (assumed
          the source collection or a variant that contains source origin
          times).
        :type doc: python dictionary
        :param starttime_key: key used to fetch the waveform segment start
          time that is matched against origin times loaded in the object's
          cache.
        :type starttime_key: str (default "starttime")

        :return: python dictionary of the best match or None if there is no
          match, if starttime_key is not in doc, or if a required attribute
          is null in the matching row.
        """
        if starttime_key not in doc:
            return None
        test_time = doc[starttime_key] - self.t0offset
        subset_df = self._interval_subset(test_time)
        n_matches = len(subset_df)
        if n_matches == 0:
            return None
        if n_matches > 1:
            # Choose the row whose origin time most closely matches the
            # projected start time.  (The previous loop never advanced its
            # index, so only the last row's time was ever compared.)
            dt = np.abs(
                subset_df[self.source_time_key].to_numpy(dtype=float) - test_time
            )
            row_index_to_use = int(np.argmin(dt))
        else:
            row_index_to_use = 0
        row = subset_df.iloc[row_index_to_use]

        doc_out = dict()
        notnulltest = row.notnull()
        for k in self.attributes_to_load:
            if not notnulltest[k]:
                # land here if a required attribute is missing from the
                # dataframe cache.  find logs an error but all we can do
                # here is flag failure returning None.
                return None
            key, mdkey = self._attribute_keys(k)
            doc_out[mdkey] = row[key]
        for k in self.load_if_defined or []:
            if notnulltest[k]:
                key, mdkey = self._attribute_keys(k)
                doc_out[mdkey] = row[key]
        return doc_out
class ArrivalDBMatcher(DatabaseMatcher):
    """
    This is a class for matching a table of arrival times to input
    waveform data objects.  Use this version if the table of arrivals is
    huge and database query delays will not create a bottleneck in your
    workflow.

    Phase arrival time matching is a common need when waveform segments
    are downloaded.  When data are assembled as miniseed files or url
    downloads of miniseed data, the format has no way to hold arrival time
    data.  This matcher can be useful for associating arrival times with
    waveform segments stored as miniseed.

    The algorithm it uses for matching is a logical AND of two tests:

    1. We first match all arrival times falling between the sample range
       of an input MsPASS data object, d.  That is, the first component of
       the query is to find all arrival times, t_a, that obey the
       relation: d.t0 <= t_a <= d.endtime().
    2. Match only data for which the (fixed) name "sta" in arrival and the
       data match.  A secondary key match using the "net" attribute is
       used only if "net" is defined with the data.  That is done to
       streamline processing of css3.0 data where "net" is not defined.

    Only atomic data objects are supported; query_generator returns None
    for any other input.

    Note the concept of an arrival time is also mixed as in some contexts
    it means a time computed from an earth model and in others a measured
    time that is "picked" by a human or computer algorithm.  This class
    does not distinguish model-based from measured times.  It simply uses
    the time and station tag information with the algorithm noted above
    to attempt a match.

    :param db: MongoDB database handle (positional - no default)
    :type db: normally a MsPASS Database class but with this algorithm it
      can be the superclass from which Database is derived.
    :param collection: Name of MongoDB collection that is to be queried
      (default "arrival", which is not currently part of the stock mspass
      schema.  Note it isn't required to be in the schema and illustrates
      flexibility).
    :type collection: string
    :param attributes_to_load: list of keys of required attributes that
      will be returned in the output of the find method.  The keys listed
      must ALL have defined values for all documents in the collection or
      some calls to find_one will fail.  Default ["phase","time"].
    :type attributes_to_load: list of string defining keys in collection
      documents
    :param load_if_defined: list of keys of optional attributes to be
      extracted by find method.  Any data attached to these keys will only
      be posted in the find return if they are defined in the database
      document retrieved in the query.  Default is None
    :type load_if_defined: list of strings defining collection keys
    :param aliases: python dictionary defining alias names to apply when
      fetching from a data object's Metadata container.  The key sense of
      the mapping is important to keep straight.  The key of this
      dictionary should match one of the attributes in attributes_to_load
      or load_if_defined.  The value the key defines should be the alias
      used to fetch the comparable attribute from the data.
    :type aliases: python dictionary
    :param prepend_collection_name: when True attributes returned in
      Metadata containers by the find and find_one method will all have
      the collection name prepended with a (fixed) separator.  For example,
      if the collection name is "channel" the "lat" attribute in the
      channel document would be returned as "channel_lat".
    :type prepend_collection_name: boolean
    :param require_unique_match: boolean handling of ambiguous matches.
      When True find_one will throw an error if an entry it tries to match
      is not unique.  When False find_one returns the first document found
      and logs a complaint message.  (default is False)
    :type require_unique_match: boolean
    :param query: optional query predicate.  That is, if set the interval
      query is appended to this query to build a more specific query.  An
      example might be station code keys to match a specific pick for a
      specific station like {"sta":"AAK"}.  Another would be to limit
      arrivals to a specific phase name like {"phase" : "ScS"}.  Default
      is None which reverts to no query predicate.
    :type query: python dictionary or None.  None is equivalent to passing
      an empty dictionary.

    :raises TypeError: if query is not None or a dict.
    """

    def __init__(
        self,
        db,
        collection="arrival",
        attributes_to_load=["phase", "time"],
        load_if_defined=None,
        aliases=None,
        require_unique_match=False,
        prepend_collection_name=True,
        query=None,
    ):
        super().__init__(
            db,
            collection,
            attributes_to_load=attributes_to_load,
            load_if_defined=load_if_defined,
            aliases=aliases,
            require_unique_match=require_unique_match,
            prepend_collection_name=prepend_collection_name,
        )
        if query is None:
            self.query = dict()
        elif isinstance(query, dict):
            self.query = query
        else:
            raise TypeError(
                "ArrivalDBMatcher constructor: query arg must define a python dictionary"
            )

    def query_generator(self, mspass_object) -> dict:
        """
        Concrete implementation of method required by superclass
        DatabaseMatcher.

        Builds the interval-plus-station query described in the class
        docstring.  The time span of the interval query is the range of
        the waveform data received through mspass_object
        (t0 to endtime()).  "sta" and "net" are added to the query only
        when they are defined in the datum; "loc" is intentionally ignored.

        The interval test is overlaid on self.query.  i.e. the query dict
        components derived here are added to a deep copy of self.query,
        so the stored predicate is never modified.

        :param mspass_object: datum to be matched.
        :return: query python dictionary, or None if mspass_object is not
          an atomic data object or is marked dead.  Ensembles are not
          supported by this matcher.
        """
        if not _input_is_atomic(mspass_object):
            return None
        if not mspass_object.live:
            return None

        query = copy.deepcopy(self.query)
        query["time"] = {
            "$gte": mspass_object.t0,
            "$lte": mspass_object.endtime(),
        }
        # these names are frozen
        sta = _get_with_readonly_recovery(mspass_object, "sta")
        net = _get_with_readonly_recovery(mspass_object, "net")
        if net is not None:
            query["net"] = net
        if sta is not None:
            query["sta"] = sta
        # intentionally ignore loc as option
        return query
class ArrivalMatcher(DataFrameCacheMatcher):
    """
    This is a class for matching a table of arrival times to input
    waveform data objects.  Use this version if the table of arrivals is
    not large enough to cause a memory problem.

    Phase arrival time matching is a common need when waveform segments
    are downloaded.  When data are assembled as miniseed files or url
    downloads of miniseed data, the format has no way to hold arrival time
    data.  This matcher can be useful for associating arrival times with
    waveform segments stored as miniseed.

    The algorithm it uses for matching is a logical AND of two tests:

    1. We first match all arrival times falling between the sample range
       of an input MsPASS data object, d.  That is, the first component of
       the query is to find all arrival times, t_a, that obey the
       relation: d.t0 <= t_a <= d.endtime().  For ensembles (or plain
       Metadata) the interval is instead fetched from the Metadata keys
       defined by ensemble_starttime_key and ensemble_endtime_key.
    2. Match only data for which the (fixed) name "sta" in arrival and the
       data match.  A secondary key match using the "net" attribute is
       used only if "net" is defined with the data and more than one row
       passed the first tests.  That is done to streamline processing of
       css3.0 data where "net" is not defined.

    Note the concept of an arrival time is also mixed as in some contexts
    it means a time computed from an earth model and in others a measured
    time that is "picked" by a human or computer algorithm.  This class
    does not distinguish model-based from measured times.  It simply uses
    the time and station tag information with the algorithm noted above
    to attempt a match.

    This implementation caches the table of attributes desired in an
    internal pandas DataFrame.  It is thus most appropriate for arrival
    tables that are not huge.  Note it may be possible to do appropriate
    preprocessing to manage the arrival table size.  e.g. the table can be
    grouped by station or in time blocks and then processed in a loop
    updating waveform database records in multiple passes.  The
    alternative for large arrival tables is to use the DB version of this
    matcher.  The cache must contain a "sta" column for the station test.

    :param db_or_df: source of the cached table.  Passed unaltered to the
      DataFrameCacheMatcher constructor (normally a MsPASS Database handle
      or a pandas DataFrame).  Positional - no default.
    :param collection: Name of MongoDB collection that is to be queried
      (default "arrival", which is not currently part of the stock mspass
      schema.  Note it isn't required to be in the schema and illustrates
      flexibility).
    :type collection: string
    :param attributes_to_load: list of keys of required attributes that
      will be returned in the output of the find method.  The keys listed
      must ALL have defined values for all documents in the collection or
      some calls to find_one will fail.  Default ["phase","time"].
    :type attributes_to_load: list of string defining keys in collection
      documents
    :param load_if_defined: list of keys of optional attributes to be
      extracted by find method.  Any data attached to these keys will only
      be posted in the find return if they are defined in the database
      document retrieved in the query.  Default is None
    :type load_if_defined: list of strings defining collection keys
    :param aliases: python dictionary defining alias names to apply when
      fetching from a data object's Metadata container.  The key sense of
      the mapping is important to keep straight.  The key of this
      dictionary should match one of the attributes in attributes_to_load
      or load_if_defined.  The value the key defines should be the alias
      used to fetch the comparable attribute from the data.
    :type aliases: python dictionary
    :param prepend_collection_name: when True attributes returned in
      Metadata containers by the find and find_one method will all have
      the collection name prepended with a (fixed) separator.  For example,
      if the collection name is "channel" the "lat" attribute in the
      channel document would be returned as "channel_lat".
    :type prepend_collection_name: boolean
    :param require_unique_match: boolean handling of ambiguous matches.
      When True find_one will throw an error if an entry it tries to match
      is not unique.  When False find_one returns the first document found
      and logs a complaint message.  (default is False)
    :type require_unique_match: boolean
    :param ensemble_starttime_key: defines the key used to fetch a start
      time for the interval test when processing with ensemble data.
      Default is "starttime".
    :type ensemble_starttime_key: string
    :param ensemble_endtime_key: defines the key used to fetch an end time
      for the interval test when processing with ensemble data.  Default
      is "endtime".
    :type ensemble_endtime_key: string
    :param arrival_time_key: name of the cache DataFrame column holding
      the arrival times used in the interval test.  The cache uses the
      un-prefixed collection keys, so the default (None) uses "time",
      which matches the default attributes_to_load list.
    :type arrival_time_key: string or None
    :param custom_null_values: passed unaltered to the
      DataFrameCacheMatcher constructor.

    :raises TypeError: if arrival_time_key is not None or a string.
    """

    def __init__(
        self,
        db_or_df,
        collection="arrival",
        attributes_to_load=["phase", "time"],
        load_if_defined=None,
        aliases=None,
        require_unique_match=False,
        prepend_collection_name=True,
        ensemble_starttime_key="starttime",
        ensemble_endtime_key="endtime",
        arrival_time_key=None,
        custom_null_values=None,
    ):
        super().__init__(
            db_or_df,
            collection,
            attributes_to_load=attributes_to_load,
            load_if_defined=load_if_defined,
            aliases=aliases,
            require_unique_match=require_unique_match,
            prepend_collection_name=prepend_collection_name,
            custom_null_values=custom_null_values,
        )
        # maybe a bit confusing to shorten the names here but the
        # argument names are a bit much
        self.starttime_key = ensemble_starttime_key
        self.endtime_key = ensemble_endtime_key
        if arrival_time_key is None:
            # The cache holds raw collection keys (no collection-name
            # prefix), so the arrival time column is "time".
            self.arrival_time_key = "time"
        elif isinstance(arrival_time_key, str):
            self.arrival_time_key = arrival_time_key
        else:
            raise TypeError(
                "ArrivalMatcher constructor: arrival_time_key argument must define a string"
            )

    def subset(self, mspass_object) -> pd.DataFrame:
        """
        Concrete implementation of method required by superclass
        DataFrameCacheMatcher.

        Returns the rows of the cache whose "sta" column equals the datum's
        "sta" and whose arrival time (column self.arrival_time_key) lies in
        the closed interval [stime, etime].  For atomic data the interval is
        [t0, endtime()]; otherwise it is fetched from the Metadata keys
        self.starttime_key and self.endtime_key.  If more than one row
        passes and the datum defines "net", rows are further restricted to
        that net (when the cache has a "net" column).

        An empty DataFrame is returned when the input is not a Metadata
        subclass, is marked dead, lacks the interval keys (non-atomic
        input), or has no "sta" defined.
        """
        if not isinstance(mspass_object, Metadata):
            return pd.DataFrame()
        if hasattr(mspass_object, "dead") and mspass_object.dead():
            return pd.DataFrame()

        if _input_is_atomic(mspass_object):
            stime = mspass_object.t0
            etime = mspass_object.endtime()
        elif mspass_object.is_defined(self.starttime_key) and mspass_object.is_defined(
            self.endtime_key
        ):
            stime = mspass_object[self.starttime_key]
            etime = mspass_object[self.endtime_key]
        else:
            return pd.DataFrame()

        sta = _get_with_readonly_recovery(mspass_object, "sta")
        if sta is None:
            return pd.DataFrame()

        cache = self.cache
        # Compare DataFrame columns, not string literals, to build the mask.
        mask = (cache["sta"] == sta) & cache[self.arrival_time_key].between(
            stime, etime, inclusive="both"
        )
        dfret = cache[mask]
        if len(dfret) > 1:
            net = _get_with_readonly_recovery(mspass_object, "net")
            if net is not None and "net" in dfret.columns:
                dfret = dfret[dfret["net"] == net]
        return dfret
@mspass_func_wrapper
def normalize(mspass_object, matcher, kill_on_failure=True):
    """
    Generic function to do in line normalization with dask/spark map
    operator.

    In MsPASS we use the normalized data model for receiver and source
    metadata.  The normalization can be done during any reads if the data
    have cross-referencing ids defined as described in the User's Manual.
    This function provides a generic interface to link to a normalizing
    collection within a workflow using a map operator applied to a dask bag
    or spark rdd containing a dataset of MsPASS data objects.  The
    algorithm is made generic through the matcher argument that must point
    to a concrete implementation of the abstract base class defined in this
    module as BasicMatcher.

    For example, suppose we create a concrete implementation of the
    MiniseedMatcher using all defaults from a database handle db as
    follows:

      matcher = MiniseedMatcher(db)

    Suppose we then load data from wf_miniseed with read_distributed_data
    into the dask bag we will call dataset.  We can normalize that data
    within a workflow as follows:

      dataset = dataset.map(normalize,matcher)

    :param mspass_object: data to be normalized
    :type mspass_object: For all mspass matchers this must be one of the
      mspass data types of TimeSeries, Seismogram, TimeSeriesEnsemble, or
      SeismogramEnsemble.  Many matchers have further restrictions.  e.g.
      the normal use of the MiniseedMatcher using the defaults like the
      example insists the data received are either TimeSeries or
      Seismogram objects.  Read the docstring carefully for your matcher
      choice for any limitations.
    :param matcher: a generic matching function that is a subclass of
      BasicMatcher.  This function only calls the find_one method.
    :type matcher: must be a concrete subclass of BasicMatcher
    :param kill_on_failure: when True, a datum for which the find_one
      method of matcher returns no match is marked dead.  When False the
      datum is returned live and unchanged apart from any messages posted
      to its error log.
    :type kill_on_failure: boolean
    :return: mspass_object, modified in place.  Dead data are returned
      immediately.  If kill_on_failure is True the result may be killed on
      return.
    """
    if hasattr(mspass_object, "dead") and mspass_object.dead():
        return mspass_object

    find_output = matcher.find_one(mspass_object)
    # api of BasicMatcher specifies a pair [Metadata, ErrorLogger]
    md = find_output[0]
    elog = find_output[1]
    if md is None:
        # Previously the datum was killed regardless of kill_on_failure.
        if kill_on_failure:
            mspass_object.kill()
    else:
        # copy the returned Metadata contents to the data
        for k in md:
            mspass_object[k] = md[k]
    # append any error log returns to the data elog so failures are
    # documented whether or not the datum was killed
    # operator += is bound to python by pybind11 so this works
    if elog is not None:
        mspass_object.elog += elog
    return mspass_object
def bulk_normalize(
    db,
    wfquery=None,
    wf_col="wf_miniseed",
    blocksize=1000,
    matcher_list=None,
):
    """
    This function iterates through the collection specified by db and
    wf_col, and runs a chain of normalization functions in serial on each
    document defined by the cursor returned by wfquery.  It speeds updates
    by using the bulk methods of MongoDB.  The chain also speeds updates
    because all matchers in matcher_list add to the same update document
    for each wf_col document.  A typical example would be to run this
    function on wf_miniseed data running a matcher to set channel_id,
    site_id, and source_id.

    :param db: should be a MsPASS database handle containing the wf_col
      and the collections defined by the matcher_list list.
    :param wf_col: The collection that needs to be normalized, default is
      wf_miniseed
    :param blocksize: To speed up updates this function uses the bulk
      writer/updater methods of MongoDB that can be orders of magnitude
      faster than one-at-a-time updates.  Pending updates are sent to
      MongoDB each time blocksize of them have accumulated, and any
      remainder is sent at the end.  A user should not normally need to
      alter this parameter.
    :param wfquery: is an optional query to apply to wf_col.  The output
      of this query defines the list of documents that the algorithm will
      attempt to normalize as described above.  The default (None) will
      process the entire collection (query set to an empty dict).
    :param matcher_list: a list of instances of one or more subclasses of
      BasicMatcher.  In addition to the required classes, all instances
      passed through this interface must contain two required attributes:
      (1) collection which defines the collection name, and
      (2) prepend_collection_name, a boolean that determines if the
      attributes loaded should have the collection name prepended
      (e.g. channel_id).  In addition, all instances must define the
      find_doc method which is not required by the BasicMatcher interface.
      (find_doc is comparable to find_one but uses a python dictionary as
      the container instead of referencing a mspass data object.
      find_one is the core method for inline normalization)
    :return: a list with a length of len(matcher_list)+1.  0 is the number
      of documents processed in the collection (output of query).  The
      rest are the numbers of successful normalizations for the
      corresponding matcher instances, mapped one to one
      (matcher_list[x] -> ret[x+1]).
    :raises MsPASSError: (Fatal) if a matcher fails the interface checks
      or if the query yields no documents.
    """
    if wfquery is None:
        wfquery = {}
    if matcher_list is None:
        # The default value for matcher_list is for wf_miniseed with channel
        # Assume the defaults are sufficient but we limit the required
        # attribute list to save memory
        channel_matcher = MiniseedMatcher(
            db, attributes_to_load=["starttime", "endtime", "_id"]
        )
        matcher_list = [channel_matcher]

    for matcher in matcher_list:
        if not isinstance(matcher, BasicMatcher):
            raise MsPASSError(
                "bulk_normalize: the component in the matcher list={} is not a subclass of BasicMatcher".format(
                    str(matcher)
                ),
                ErrorSeverity.Fatal,
            )
        # check that matcher has the find_doc method implemented - it is
        # not defined in the BasicMatcher interface and is required
        if not (
            hasattr(matcher, "find_doc") and callable(getattr(matcher, "find_doc"))
        ):
            message = "matcher_list contains class={classname} that does not have an implementation of the find_doc method required by this function - try using it in a map operator".format(
                classname=type(matcher).__name__
            )
            raise MsPASSError("bulk_normalize: " + message, ErrorSeverity.Fatal)
        # these two attributes are also required and best checked here
        # for a minor cost
        if not hasattr(matcher, "prepend_collection_name"):
            message = "matcher list class={classname} does not define required attribute prepend_collection_name - trying using it in a map operator".format(
                classname=type(matcher).__name__
            )
            raise MsPASSError("bulk_normalize: " + message, ErrorSeverity.Fatal)
        if not hasattr(matcher, "collection"):
            message = "matcher list class={classname} does not define required attribute collection - trying using it in a map operator".format(
                classname=type(matcher).__name__
            )
            raise MsPASSError("bulk_normalize: " + message, ErrorSeverity.Fatal)

    ndocs = db[wf_col].count_documents(wfquery)
    if ndocs == 0:
        raise MsPASSError(
            "bulk_normalize: "
            + "query={wfquery} of collection={wf_col} yielded 0 documents\nNothing to process".format(
                wfquery=wfquery, wf_col=wf_col
            ),
            ErrorSeverity.Fatal,
        )

    # one success counter per matcher
    cnt_list = [0] * len(matcher_list)
    bulk = []
    for doc in db[wf_col].find(wfquery):
        update_doc = {}
        for ind, matcher in enumerate(matcher_list):
            norm_doc = matcher.find_doc(doc)
            if norm_doc is None:
                # note this silently ignores failures; they only show up
                # as a lower count in cnt_list
                continue
            for key in matcher.attributes_to_load:
                new_key = key
                if matcher.prepend_collection_name:
                    if key == "_id":
                        new_key = matcher.collection + key
                    else:
                        new_key = matcher.collection + "_" + key
                update_doc[new_key] = norm_doc[new_key]
            cnt_list[ind] += 1
        # MongoDB rejects an empty $set, so only queue non-empty updates
        if update_doc:
            bulk.append(pymongo.UpdateOne({"_id": doc["_id"]}, {"$set": update_doc}))
        # batch by number of pending updates; bulk_write raises on an
        # empty request list so never call it with nothing queued
        if len(bulk) >= blocksize:
            db[wf_col].bulk_write(bulk)
            bulk = []
    if bulk:
        db[wf_col].bulk_write(bulk)
    return [ndocs] + cnt_list
def normalize_mseed(
    db,
    wfquery=None,
    blocksize=1000,
    normalize_channel=True,
    normalize_site=True,
):
    """
    Set channel_id and/or site_id in wf_miniseed documents by matching
    against the "channel" and "site" collections.

    In MsPASS the standard support for station information is stored in
    two collections called "channel" and "site".  Once wf_miniseed
    documents are linked to channel, a miniseed record can be associated
    with station metadata downloaded by FDSN web services and stored
    previously with MsPASS database methods.  site is a smaller
    collection intended for use only with data already assembled into
    three-component bundles (Seismogram objects).

    Matching follows the SEED convention, in which the strings stored
    with the keys "net", "sta", "chan", and (optionally) "loc" define a
    globally unique channel registered through the FDSN, combined with a
    time-interval match against the start time of each waveform.  For
    site, "chan" plays no role because site documents describe a whole
    observatory (a "station").

    Internally one MiniseedMatcher is built per requested collection,
    loading "_id", "starttime", and "endtime" with the collection name
    prepended, and the work is handed to bulk_normalize.

    :param db: MsPASS database handle containing at least wf_miniseed and
      the collections being normalized.
    :param wfquery: optional query applied to wf_miniseed.  Its output
      defines the documents the algorithm will attempt to normalize.  The
      default (None) processes the entire wf_miniseed collection (an
      empty query dict).
    :param blocksize: number of updates grouped into each MongoDB bulk
      write; passed straight to bulk_normalize.  A user should not
      normally need to alter this parameter.
    :param normalize_channel: when True (default) matches are attempted
      with the channel collection, and the matching channel document id
      is stored in wf_miniseed as channel_id.
    :param normalize_site: when True (default) matches are attempted with
      the site collection, and the matching site document id is stored in
      wf_miniseed as site_id.
    :return: list of three integers: [0] the number of wf_miniseed
      documents processed (output of the query), [1] the number with
      channel ids set, and [2] the number with site ids set.  [1] or [2]
      is 0 if normalization for that collection was disabled.
    :raises MsPASSError: (Fatal) if both normalize_channel and
      normalize_site are False.
    """
    query = {} if wfquery is None else wfquery
    if not normalize_channel and not normalize_site:
        raise MsPASSError(
            "normalize_mseed: usage error. normalize_channel and normalize_site cannot both be set False",
            ErrorSeverity.Fatal,
        )

    # order matters: bulk_normalize reports counts in matcher order
    wanted = [
        name
        for name, enabled in (("channel", normalize_channel), ("site", normalize_site))
        if enabled
    ]
    # starttime and endtime must be loaded for the miniseed matcher to work
    matchers = [
        MiniseedMatcher(
            db,
            collection=name,
            attributes_to_load=["_id", "starttime", "endtime"],
            prepend_collection_name=True,
        )
        for name in wanted
    ]

    bulk_ret = bulk_normalize(
        db,
        wfquery=query,
        wf_col="wf_miniseed",
        blocksize=blocksize,
        matcher_list=matchers,
    )

    # consume per-matcher counts in the order the matchers were built
    per_matcher = iter(bulk_ret[1:])
    channel_count = next(per_matcher) if normalize_channel else 0
    site_count = next(per_matcher) if normalize_site else 0
    return [bulk_ret[0], channel_count, site_count]
def _get_test_time(d, time):
    """
    A helper function to get the test time used for searching.

    If time is given (not None) it is returned unchanged.  Otherwise the
    start time is taken from d: t0 for atomic data (TimeSeries or
    Seismogram), else the "starttime" Metadata value when defined.  None
    is returned to signal that the time field should be ignored.

    :param d: Data object with a Metadata container to extract the field
    :param time: the start time used for matching, or None
    :return: the test_time extracted (or None)
    """
    if time is not None:
        return time
    if isinstance(d, (TimeSeries, Seismogram)):
        return d.t0
    if d.is_defined("starttime"):
        return d["starttime"]
    # Use None for test_time as a signal to ignore time field
    return None


def _get_with_readonly_recovery(d, key):
    """
    Private method for repetitious handling of trying to use the readonly
    tag to recover if one of net, sta, chan, or loc have been botched with
    readonly tag.

    :param d: the datum from which key is to be extracted.
    :param key: attribute name to fetch.
    :return: d[key] if defined, else d["READONLY_" + key] if that is
      defined, else None (recovery failed).
    """
    ROTAG = "READONLY_"
    if d.is_defined(key):
        return d[key]
    testkey = ROTAG + key
    if d.is_defined(testkey):
        return d[testkey]
    return None


def _input_is_atomic(d):
    """
    A variant of input_is_valid that tests if a data object is atomic by
    the mspass definition.  In this case, that means a datum is not an
    ensemble.  This is necessary, for example, to assume something like
    asking for d.t0 doesn't generate an exception.
    """
    return isinstance(d, (TimeSeries, Seismogram))


def _load_as_df(db, collection, query, attributes_to_load, load_if_defined):
    """
    Internal helper function used to translate all or part of a collection
    to a DataFrame.

    This algorithm should only be used for small collections as it makes
    an intermediate copy of the collection as a dictionary of column lists
    before calling the DataFrame.from_dict method to create the working
    dataframe.

    :param db: Database handle assumed to contain collection
    :param collection: collection from which the data are to be extracted
    :param query: python dict defining a query to apply to the collection.
      If you want the entire collection specify None or an empty dictionary.
    :type query: python dict defining a pymongo query or None.
    :param attributes_to_load: list of keys of required attributes to load
      from collection.  A KeyError is raised if any document does not
      contain one of these attributes.  None is treated as an empty list.
    :param load_if_defined: attributes loaded more cautiously.  If the
      attribute for any of the keys in this list is not found in a
      document, the output dataframe has a null (None) in that cell.  None
      is treated as an empty list, and keys already listed in
      attributes_to_load are not duplicated.
    :return: pandas DataFrame with one column per requested key, or an
      empty DataFrame if the query matches no documents.
    """
    if query is None:
        query = dict()
    required = list(attributes_to_load) if attributes_to_load is not None else []
    optional = [k for k in (load_if_defined or []) if k not in required]

    if db[collection].count_documents(query) == 0:
        return pd.DataFrame()

    # dictionary of column lists, one list per requested key
    columns = {k: [] for k in required + optional}
    for doc in db[collection].find(query):
        # required attributes: let a missing key throw a KeyError
        for k in required:
            columns[k].append(doc[k])
        for k in optional:
            columns[k].append(doc[k] if k in doc else None)
    return pd.DataFrame.from_dict(columns)


def _extractData2Metadata(
    doc,
    attributes_to_load,
    aliases,
    prepend_collection_name,
    collection,
    load_if_defined,
):
    """
    Copy selected attributes of a normalizing document into a Metadata
    container.

    For each attribute k the output key is the alias aliases[k] when one
    is defined (else k).  When prepend_collection_name is True the output
    key is further prefixed: "_id" becomes collection + "_id" and any
    other key becomes collection + "_" + key.  The value stored is always
    doc[k], i.e. the value under the original (unaliased) key.

    :param doc: dict-like document (e.g. from MongoDB) to extract from.
    :param attributes_to_load: required keys; a missing one is fatal.
    :param aliases: dict mapping attribute names to output names, or None.
    :param prepend_collection_name: boolean controlling key prefixing.
    :param collection: collection name used for the prefix.
    :param load_if_defined: optional keys copied only when present in doc,
      or None.
    :return: Metadata container with the extracted attributes.
    :raises MsPASSError: (Fatal) if a required attribute is missing.
    """
    if aliases is None:
        aliases = {}

    def _output_key(k):
        # alias first, then (optionally) prepend the collection name
        key = aliases[k] if k in aliases else k
        if not prepend_collection_name:
            return key
        if key == "_id":
            return collection + key
        return collection + "_" + key

    md = Metadata()
    for k in attributes_to_load:
        if k not in doc:
            message = "Required attribute {key} was not found".format(
                key=k,
            )
            raise MsPASSError(
                "DictionaryCacheMatcher._load_normalization_cache: " + message,
                ErrorSeverity.Fatal,
            )
        # read with the original key; the alias only names the output
        md[_output_key(k)] = doc[k]
    if load_if_defined is not None:
        for k in load_if_defined:
            if k in doc:
                md[_output_key(k)] = doc[k]
    return md