Normalization
Concepts
A universal property of any data amenable to storage in a database
is that some attributes are highly redundant. For instance,
consider a typical seismic example.
A data set of 1 million waveforms recorded on the order of
1000 fixed seismic stations would have the same station coordinates repeated around
1000 times if stored with each waveform.
That redundancy problem was recognized decades ago as a fundamental
weakness of the use of static “headers” in seismic processing of any kind.
It was, in fact, one of the key motivations for the development of the
CSS3.0 relational database schema in the 1980s.
The standard CSS3.0
relational database schema handles this issue by defining
a set of tables that are linked to the waveform index (wfdisc)
using a relational database “join”. MongoDB is not relational,
but it handles the same issue through what its documentation calls the
normalized versus the embedded data model
(see MongoDB's documentation on data modeling for details).
Normalization is conceptually similar to a relational database join, but
it is implemented in a different way that has implications for performance.
For small datasets these issues are minor, but for very large
datasets we have found that a poorly designed normalization algorithm
can be a serious bottleneck.
A key difference all users need to appreciate
is that with a relational database, a “join” is always a global operation done between all
tuples in two relations (tables or table subsets). In MongoDB
normalization is an atomic operation applied one document at a time (recall a
document is analogous to a tuple). Because database transactions are
expensive in time, we have found it is important to parallelize the normalization
process and reduce the number of database transactions where possible.
We accomplish that in one of two ways described in the subsections
below: (1) as part of the reader, and (2) with parallel normalization
functions that can be applied in a dask/spark map call.
A novel feature of the MsPASS normalization we discuss later is that
we have abstracted the process in a way that provides great flexibility
in how normalizing data are loaded. For example, receiver location information can
be loaded either from the MsPASS site or channel
collections in MongoDB or from a pandas DataFrame.
An example of the utility of a DataFrame is that all Antelope CSS3.0
tables are easily loaded into a DataFrame with one line of python code
(most require read_fwf, but some can be read with read_csv).
That abstraction is possible because a MongoDB “collection”
is just an alternative way to represent a table (relation).
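For example, a site table exported from Antelope can be loaded with a single pandas call. The file name, column positions, and column names below are illustrative assumptions, not the full CSS3.0 specification:

import pandas as pd
# read_fwf handles the fixed-width format of most CSS3.0 tables
site_df = pd.read_fwf(
    "mydb.site",
    colspecs=[(0, 6), (7, 15), (16, 24), (25, 34), (35, 45), (46, 55)],
    names=["sta", "ondate", "offdate", "lat", "lon", "elev"],
)
# a comma-separated export of the same attributes could instead use read_csv:
# site_df = pd.read_csv("site.csv", names=["sta", "ondate", "offdate", "lat", "lon", "elev"])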
Before proceeding it is important to give a pair of definitions we use repeatedly
in the text below. We define the normalizing
collection/table as the
smaller collection/table holding the repetitious data we aim to cross-reference.
In addition, when we use the term target of normalization
we mean the thing into which data in the normalizing collection are to be copied.
The “target of the normalization” in all current examples is one of the
waveform index collections (wf_miniseed, wf_TimeSeries, or wf_Seismogram)
or, in the case of in-line normalization functions, the Metadata container of
one of the MsPASS data objects.
Normalization with readers
Overview
Almost all workflows begin with a set of initializations. In a production workflow using MsPASS, initialization is normally followed immediately by one of two MongoDB constructs:

1. For serial processing most workflows reduce to an outer loop driven by a MongoDB Cursor object. A Cursor is the output of the standard “find” method of the handle to any MongoDB collection.
2. Parallel workflows in MsPASS are driven by a call to the read_distributed_data function.
In either case, normalization can always be accomplished in one of two ways:

1. The two core readers in MsPASS (read_data for serial workflows and read_distributed_data for parallel workflows) both have a normalize argument that can contain a list of one or more normalization operators. When applied this way, normalization is more-or-less treated as part of the process of constructing the data objects that form the dataset.
2. Normalization can be applied as a separate step within a workflow, as illustrated in the examples below and in the schematic sketch that follows this list.
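To make the distinction concrete, here is a compressed sketch that reads a single datum both ways using the same constructs as the full examples later in this section:

from mspasspy.client import Client
from mspasspy.db.normalize import MiniseedMatcher, normalize
dbclient = Client()
db = dbclient.get_database("mydatabase")
channel_matcher = MiniseedMatcher(db)   # channel is the default collection
doc = db.wf_miniseed.find_one({})
# (1) normalization handled by the reader through the normalize argument
d = db.read_data(doc, normalize=[channel_matcher], collection="wf_miniseed")
# (2) normalization applied as a separate step within the workflow
d = db.read_data(doc, collection="wf_miniseed")
d = normalize(d, channel_matcher)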
Both approaches utilize the concept of a normalization operator
we discuss in detail in this section. Readers familiar with relational
database concepts may find it helpful to view a normalization operator
as equivalent to the operation used to define a database join.
This section focuses on the first approach. The second is covered in a later section below. The most common operators for normalization while reading are those using a cross-referencing id key. We discuss those concepts first before showing examples of normalization during read. Note the next section on ids is equally relevant to normalization in a workflow, but we include it here because it is more central to normalization during reading.
Defining Cross-referencing IDs
An “id” is a common concept in all database implementations. Relational database schemas like CSS3.0 have numerous integer ids used as join keys between tables. Integers were traditionally used as cross-reference keys because it is relatively easy to maintain uniqueness and computers do few operations faster than an integer equality test. MongoDB uses a custom data object they call an ObjectId. Conceptually, however, an ObjectId is simply an alternative to the more traditional integer keys for guaranteeing a unique key for each database document (equivalent to a tuple in relational database theory). Note this is in contrast to using integer keys, where the set of possible values is finite and some mechanism is required to ask the database server for a key to assure it is unique. ObjectIds can be generated by a workflow without interaction with the MongoDB server. You can learn more about this aspect of ObjectIds in the MongoDB documentation.
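The client-side generation of ObjectIds is easy to demonstrate with the bson package distributed with pymongo:

from bson.objectid import ObjectId
# generated locally; no round trip to the MongoDB server is required
oid = ObjectId()
print(oid)                  # a unique 12-byte id rendered as a hex string
print(oid.generation_time)  # the creation timestamp is encoded in the id itself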
Using ObjectIds provides the fastest normalization methods
available in MsPASS. Currently the most common model for
data processing is a collection of miniseed files downloaded from
FDSN data services and/or a collection of files created from a
field experiment. Once these files are indexed with
the index_mseed
method
they can be read directly to initiate a processing workflow.
Such data can be normalized with the
operator MiniseedMatcher
without using an Id, but in our experience that is not advised
for two reasons. First, the complexity of SEED data makes it challenging
to know if the channel
collection is complete. We have found
many examples of incomplete or inaccurate station data downloaded
from FDSN that cause some fraction of waveforms in a large dataset to not have any
matching channel
entry. A second, more minor issue, is that
the complexity of the algorithm used by
MiniseedMatcher
makes it inevitably slower than the comparable Id-based algorithm called
ObjectIdMatcher
.
We suggest that unless you are absolutely certain of the
completeness of the channel
collection, you should use the
Id-based method discussed here for doing normalization while reading.
Because miniseed normalization is so fundamental to modern seismology data,
we created a special python function called
normalize_mseed
.
It is used for defining channel_id
(optionally site_id
) matches in the wf_miniseed
collection.
This function is implemented with the matcher called
MiniseedMatcher
mentioned earlier.
The normalize_mseed
function efficiently handles the lookup and
database updates by caching the index in memory and using a bulk update
method to speed update times. We strongly recommend use of this function
for miniseed data as a simpler implementation was found to be as much as two
orders of magnitude slower than the current algorithm. The data from that
development work are preserved on the MsPASS github site.
Normalizing source data is often a more complicated problem. How difficult
depends heavily upon how the data time segmentation is
defined. MsPASS currently has support for only two source association
methods: (1) one where the start time of each datum is a constant offset
relative to an event origin time, and (2) a more complicated method based on
arrival times that can be used to associate data with start times relative
to a measured or predicted phase arrival time. The latter can easily violate
the assumption of the normalizing collection being small compared to the
waveform collection. The number of arrivals can easily exceed the number of
waveform segments.
In both cases, normalization to set source_id values is best
done with the MsPASS function bulk_normalize.
How to actually accomplish that is best understood by consulting the examples
below.
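As orientation only, here is a minimal sketch of that usage. The keyword argument names (wfquery, wf_col, and matcher_list) are assumptions for illustration; consult the bulk_normalize docstring for the actual signature and the matchers it supports:

from mspasspy.client import Client
from mspasspy.db.normalize import OriginTimeMatcher, bulk_normalize
dbclient = Client()
db = dbclient.get_database("mydatabase")
# assumes waveform start times are a constant 20 s after the event origin time
source_matcher = OriginTimeMatcher(db, t0offset=20.0,
    attributes_to_load=["_id", "lat", "lon", "depth", "time"])
# keyword names below are illustrative assumptions; check the docstring
retcodes = bulk_normalize(db,
    wfquery={},
    wf_col="wf_miniseed",
    matcher_list=[source_matcher])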
Here is an example of running normalize_mseed
as a precursor to
reading and normalizing miniseed data:
from mspasspy.client import Client
from mspasspy.db.normalize import normalize_mseed
dbclient = Client()
db = dbclient.get_database("mydatabase")
retcodes = normalize_mseed(db)
print("Number of wf_miniseed documents processed=",retcodes[0])
print("Number of documents that normalize_mseed set channel_id=",retcode[1])
Examples of normalization while reading
This is an example serial job that would use the result
from running normalize_mseed
in the example above:
from mspasspy.client import Client
from mspasspy.db.normalize import MiniseedMatcher
dbclient = Client()
db = dbclient.get_database("mydatabase")
# channel is the default collection for this class
channel_matcher = MiniseedMatcher(db)
# loop over all wf_miniseed records
cursor = db.wf_miniseed.find({})
for doc in cursor:
    d = db.read_data(doc,
                     normalize=[channel_matcher],
                     collection="wf_miniseed",
                    )
    # processing functions here
    # normally terminated with a save operation or a graphic display
Notice the use of the normalize argument that tells the reader to
normalize with the channel collection. A parallel version of the
example above requires use of the function
read_distributed_data
.
The following does the same operation as above in parallel with dask
from mspasspy.client import Client
from mspasspy.db.database import read_distributed_data
from mspasspy.db.normalize import MiniseedMatcher
dbclient = Client()
db = dbclient.get_database("mydatabase")
channel_matcher = MiniseedMatcher(db)
# loop over all wf_miniseed records
cursor = db.wf_miniseed.find({})
dataset = read_distributed_data(cursor,
normalize=[channel_matcher],
collection='wf_miniseed',
)
# processing steps as map operators follow
# normally terminate with a save
dataset.compute()
Reading ensembles with normalization is similar. The following is a serial job that reads ensembles and normalizes the ensemble with data from the source and channel collections. It assumes source_id was defined previously.
from mspasspy.client import Client
from mspasspy.db.normalize import MiniseedMatcher, ObjectIdMatcher
dbclient = Client()
db = dbclient.get_database("mydatabase")
channel_matcher = MiniseedMatcher(db)
source_matcher = ObjectIdMatcher(db,
collection="source",
attributes_to_load=["lat","lon","depth","time","_id"],
)
# this assumes the returned list is not enormous
sourceid_list = db.wf_miniseed.distinct("source_id")
for srcid in sourceid_list:
    cursor = db.wf_miniseed.find({"source_id" : srcid})
    ensemble = db.read_data(cursor,
                            normalize=[channel_matcher],
                            normalize_ensemble=[source_matcher])
    # processing functions for ensembles to follow here
    # normally would be followed by a save
Note that we used a different option to handle the source collection in this example. This is an example of creating a set of “common source gathers” (all data from a common source), so it is natural to post the source attributes to the ensemble’s Metadata container instead of to each ensemble “member”. Passing the source_matcher object in the normalize_ensemble argument accomplishes that. For ensembles, any operator passed through the normalize argument is applied to the individual members.
Note
The normalize_ensemble feature was added in version 2 of MsPASS. Older versions did not implement that extension.
Normalization with a workflow
Normalization within a workflow uses the same “Matcher” operators but is best done through a function call in a serial job or with a map operator in a parallel job. It is perhaps easiest to demonstrate how this is done by rewriting the examples above doing normalization during read with the equivalent algorithm for normalization as a separate step within the workflow.
First, the serial example:
from mspasspy.client import Client
from mspasspy.db.normalize import MiniseedMatcher,normalize
dbclient = Client()
db = dbclient.get_database("mydatabase")
# channel is the default collection for this class
channel_matcher = MiniseedMatcher(db)
# loop over all wf_miniseed records
cursor = db.wf_miniseed.find({})
for doc in cursor:
    d = db.read_data(doc,collection="wf_miniseed")
    d = normalize(d,channel_matcher)
    # processing functions here
    # normally terminated with a save operation or a graphic display
Next, the parallel version of the job immediately above:
from mspasspy.client import Client
from mspasspy.db.database import read_distributed_data
from mspasspy.db.normalize import MiniseedMatcher,normalize
dbclient = Client()
db = dbclient.get_database("mydatabase")
channel_matcher = MiniseedMatcher(db)
# loop over all wf_miniseed records
cursor = db.wf_miniseed.find({})
dataset = read_distributed_data(cursor,collection="wf_miniseed")
dataset = dataset.map(normalize,channel_matcher)
# processing steps as map operators follow
# normally terminate with a save
dataset.compute()
Finally, the example for reading ensembles:
from mspasspy.client import Client
from mspasspy.db.normalize import MiniseedMatcher, ObjectIdMatcher, normalize
dbclient = Client()
db = dbclient.get_database("mydatabase")
channel_matcher = MiniseedMatcher(db)
source_matcher = ObjectIdMatcher(db,
collection="source",
attributes_to_load=["lat","lon","depth","time","_id"],
)
# this assumes the returned list is not enormous
sourceid_list = db.wf_miniseed.distinct("source_id")
for srcid in sourceid_list:
    cursor = db.wf_miniseed.find({"source_id" : srcid})
    ensemble = db.read_ensemble_data(cursor, collection="wf_miniseed")
    ensemble = normalize(ensemble,channel_matcher,apply_to_members=True)
    ensemble = normalize(ensemble,source_matcher)
    # processing functions for ensembles to follow here
    # normally would be followed by a save
Note that we had to set apply_to_members True to have the normalize function process all ensemble members. The normal behavior of that function with ensembles is to normalize the ensemble Metadata container, as is done with the source_matcher line. Both calls are necessary to match the normalization-during-read examples above, which were designed to produce identical results by different paths.
Note
The apply_to_members argument is a feature added in version 2 of MsPASS.
Normalization Operators
Overview
This section covers the available normalization operators in MsPASS. It focuses on design concepts and lists the available features. See the examples above and near the end of this section for more nuts-and-bolts details. The examples below all use the normalization-within-a-workflow approach.
Concepts
Normalization can be abstracted as two concepts that need to be implemented to make a concrete normalization procedure:

1. We need to define an algorithm that provides a match of records in the normalizing collection with the target of the normalization. A matching algorithm may return a unique match (one-to-one) or multiple matches (one-to-many).
2. After a match is found we need to copy a set of attributes from the normalizing collection to the target. By definition, a standard normalization operation requires the match be one-to-one.
We abstract both of these operations in a novel way in MsPASS through a standardized API we call a “matcher”.
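The two concepts are easy to illustrate with plain python containers. The following toy code is purely conceptual and is not the MsPASS API: the matching function plays the role of find_one and the copy loop plays the role of the attribute loader:

# toy normalizing "collection" holding one channel document
channel_table = [
    {"net": "XX", "sta": "STA01", "chan": "BHZ", "loc": "00", "lat": 35.0, "lon": -106.5},
]

def toy_find_one(wf_doc):
    # concept 1: a matching rule with a boolean outcome for each document
    for row in channel_table:
        if all(wf_doc[k] == row[k] for k in ("net", "sta", "chan", "loc")):
            return row
    return None

def toy_normalize(wf_doc, attributes_to_load=("lat", "lon")):
    # concept 2: copy the requested attributes from the match to the target
    match = toy_find_one(wf_doc)
    if match is None:
        return None   # in MsPASS the datum would normally be killed instead
    for key in attributes_to_load:
        wf_doc["channel_" + key] = match[key]   # collection name prepended
    return wf_doc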
Matchers
Normalization requires a rule that defines how documents in
the normalizing collection match documents in the target.
A match can be defined by
something as simple as a single key string match or it
can be some arbitrarily complex algorithm. For example,
the standard seismology problem of matching SEED waveform data
to receiver metadata requires matching four
different string keys (station-channel codes) and a time interval.
Any matching operation, however, has a simple idea as the core concept:
matching requires an algorithm that can be applied to a collection/table with a boolean
outcome for each document/tuple/row. That is, the algorithm returns
True if there is a match and a False if the match fails.
In MsPASS we define this abstraction in an object-oriented perspective
using inheritance and an abstract base class that defines the
core generic operation. You can read the docstrings of
BasicMatcher
for details.
Note that the API requires a concrete instance of this base class to
implement two core methods: find
is used for a one-to-many match
algorithm while
find_one
is the primary method for one-to-one matches.
Note we require even unique matchers to implement find
since one is
simply a special case of “many”.
The choice of those two names
(find
and find_one
) was not
arbitrary. They are the names used to implement the same concepts in MongoDB
as methods of their database handle object. In fact, as a convenience the
normalize module defines the intermediate class
DatabaseMatcher
that provides a layer to simplify the creation of a matcher that works directly with
MongoDB. That class implements find
and find_one
as
generic wrapper code that translates MongoDB documents into the (different)
structure required by the base class,
BasicMatcher
.
To make the database matcher generic,
concrete implementations of DatabaseMatcher
are required to implement the method query_generator
.
That approach allows the implementation to have a generic algorithm for
find
and find_one
with a series of matching classes
that are subclasses of DatabaseMatcher
with different implementations
of query_generator
. The following table is a summary of concrete
matcher classes that are subclasses of DatabaseMatcher
with links
to the docstring for each class:
Class Name | Use
---|---
ObjectIdDBMatcher | Match with MongoDB ObjectId
MiniseedDBMatcher | Miniseed match with net:sta:chan:loc and time
EqualityDBMatcher | Generic equality match of one or more key-value pairs
OriginTimeDBMatcher | Match data with start time defined by event origin time
(see the normalize module docstrings) | Match arrival times to waveforms
As noted many times in this User’s Manual database transactions are expensive
operations due to the inevitable lag from the time between issuing a query until
the result is loaded into your program’s memory space. The subclasses
derived from DatabaseMatcher
are thus most useful for one of two situations: (1) the normalizing
collection is large and the matching algorithm can use an effective
MongoDB index, or (2) the dataset is small enough that the cost of the queries
is not overwhelming.
When the normalizing collection is small we have found a much faster way
to implement normalization is via a caching algorithm. That is, we
load all or part of a collection/table into an internal data area
(a python class self attribute) of the “matcher” object
(i.e. a concrete implementation of BasicMatcher).
The implementation then only requires an efficient search algorithm
to implement the required
find
and
find_one
methods. We supply two generic search algorithms as part of MsPASS
implemented as two intermediate classes used similarly to
DatabaseMatcher
:
1. DictionaryCacheMatcher uses a python dictionary as the internal cache. It is most useful when the matching algorithm can be reduced to a single string key. The class implements a generic find method by using a python list to hold all documents/tuples that match the dictionary key. Note the returned list is actually a list of Metadata containers as defined by the base class API. We do that for efficiency as Metadata containers are native to the MsPASS data objects that are the target of the normalization.
2. DataFrameCacheMatcher uses the more flexible Pandas DataFrame API to store its internal cache. The Pandas library is robust and has a complete set of logical constructs that can be used to construct any query possible with something like SQL and more. Any custom, concrete implementation of BasicMatcher that matches the small-normalizing-collection assumption would be best advised to utilize the pandas API.
These two intermediate-level classes have two features in common:

1. Both can load the normalizing collection in one of two forms: (a) via a MongoDB database handle combined with a collection name argument, or (b) a Pandas DataFrame object handle. The former, for example, can be used to load site collection metadata from MongoDB and the latter can be used to load comparable data from an Antelope site table via the Pandas read_csv method or similar methods for loading a DataFrame from an SQL relational database. (A sketch of the DataFrame approach follows this list.)
2. Both provide generic implementations of the find and find_one methods required by BasicMatcher.
These two classes differ mainly in what they require to make them
concrete. That is, both have abstract/virtual methods that are required
to make a concrete implementation.
DictionaryCacheMatcher
requires implementation of
cache_id
and
db_make_cache_id
.
That minor complication was implemented to allow an implementation to use
different keys to access attributes stored in the database and
the equivalent keys used to access the same data in a workflow.
In addition, there is a type mismatch between a document/tuple/row
abstraction in a MongoDB document and the internal use by the matcher
class family. That is, pymongo represents a “document” as a
python dictionary while the matchers require posting the same data to
the MsPASS Metadata container to work more efficiently with the C++
code base that defines data objects.
DataFrameCacheMatcher
requires only the method
subset
used to select only the rows in the DataFrame that define a “match”
for the complete, concrete class. For more details see the docstrings that
can be viewed by following the hyperlinks above. We also discuss these
issues further in the subsection on writing a custom matcher below.
The following table is a summary of concrete matcher classes that utilize a caching method; as above, see the docstring of each class for details:
Class Name | Use
---|---
ObjectIdMatcher | Match with MongoDB ObjectId as dictionary key for cache
MiniseedMatcher | Miniseed match with net:sta:chan:loc and time
EqualityMatcher | Generic equality match of one or more key-value pairs
OriginTimeMatcher | Match data with start time defined by event origin time
Note that all of these currently have database query versions that differ only
by having “DB” embedded in the class name
(e.g. the MongoDB version of EqualityMatcher is EqualityDBMatcher); those are
the classes listed in the first table above.
Examples
Example 1: ObjectId matching
The abstraction of defining matching through a python class allows the
process of loading normalizing data into a dataset to be handled by a single,
generic function called normalize.
That function was designed primarily for use in map operations. The
idea is most clearly seen by a simple example.
from mspasspy.client import Client
from mspasspy.db.normalize import ObjectIdMatcher,normalize
from mspasspy.db.database import read_distributed_data
dbclient = Client()
db = dbclient.get_database("mydatabase")
# Here limit attributes to be loaded to coordinates
# Note these are defined when the matcher class is instantiated
attribute_list = ['_id','lat','lon','elev']
matcher = ObjectIdMatcher(db,collection="site",attributes_to_load=attribute_list)
# This says load the entire dataset presumed staged to MongoDB
cursor = db.wf_TimeSeries.find({}) #handle to entire data set
dataset = read_distributed_data(cursor,collection='wf_TimeSeries') # dataset returned is a bag
dataset = dataset.map(normalize,matcher)
# additional workflow elements and usually ending with a save would be here
dataset.compute()
This example loads receiver coordinate information from data that was assumed
previously loaded into MongoDB in the “site” collection. It assumes
matching can be done using the site collection ObjectId loaded with the
waveform data at read time with the key “site_id”. That is, this is an
inline version of what could also be accomplished by
calling read_distributed_data
with a matcher for site in the normalize list.
Key things this example demonstrates common to all in-line normalization workflows are:

1. normalize appears only as arg0 of a map operation (dask syntax; Spark would require a “lambda” function in the map call, as shown after this list).
2. The “matcher” is created as an initialization before loading data. It is then used by passing it as an argument to the normalize function in the map operation.
3. Only the attributes defined in the constructor for the matcher are copied to the Metadata container of the data being processed. In this example, after running the normalize function, each datum for which a match was found will contain attributes with the following keys: site_id, site_lat, site_lon, and site_elev. Note these have the string “site_” automatically prepended by default. That renaming can be disabled by setting prepend_collection_name to False. By default, failures in matching cause the associated waveform datum to be marked dead with an informational error log posted to the result.
Example 2: miniseed matching
This example illustrates the in-line equivalent of running the
normalization function for miniseed data noted above called
normalize_mseed
.
This example would load and process an entire dataset defined in
the wf_miniseed collection of a database with the name “mydatabase”.
It shows how a list of keys is used to limit what
attributes are extracted from the channel and site collections
and loaded into each datum. These are defined by the
symbols channel_attribute_list and site_attribute_list.
As in example 1 creation of the matcher classes to match the
waveforms to site and channel collection documents is an initialization
step. That is, we “construct” two concrete matchers we assign the symbols
channel_matcher
and site_matcher
.
As above these matchers are passed as an argument to the normalize
function in a map operator.
from mspasspy.client import Client
from mspasspy.db.normalize import MiniseedMatcher, normalize
from mspasspy.db.database import read_distributed_data
dbclient = Client()
db = dbclient.get_database("mydatabase")
# Here limit attributes to be loaded to coordinates and orientations
channel_attribute_list = ['_id','lat','lon','elev','hang','vang']
site_attribute_list = ['_id','lat','lon','elev']
# These construct the channel a site normalizers
channel_matcher = MiniseedMatcher(db,collection="channel",
attributes_to_load=channel_attribute_list)
site_matcher = MiniseedMatcher(db,collection="site",
    attributes_to_load=site_attribute_list)
cursor = db.wf_miniseed.find({}) #handle to entire data set
dataset = read_distributed_data(cursor) # dataset returned is a bag/rdd
dataset = dataset.map(normalize,channel_matcher)
dataset = dataset.map(normalize,site_matcher)
# additional processing steps normally would be inserted here
dataset.compute()
Example 3: source normalization
This example shows how to insert source data into
a parallel workflow. As above we use the dask syntax for a map operator.
This example uses the matcher called OriginTimeMatcher
which works only for waveform segments where the start time of the
signal is a constant offset from the event origin time.
It illustrates another useful feature in the constructor
argument load_if_defined
. This example uses one key, “magnitude”,
for that list. The use is that if a value is associated with the key
“magnitude” in the normalizing collection it will be loaded with the data.
If it is not defined it will be silently ignored and left undefined. Note
that is in contrast to keys listed in “attributes_to_load” that are treated
as required. As noted above if any of the attributes_to_load keys are
missing a datum will, by default, be killed.
from mspasspy.client import Client
from mspasspy.db.normalize import OriginTimeMatcher, normalize
from mspasspy.db.database import read_distributed_data
dbclient = Client()
db = dbclient.get_database("mydatabase")
# Here limit attributes to be loaded to source coordinates
attribute_list = ['_id','lat','lon','depth','time']
# define source normalization instance assuming data start times
# were defined as 20 s after the origin time of the event
# origin time used to define the data time window
source_matcher = OriginTimeMatcher(db,t0offset=20.0,
attributes_to_load=attribute_list,load_if_defined=["magnitude"])
cursor = db.wf_Seismogram.find({}) #handle to entire data set
dataset = read_distributed_data(cursor) # dataset returned is a bag/rdd
dataset = dataset.map(normalize,source_matcher)
# additional processing steps normally would be inserted here
dataset.compute()
Example 4: ensemble processing
This example is a variant of example 3 immediately above but
implemented on ensembles. That is, here the normalizing data
attributes are loaded in the SeismogramEnsemble’s Metadata container
and not copied to the members of the ensemble. This workflow is
a way to assemble what would be called “common-shot gathers”
in seismic reflection processing.
It uses a common
trick for ensemble processing: building a list of source-based queries from the
distinct source_id values and passing that list to read_distributed_data,
which creates a bag of ensembles (one ensemble per query).
The bag of ensembles is then normalized.
Finally note that this example is a hybrid of database normalization and
in-line normalization. The example assumes that the user has previously
run a function like bulk_normalize
to set the cross-referencing
id for the source collection source_id
.
from mspasspy.client import Client
from mspasspy.db.normalize import ObjectIdMatcher,MiniseedMatcher,normalize
from mspasspy.io.distributed import read_distributed_data
def srcidlist2querylist(srcidlist):
    """
    Small function used to build a query list from a list of source ids.
    Uses a new feature of read_distributed_data from version 2 forward
    that allows creation of a bag/rdd from a list of python dict containers
    assumed to be valid MongoDB queries.
    """
    querylist=list()
    for srcid in srcidlist:
        query={'source_id' : srcid}
        querylist.append(query)
    return querylist
dbclient = Client()
db = dbclient.get_database("mydatabase")
channel_matcher=MiniseedMatcher(db)
# Here limit attributes to be loaded to source coordinates
attribute_list = ['_id','lat','lon','depth','time']
source_matcher = ObjectIdMatcher(db,collection="source",
attributes_to_load=attribute_list,load_if_defined=["magnitude"])
# MongoDB incantation to find all unique source_id values
sourceid_list = db.wf_Seismogram.distinct("source_id")
querylist=srcidlist2querylist(sourceid_list)
dataset = read_distributed_data(querylist,normalize=[channel_matcher])
# dataset here is a bag of SeismogramEnsembles. The next line applies
# normalize to each ensemble, loading the source attributes into the
# ensemble's Metadata container.
dataset = dataset.map(normalize,source_matcher)
# additional processing steps normally would be inserted here
dataset.compute()
Custom Normalization Functions
If the current set of normalization algorithms is not sufficient for your data, you may need to develop a custom normalization algorithm. We know of three solutions to that problem:

1. Think about what you are trying to match and see if it is possible to use the header math functions (see the Header (Metadata) Math section) to construct a new Metadata attribute that can be used for a generic match like EqualityMatcher. Similarly, for string manipulation you may be able to create a special character string to define your match with a custom python function you could use in a map operation prior to using one of the MsPASS generic matchers.
2. Write a custom python function for matching keys in a wf collection and a normalizing collection. The recommended approach is to have the function set the ObjectId of the normalizing collection in the wf collection using the MsPASS naming convention for such ids (e.g. “source_id” to normalize source). With this approach you would use the standard update methods of pymongo, easily found from numerous web tutorials (a sketch follows this list). You will also find examples in the MsPASS tutorials. Then you can use the normalize argument with the readers to load normalizing data at read time or use the inline version ObjectIdDBMatcher or ObjectIdMatcher.
3. Write an extension class to one of the intermediate-level subclasses of the base class BasicMatcher described above (DatabaseMatcher, DictionaryCacheMatcher, and DataFrameCacheMatcher). One could also build directly on the base class, but we can think of no example where that would be preferable to extending one of the intermediate classes. The remainder of this section focuses only on some hints for extending one of the intermediate classes.
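Here is a sketch of the second option. It uses the standard pymongo update_one method to set source_id in wf_miniseed. The function my_source_match is a hypothetical placeholder for whatever matching rule fits your data:

from mspasspy.client import Client

def my_source_match(db, wf_doc):
    # hypothetical user-supplied rule: return the matching source document or None
    # here: a source with origin time within 100 s before the waveform start time
    return db.source.find_one(
        {"time": {"$gte": wf_doc["starttime"] - 100.0, "$lte": wf_doc["starttime"]}}
    )

dbclient = Client()
db = dbclient.get_database("mydatabase")
for doc in db.wf_miniseed.find({}):
    srcdoc = my_source_match(db, doc)
    if srcdoc:
        # standard pymongo update to set the cross-referencing id
        db.wf_miniseed.update_one(
            {"_id": doc["_id"]}, {"$set": {"source_id": srcdoc["_id"]}}
        )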
We assume the reader has some familiarity with the general concept of inheritance
in object-oriented programming. If not, some supplementary web research
may be needed to understand the concepts behind some of the terminology below
before an extension is attempted. If you have a sound understanding of inheritance
in object oriented programming, you may want to just ignore the rest of this
section and see how we implemented concrete matcher classes in the
mspasspy.db.normalize
module and use one of them as a template
to modify. You might, however, still find the following useful to understand the
concepts behind our design.
The syntax for inheritance is a standard python construct best illustrated here by a simple example:
from mspasspy.db.normalize import DataFrameCacheMatcher
class MyCustomMatcher(DataFrameCacheMatcher):
    # class implementation code
Any class needs a constructor as part of the API. Most will
want to use the superclass constructor to simplify the setup.
Here is an example of how the class MyCustomMatcher above
could utilize the base class constructor to allow it to work
cleanly with the base class find and find_one methods:
class MyCustomMatcher(DataFrameCacheMatcher):
    def __init__(
        self,
        db,
        # additional required arguments with no default would be defined here
        collection="site",
        attributes_to_load=["lat", "lon", "elev"],
        load_if_defined=None,
        aliases=None,
        prepend_collection_name=True,
        # additional optional arguments with defaults would be added here
    ):
        super().__init__(
            db,
            collection,
            attributes_to_load=attributes_to_load,
            load_if_defined=load_if_defined,
            aliases=aliases,
            require_unique_match=True,
            prepend_collection_name=prepend_collection_name,
        )
        # any additional arguments would be parsed to set self attributes here
The point of that somewhat elaborate construct is to cleanly initialize the
base class, which here is DataFrameCacheMatcher, from the
inputs to the subclass constructor. An instance of the above using all defaults
could then be created as follows:
matcher = MyCustomMatcher(db)
As the comments note, however, a typical implementation would usually need to add one or more required or optional arguments to define constants that define properties of the matching algorithm you are implementing.
Finally, as noted earlier each of the intermediate classes have one or more required
methods that the intermediate class declares to be “abstract” via
the @abstractmethod
decorator defined in the ABC
module.
The methods declared “abstract” are null in the intermediate class.
For an implementation to work it must be made “concrete”, in the language used by the ABC
documentation, by implementing the methods tagged with the
@abstractmethod
decorator. Requirements for each of the
intermediate classes you might use to build your custom matcher are listed below (a sketch of the DataFrameCacheMatcher case follows the list):
1. The DatabaseMatcher class requires implementing only one method called query_generator. That method needs to create a python dictionary in pymongo syntax that is to be applied to the normalizing collection. That query would normally be constructed from one or more Metadata attributes in a data object, but time queries may also want to use the data start time and end time available as methods in atomic data objects. Consult the MongoDB documentation for guidance on the syntax of pymongo's query language based on python dictionaries.
2. The DictionaryCacheMatcher class requires implementing two methods. cache_id is a function that needs to return a unique string that defines the key to the python dictionary used to implement the cache in this intermediate class. The other method, db_make_cache_id, needs to do the same thing and create identical keys. The difference between the two is that db_make_cache_id is used by the data loader to create the dictionary-based cache while cache_id is used to construct the comparable key from a MsPASS data object.
3. The DataFrameCacheMatcher class requires subclasses to implement only one method called subset. The DataFrameCacheMatcher defines its cache internally with the symbol self.cache. That symbol defines a pandas DataFrame. The subset method you implement can use the rich API of pandas to define the matching operation you need to build. Pandas is so widely used that there is an overwhelming volume of reference material available; the pandas documentation is a reasonable starting point. In any case, a key point is that the subset method you implement needs to fetch attributes from the input data object's Metadata (header) and/or the data object's internals (e.g. start time, end time, and orientation data) to construct a pandas query that selects the rows of the cached DataFrame matching that datum.
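To make the third case concrete, here is a minimal sketch of a DataFrameCacheMatcher subclass. It assumes the subset method receives the data object being normalized and returns the matching rows of self.cache; check the DataFrameCacheMatcher docstring for the exact method signature before using this pattern:

from mspasspy.db.normalize import DataFrameCacheMatcher

class StationNameMatcher(DataFrameCacheMatcher):
    # hypothetical matcher: match on the net and sta Metadata keys only
    def subset(self, mspass_object):
        # fetch the match keys from the datum's Metadata (header)
        net = mspass_object["net"]
        sta = mspass_object["sta"]
        # use the pandas API to select the matching rows of the cached table
        return self.cache[(self.cache["net"] == net) & (self.cache["sta"] == sta)]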
We close this section by emphasizing that the value of using class inheritance
from the BasicMatcher family is that you can then utilize the matcher in a
map operator to load attributes from a normalizing collection within a
workflow. Here, for example, is a variant of example 1 using MyCustomMatcher:
from mspasspy.client import Client
from mspasspy.db.database import read_distributed_data
from mspasspy.db.normalize import normalize
# import for MyCustomMatcher would appear here
dbclient = Client()
db = dbclient.get_database("mydatabase")
matcher = MyCustomMatcher(db)
cursor = db.wf_miniseed.find({}) #handle to entire data set
dataset = read_distributed_data(cursor) # dataset returned is a bag
dataset = dataset.map(normalize,matcher)
# additional workflow elements and usually ending with a save would be here
dataset.compute()
If you compare this to example 1 you will see that the only difference is setting
the symbol matcher to an instance of MyCustomMatcher instead of
an ObjectIdMatcher.