mspasspy.io

mspasspy.io.distributed.post2metadata(mspass_object, doc)[source]

Posts the content of the key-value pair container doc to the Metadata container of mspass_object. The operation only requires that doc be iterable over keys and that both doc and mspass_object act like associative arrays. In MsPASS that means doc can be either a dict or a Metadata container. Note the contents of doc[k] will overwrite any existing content defined by mspass_object[k]. It returns the edited copy of mspass_object to mesh with map operator usage in read_distributed_data. Note that Metadata “is a” fundamental component of all atomic data. Ensemble objects contain a special instance of Metadata we normally refer to as “ensemble metadata”. When handling ensembles the content of doc is pushed to the ensemble metadata and member attributes are not altered by this function.

This function is the default for the read_distributed_data argument container_merge_function, which provides the means to push a list of Metadata or dict containers held in a congruent bag/rdd onto a set of seismic data objects. Because of that usage it has no safeties, for efficiency.
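As a quick illustration, the sketch below applies the function to a single atomic datum outside any parallel context; the attribute names and values are hypothetical.

```

from mspasspy.io.distributed import post2metadata
from mspasspy.ccore.seismic import TimeSeries

d = TimeSeries()                              # empty atomic datum
doc = {"source_id": "abc123", "phase": "P"}   # hypothetical attributes
d = post2metadata(d, doc)                     # doc values overwrite matching keys in d
print(d["phase"])                             # -> "P"

```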

mspasspy.io.distributed.post_error_log(d, doc, other_elog=None, elog_key='error_log') dict[source]

Posts error log data as a subdocument to arg0 datum (symbol d).

write_distributed_data has a “post_elog” boolean. By default elog entries for any atomic seismic datum will be posted one-at-a-time with insert_one calls to the “elog” collection. If a workflow expects a large number of error log entries that high database traffic can be a bottleneck. This function is used within write_distributed_data to avoid that by posting the same data to subdocuments attached to wf documents saved with live data.

Note dead data are never handled by this mechanism. They always end up in either the abortions or cemetery collections.

class mspasspy.io.distributed.pyspark_mappartition_interface(db, collection, dbname=None)[source]

Bases: object

Interface class required for pyspark mapPartition.

This class is a workaround for a limitation of pyspark’s api for mapPartition that is a legacy of its scala foundation. mapPartition only accepts a function name as arg0 and has no provision for optional arguments to that function. An alternative solution found in web sources is “currying”, and it might have been possible to do this with just a function wrapper with fixed kwarg values defined within the write_distributed_data function. In this case, however, because write_distributed_data handles an entire bag/rdd as input, a class can be created at the initialization stage of that function. Otherwise the overhead of a simple wrapper would likely have been smaller.

Perhaps the most useful feature gained by making this a class is the safety valve applied when db is None (see below).

The method partitioned_save_wfdoc is just an alias for the file scope function _partitioned_save_wfdoc with the parameters for that function defined by self content.

partitioned_save_wfdoc(iterator)[source]

Method used as a wrapper to pass on to pyspark’s mapPartitions operator. iterator is assumed to be the iterator passed to the function per partition by mapPartitions.

mspasspy.io.distributed.read_distributed_data(data, db=None, query=None, scratchfile=None, collection='wf_TimeSeries', mode='promiscuous', normalize=None, normalize_ensemble=None, load_history=False, exclude_keys=None, scheduler='dask', npartitions=None, spark_context=None, data_tag=None, sort_clause=None, container_merge_function=<function post2metadata>, container_to_merge=None, aws_access_key_id=None, aws_secret_access_key=None)[source]

Parallel data reader for seismic data objects.

In MsPASS seismic data objects need to be loaded into a Spark RDD or Dask bag for parallel processing. This function abstracts that process, parallelizing the read operation where it can. In MsPASS all data objects are created by constructors driven by one or more documents stored in a MongoDB database collection we will refer to here as a “waveform collection”. Atomic data are built from single documents that may or may not be extended by “normalization” (option defined by the normalize parameter). The constructors can get the sample data associated with a waveform document by a variety of storage methods that are abstracted to be “under the hood”. That is relevant to understanding this function because an absolutely required input is a handle to the waveform db collection OR an image of it in another format. Furthermore, the other absolute requirement is that the output is ALWAYS a parallel container: a spark RDD or a Dask bag. This reader is the standard way to load a dataset into one of these parallel containers.

A complexity arises because of two properties of any database including MongoDB. First, queries to any database are almost always very slow compared to processor speed. Initiating a workflow with a series of queries is possible, but we have found it ill advised as it can represent a throttle on speed for many workflows where the time for the query is large compared to the processing time for a single object. Second, all database engines, in contrast, can work linearly through a table (collection in MongoDB) very fast because they cache blocks of records sent from the server to the client. In MongoDB that means a “cursor” returned by a “find” operation can be iterated very fast compared to one-by-one queries of the same data. Why all that is relevant is that arg0 of this function is required to be one of three things to work well within these constraints:

  1. An instance of a mspass Database handle (mspasspy.db.database.Database). The default with this input is to read an entire collection. Use the query parameter to limit input. This mode also implicitly implies the result will be a bag/RDD of atomic data. Note that for efficiency when run in this mode the first step the algorithm takes is to load the entire set of documents defined by query (or the default of all documents) into a pandas DataFrame. If that approach causes memory issues, use the scratchfile option to buffer the large table into a scratch file and then construct a dask or spark DataFrame, which are designed as containers that can overflow memory.

  2. One of several implementations of a “Dataframe” that are an image of, or a substitute for, a MongoDB waveform collection. A Dataframe, for example, is a natural output from any sql (or, for seismologists, an Antelope) database. This reader supports input through a pandas Dataframe, a Dask Dataframe, or a pyspark Dataframe. THE KEY POINT about dataframe input, however, is that the attribute names must match schema constraints of the MongoDB database that is required as an auxiliary input when reading directly from a dataframe. Note also that Dataframe input only makes sense for atomic data, with each tuple mapping to one atomic data object to be created by the reader.

  3. Ensembles represent a fundamentally different problem in this context. An ensemble, by definition, is a group of atomic data with an optional set of common Metadata. The data model for this function for ensembles is that it needs to return an RDD/bag containing a dataset organized into ensembles. The approach used here is that a third type of input for arg0 (data) is a list of python dict containers that are ASSUMED to be a set of queries that define the grouping of the ensembles. For example, a data set you want to process as a set of common source gathers (ensembles) might be created using a list something like this: [{“source_id” : ObjectId(‘64b917ce9aa746564e8ecbfd’)}, {“source_id” : ObjectId(‘64b917d69aa746564e8ecbfe’)}, … ]
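As a concrete illustration of case 3, such a query list could be built from the distinct source_id values in the waveform collection. The sketch below assumes a Database handle db and a populated wf_TimeSeries collection; it is only one of many ways to build the list.

```
from mspasspy.io.distributed import read_distributed_data

# one query per source defines a common source gather grouping
source_ids = db.wf_TimeSeries.distinct("source_id")
queries = [{"source_id": sid} for sid in source_ids]
ensemble_bag = read_distributed_data(queries, db=db, collection="wf_TimeSeries")
```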

This function is more-or-less three algorithms that are run for each of the three cases above. In the same order as above they are:

  1. With a Database input the function first iterates through the entire set of records defined for the specified collection (passed via the collection argument) constrained by the (optional) query argument. Note that processing is run serially but is fast because working through a cursor is optimized in MongoDB and the processing is little more than reformatting the data.

  2. The dataframe is passed through a map operator (dask or spark depending on the setting of the scheduler argument) that constructs the output bag/RDD in parallel. The atomic operation is a call to db.read_data, and the output is a bag/RDD of atomic mspass data objects. That means reads will be parallel with one reader per worker.

  3. Ensembles are read in parallel with granularity defined by a partitioning of the list of queries set by the npartitions parameter. Parallelism is achieved by calling the function internal to this function called read_ensemble_parallel in a map operator. That function queries the database using the query derived from arg0 of this function and uses the return to call the Database.read_data method.

Reading all types in a parallel context has a more subtle complexity that arises in a number of situations. That is, many algorithms are driven by external lists of attributes with one item in the list for each datum to be constructed and posted to the parallel container output by this function. An example is a list of arrival times used to create a dataset of waveform segments windowed relative to the list of arrival times from continuous data or a dataset built from longer time windows (e.g. extracting P wave windows from hour long segments created for teleseismic data). That requires a special type of matching that is very inefficient (i.e. slow) to implement with MongoDB (it requires a query for every datum). One-to-one matching attributes can be handled by this function with proper use of the two arguments container_to_merge and container_merge_function. container_to_merge must be either a dask bag or pyspark RDD depending on the setting of the argument spark_context (i.e. if spark_context is defined the function requires the container be an RDD while a dask bag is the default). The related argument, container_merge_function, is an optional function to use to merge the content of the components of container_to_merge. The default assumes container_to_merge is a bag/RDD of Metadata or python dict containers that are to be copied (overwriting) into the Metadata container of each result bag/RDD component. Note that for atomic data that means the Metadata container for each TimeSeries or Seismogram constructed by the reader while for ensembles the attributes are posted to the ensemble’s Metadata container. A user supplied function to replace the default must have two arguments where arg0 is the seismic datum to receive the edits defined by container_to_merge while arg1 should contain the comparable component of container_to_merge. Note this capability is not at all limited to Metadata. The container can hold anything that provides input for applying an algorithm that alters the datum given a set of parameters passed through the container_to_merge.
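The sketch below shows one way this feature might be used with dask. The arrival-time values, key names, and custom merge function are hypothetical, and the example assumes the bag of dicts is ordered and partitioned to match the documents the reader will process (db is an assumed Database handle).

```
import dask.bag as daskbag
from mspasspy.io.distributed import read_distributed_data

# one dict per wf document, in the same order the reader will construct the data
arrival_docs = [{"Ptime": 1234.5}, {"Ptime": 1300.2}]   # illustrative values
md_bag = daskbag.from_sequence(arrival_docs, npartitions=2)

def merge_arrival(d, doc):
    # custom container_merge_function: arg0 is the datum being built,
    # arg1 is the matching component of container_to_merge
    if d.live:
        d["Ptime"] = doc["Ptime"]
    return d

data_bag = read_distributed_data(
    db,
    collection="wf_TimeSeries",
    npartitions=2,
    container_to_merge=md_bag,
    container_merge_function=merge_arrival,
)
```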

Normalization with normalizing collections like source, site, and channel is possible through the normalize argument. Normalizers using data cached to memory can be used but are likely better handled after a bag/rdd is created with this function via one or more map calls following this reader. Database-driven normalizers are (likely) best done through this function to reduce unnecessary serialization of the database handle essential to this function (i.e. the handle is already in the workspace of this function). Avoid the form of normalize used prior to version 2.0 that allowed the use of a list of collection names; it was retained for backward compatibility but is slow.

A final complexity users need to be aware of is how this reader handles any errors that happen during construction of all the objects in the output bag/rdd. All errors that create invalid data objects produce what we call “abortions” in the User Manual and the docstring for the mspasspy.util.Undertaker.Undertaker class. Invalid atomic data will be the same type as the other bag/rdd components but will have the following properties that can be used to distinguish them:

  1. They will have the Metadata field “is_live” set False.

  2. The data object itself will have the internal attribute “live” set False.

  3. The Metadata field with key “is_abortion” will be defined and set True.

  4. The sample array will be zero length (datum.npts == 0).

Ensembles are still ensembles but they may contain dead data with the properties of atomic data noted above EXCEPT that the “is_live” attribute will not be set - that attribute is used only inside this function. An ensemble return will be marked dead only if all its members are found to be marked dead.
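For example, a workflow that wants to discard abortions (and any other dead data) immediately after the read could filter on the live attribute; a minimal dask sketch, assuming db is a Database handle:

```
data_bag = read_distributed_data(db, collection="wf_TimeSeries")
data_bag = data_bag.filter(lambda d: d.live)   # drop abortions and other dead data
```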

Parameters:

  • data (mspasspy.db.database.Database, pandas.DataFrame, dask.dataframe.core.DataFrame, or pyspark.sql.dataframe.DataFrame for atomic data; a list of python dicts defining queries to read a dataset of ensembles) – variable type argument used to drive construction as described above. See above for how this argument drives the function’s behavior. Note when data is a Database handle the query argument may be used to limit input; otherwise query is ignored.

  • db (mspasspy.db.Database. Can be None (default) only if the data parameter contains the database handle. Other uses require this argument to be set.) – Database handle for loading data. Required input if reading from a dataframe or with ensemble reading via list of queries. Ignored if the “data” parameter is a Database handle.

  • query (python dict defining a valid MongoDB query.) – optional query to apply to input collection when using a mspasspy.db.Database as input. Ignored for dataframe or a list input. Default is None which means no query is used.

  • scratchfile (str) – This argument is referenced only when input is driven by a mspasspy.db.Database handle. For very large datasets loading the entire set of documents that define the dataset into memory can be an issue on a system with smaller “RAM” memory available. This optional argument makes this function scalable to the largest conceivable seismic data sets. When defined, the documents retrieved from the database are reformatted and pushed to a scratch file with the name defined by this argument. The contents of the file are then reloaded into a dask or spark DataFrame that allows the same data to be handled within a more limited memory footprint. Note use of this feature is rare and should never be necessary in an HPC or cloud cluster. The default is None, which means that database input is loaded directly into memory to initiate construction of the parallel container output.

  • collection (string) – waveform collection name for reading. Default is “wf_TimeSeries”.

  • mode (str) – reading mode that controls how the function interacts with the schema definition for the data type. Must be one of [‘promiscuous’,’cautious’,’pedantic’]. See user’s manual for a detailed description of what the modes mean. Default is ‘promiscuous’ which turns off all schema checks and loads all attributes defined for each object read.

  • normalize – List of normalizers. This parameter is passed directly to the Database.read_data method internally. See the docstring for that method for how this parameter is handled. For atomic data each component is used with the normalize function to apply one or more normalization operations to each datum. For ensembles, the same operation is done in a loop over all ensembles members (i.e. the member objects are atomic and normalized in a loop.). Use normalize_ensemble to set values in the ensemble Metadata container.

  • load_history – boolean (True or False) switch used to enable or disable object level history mechanism. When set True each datum will be tagged with its origin id that defines the leaf nodes of a history G-tree. See the User’s manual for additional details of this feature. Default is False.

  • exclude_keys (a list of str) – Sometimes it is helpful to remove one or more attributes stored in the database from the data’s Metadata (header) so they will not cause problems in downstream processing.

  • scheduler (str) – Set the scheduler, which defines the format of the parallel container holding the dataset. Must be either “spark” or “dask” or the job will abort immediately with a ValueError exception.

  • spark_context (pyspark.SparkContext) – If using spark this argument is required. Spark defines the concept of a “context” that is a global control object that manages scheduling. See online Spark documentation for details on this concept.

  • npartitions (int) – The number of desired partitions for Dask or the number of slices for Spark. By default Dask will use 100 and Spark will determine it automatically based on the cluster. If using this parameter and a container_to_merge make sure the number used here matches the partitioning of container_to_merge. If specified and container_to_merge is defined this function will test them for consistency and throw a ValueError exception if they don’t match. If not set (i.e. left default of None) that test is not done and the function assumes container_to_merge also uses default partitioning.

  • data_tag (str) – The definition of a dataset can become ambiguous when partially processed data are saved within a workflow. A common example would be windowing long time blocks of data to shorter time windows around a particular seismic phase and saving the windowed data. The windowed data can be difficult to distinguish from the original with standard queries. For this reason we make extensive use of “tags” for save and read operations to improve the efficiency and simplify read operations. Default turns this off by setting the tag null (None).

  • sort_clause (if None (default) no sorting is invoked when reading ensembles. Otherwise this should be a python list of tuples defining a sort order, e.g. [("sta",pymongo.ASCENDING),("time",pymongo.ASCENDING)]) – When reading ensembles it is sometimes helpful to apply a sort clause to each database query. The type example is reading continuous data where it is necessary to sort the data into channels in time order. If defined this should be a clause that can be inserted in the MongoDB sort method commonly applied in a line like this: cursor=db.wf_miniseed.find(query).sort(sort_clause). This argument is tested for existence only when reading ensembles (implied with list of dict input). See the sketch after this parameter list for an example.

  • container_to_merge (dask bag or pyspark RDD. The number of partitions in the input must match the explicit (i.e. set with npartitions) or implicit (defaulted) number of partitions.) – bag/RDD containing data packaged with one item per datum this reader is asked to read. See above for details and examples of how this feature can be used. Default is None which turns off this option.

  • container_merge_function – function that defines what to do with components of the container_to_merge if it is defined. Default is a Metadata merge function defined internally in this module. That function assumes container_to_merge is a bag/RDD of either Metadata or python dict containers that define key-value pairs to be posted to the output (i.e. it acts like the Metadata += operator). A custom function can be used here. A custom function must have only two arguments with arg0 the target seismic datum (the component the reader is creating) and arg1 defining the attributes to use to edit the datum being created. Note the default only alters Metadata but that is not at all a restriction. The function must simply return a (normally modified) copy of the component it receives as arg0.

  • aws_access_key_id – A part of the credentials to authenticate the user

  • aws_secret_access_key – A part of the credentials to authenticate the user

Returns:

container defining the parallel dataset: a spark RDD if scheduler is “spark” and a dask bag if scheduler is “dask”.
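A usage sketch for ensemble reading with a sort clause, assuming a wf_miniseed collection keyed by station code (the query and sort keys are illustrative):

```
import pymongo
from mspasspy.io.distributed import read_distributed_data

queries = [{"sta": s} for s in db.wf_miniseed.distinct("sta")]
sort_clause = [("chan", pymongo.ASCENDING), ("time", pymongo.ASCENDING)]
ensemble_bag = read_distributed_data(
    queries,
    db=db,
    collection="wf_miniseed",
    sort_clause=sort_clause,
)
```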

mspasspy.io.distributed.read_ensemble_parallel(query, db, collection='wf_TimeSeries', mode='promiscuous', normalize=None, load_history=False, exclude_keys=None, data_tag=None, sort_clause=None, aws_access_key_id=None, aws_secret_access_key=None)[source]

Special function used in read_distributed_data to handle ensembles.

Ensembles need to be read via a cursor which is not serializable. Here we query a Database class, which is serializable, and call its read_data method to construct an ensemble that it returns. It is defined as a function instead of using a lambda due largely to the complexity of the argument list passed to read_data.

Arguments are all passed directly from values set within read_distributed_data. See that function for parameter descriptions.

mspasspy.io.distributed.read_to_dataframe(db, cursor, mode='promiscuous', normalize=None, load_history=False, exclude_keys=None, data_tag=None, alg_name='read_to_dataframe', alg_id='0', define_as_raw=False, retrieve_history_record=False)[source]

Read the documents defined by a MongoDB cursor into a pandas DataFrame.

The data structure called a pandas DataFrame is heavily used by many python users. This is a convenience function for users wanting to use that api to do pure metadata operations.

Be warned this function originated as a prototype where we experimented with using a dask or pyspark DataFrame as an intermediary for parallel readers. We developed an alternative algorithm that made the baggage of the intermediary unnecessary. The warning is that the function is not mainstream and may be prone to issues.
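A minimal usage sketch; the query and the column names selected for display are illustrative and assume documents created with the standard mspass schema:

```
from mspasspy.io.distributed import read_to_dataframe

cursor = db.wf_TimeSeries.find({"sta": "AAK"})   # hypothetical query
df = read_to_dataframe(db, cursor)
print(df[["starttime", "npts", "delta"]].head())
```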

Parameters:
  • db (mspasspy.db.database.Database.) – the database from which the data are to be read.

  • cursor (pymongo.cursor.Cursor) – MongoDB cursor, normally the return of a find operation on the waveform collection, defining the set of documents to be loaded into the output DataFrame.

  • mode (str) – reading mode that controls how the function interacts with the schema definition for the data type. Must be one of [‘promiscuous’,’cautious’,’pedantic’]. See user’s manual for a detailed description of what the modes mean. Default is ‘promiscuous’ which turns off all schema checks and loads all attributes defined for each object read.

  • normalize (a list of str) – list of collections that are to be used for data normalization. (see User’s manual and MongoDB documentation for details on this concept) Briefly, normalization means common metadata like source and receiver geometry are defined in separate smaller collections that are linked through this mechanism during reads. Default uses no normalization.

  • load_history – boolean (True or False) switch used to enable or disable object level history mechanism. When set True each datum will be tagged with its origin id that defines the leaf nodes of a history G-tree. See the User’s manual for additional details of this feature. Default is False.

  • exclude_keys (a list of str) – Sometimes it is helpful to remove one or more attributes stored in the database from the data’s Metadata (header) so they will not cause problems in downstream processing.

  • collection – Specify an alternate collection name to use for reading the data. The default sets the collection name based on the data type and automatically loads the correct schema. The collection listed must be defined in the schema and satisfy the expectations of the reader. This is an advanced option that is intended only to simplify extensions to the reader.

  • data_tag (str) – The definition of a dataset can become ambiguous when partially processed data are saved within a workflow. A common example would be windowing long time blocks of data to shorter time windows around a particular seismic phase and saving the windowed data. The windowed data can be difficult to distinguish from the original with standard queries. For this reason we make extensive use of “tags” for save and read operations to improve the efficiency and simplify read operations. Default turns this off by setting the tag null (None).

  • alg_name (str) – alg_name is the algorithm name to be recorded when preserving the history.

  • alg_id (bson.objectid.ObjectId) – alg_id is a unique id used to record the usage of this function when preserving the history.

  • define_as_raw (bool) – a boolean controlling whether set_as_origin is called when loading processing history.

  • retrieve_history_record (bool) – a boolean controlling whether processing history records are loaded.

mspasspy.io.distributed.write_distributed_data(data, db, data_are_atomic=True, mode='promiscuous', storage_mode='gridfs', scheduler=None, file_format=None, overwrite=False, exclude_keys=None, collection='wf_TimeSeries', data_tag=None, post_elog=False, save_history=False, post_history=False, cremate=False, normalizing_collections=['channel', 'site', 'source'], alg_name='write_distributed_data', alg_id='0') list[source]

Parallel save function for termination of a processing script.

Saving data from a parallel container (i.e. bag/rdd) is a different problem from a serial writer. Bottlenecks are likely with communication delays talking to the MongoDB server and the complexities of file based io. Further, there are efficiency issues in the need to reduce transactions that cause delays with the MongoDB server. This function tries to address these issues with two approaches:

  1. Since v2 it uses a single function that handles writing the sample data for a datum. For atomic data that means a single array but for ensembles it is the combination of all arrays in the ensemble “member” containers. The writing functions are passed through a map operator so writing is parallelized per worker. Note the function also abstracts how the data are written, with different things done depending on the “storage_mode” attribute that can optionally be defined for each atomic object.

  2. After the sample data are saved we use a MongoDB “update_many” operator with the “many” defined by the partition size. That reduces database transaction delays by a factor of the number of objects per partition. For ensembles the partitioning is natural with bulk writes controlled by the number of ensemble members.

The function also handles data marked dead in a standardized way through the use of the mspasspy.util.Undertaker now defined within the Database class handle. The default will call the bury method on all dead data, which leaves a document containing the datum’s Metadata and error log messages in a collection called “cemetery”. If the cremate argument is set True dead data will be vaporized and no trace of them will appear in the output.

To further reduce database traffic the function has two (boolean) options called post_elog and post_history. When set True the elog and/or object-level history data will be posted as subdocuments in the output collection instead of the normal (at least as defined by the Database handle) way these data are saved. (In Database the error log is saved to the “elog” collection and the history data is saved to “history”.) Note post_history is ignored unless the related save_history argument is changed from the default False to True.

A peculiarity that is a consequence of python’s “duck typing” is that the user must give the writer some hints about the type of data objects it is expected to handle. Rather than specify a type argument, the type is inferred from two arguments that are necessary anyway: (1) the collection argument value, and (2) the boolean data_are_atomic. The idea is that collection is used to determine whether the writer is handling TimeSeries or Seismogram objects (“wf_TimeSeries” or “wf_Seismogram” values respectively) and the boolean is used, as the name implies, to infer whether the data are atomic or ensembles.
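For example, a bag of TimeSeriesEnsemble objects would be saved with a call like the following sketch; the writer infers the member type from the collection name and the grouping from the boolean:

```
wfids = write_distributed_data(
    ensemble_bag,            # assumed bag/RDD of TimeSeriesEnsemble objects
    db,
    collection="wf_TimeSeries",
    data_are_atomic=False,   # components are ensembles, not atomic data
)
```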

This function should only be used as the terminal step of a parallel workflow (i.e. a chain of map/reduce operators). This function will ALWAYS initiate a lazy computation on such a chain of operators because it calls the “compute” method for dask and the “collect” method of spark before returning. It then always returns a list of ObjectIds of live, saved data. The function is dogmatic about that because the return can never be a bag/RDD of the data.

If your workflow requires an intermediate save (i.e. saving data in an intermediate step within a chain of map/reduce operators) the best approach at present is to use the save_data method of the Database class in a variant of the following (dask) example that also illustrates how this function is used as the terminator of a chain of map-reduce operators.

```

mybag = read_distributed_data(db, collection='wf_TimeSeries')
# example processing step
mybag = mybag.map(detrend)
# intermediate save
mybag = mybag.map(db.save_data, collection='wf_TimeSeries')
# more processing - trivial example
mybag = mybag.map(filter, 'lowpass', freq=1.0)
# termination with this function
wfids = write_distributed_data(mybag, db, collection='wf_TimeSeries')

```

The storage_mode argument is a constant that defines how the SAMPLE DATA are to be stored. Currently this can be “file” or “gridfs”, but be aware future evolution may extend the options. “gridfs” is the default as the only complexities it has are a speed throttle from requiring the sample data to move through MongoDB and the potential to overflow the file system where the database is stored. (See the User’s Manual for more on this topic.) Most users, however, likely want to use the “file” option for that parameter. There are, however, some caveats that users MUST be aware of before using that option with large data sets. Since V2 of MsPASS the file save process was made more robust by allowing a chain of options for how the actual file name where data is stored is set. The algorithm used here is a private method in mspasspy.db.Database called _save_sample_data_to_file. When used here that function is passed a None type for dir and dfile. The EXPECTED use is that you as a user should set the dir and dfile attributes for EVERY datum in the bag/RDD this function is asked to handle. That allows each atomic datum to define what the file destination is. For ensembles normal behavior is to require the entire ensemble content to be saved in one file defined by the dir and dfile values in the ensemble’s Metadata container. THE WARNING is that to be robust the file writer will always default a value for dir and dfile. The default dir is the run directory. The default dfile (if not set) is a unique name created by a uuid generator. Care must be taken in file writes to make sure you don’t create huge numbers of files that overflow directories or similar file system errors that are all too easy to do with large data set saves. See the User Manual for examples of how to set output file names for a large data set.
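A sketch of one way to set file names before a “file” mode save; the directory and naming rule are hypothetical and should be adapted so the number of files per directory stays manageable:

```
def set_output_file(d):
    # define the output destination for each atomic datum
    if d.live:
        d["dir"] = "wf_output"                     # hypothetical directory
        d["dfile"] = str(d["source_id"]) + ".dat"  # hypothetical naming rule
    return d

data_bag = data_bag.map(set_output_file)
wfids = write_distributed_data(
    data_bag, db, collection="wf_TimeSeries", storage_mode="file"
)
```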

Parameters:
  • data (dask.bag.Bag or pyspark.RDD.) – parallel container of data to be written

  • db (mspasspy.db.database.Database.) – database handle to manage data saved by this function.

  • mode (str) – This parameter defines how attributes defined with key-value pairs in MongoDB documents are to be handled for writes. By “to be handled” we mean how strongly to enforce name and type specification in the schema for the type of object being constructed. Options are [‘promiscuous’,’cautious’,’pedantic’] with ‘promiscuous’ being the default. See the User’s manual for more details on the concepts and how to use this option.

  • storage_mode (str) – Must be either “gridfs” or “file”. When set to “gridfs” the waveform data are stored internally and managed by MongoDB. If set to “file” the data will be stored in a file system. File names are derived from attributes with the tags “dir” and “dfile” in the standard way. Any datum for which dir or dfile isn’t defined will default to the behaviour of the Database class method save_data. See the docstring for details but the concept is it will always be bombproof even if not ideal.

  • scheduler (string. Must be either "dask" or "spark". Default is None which is equivalent to the value of "dask".) – name of the parallel scheduler being used by this writer. MsPASS currently supports pyspark and dask. If arg0 is an RDD, scheduler must be “spark”; if arg0 is a dask bag, scheduler must be “dask”. The function will raise a ValueError exception if scheduler and the type of arg0 are not consistent or if the value of scheduler is illegal. Note with spark the context is not required because of how this algorithm is structured.

  • file_format (str) – the format of the file. This can be one of the supported formats of the ObsPy writer. The default is the python None, which the method assumes means to store the data in its raw binary form. The default should normally be used for efficiency. Alternate formats are primarily a simple export mechanism. See the User’s manual for more details on data export. Used only for “file” storage mode.

  • overwrite (boolean) – If true gridfs data linked to the original waveform will be replaced by the sample data from this save. Default is false, and should be the normal use. This option should never be used after a reduce operator as the parents are not tracked and the space advantage is likely minimal for the confusion it would cause. This is most useful for light, stable preprocessing with a set of map operators to regularize a data set before more extensive processing. It can only be used when storage_mode is set to gridfs.

  • collection – name of the wf collection where the documents derived from the data are to be saved. Standard values are “wf_TimeSeries” and “wf_Seismogram” for which a schema is defined in MsPASS. Normal use should specify one or the other. The default is “wf_TimeSeries” but normal usage should specify this argument explicitly for clarity in reuse.

  • exclude_keys (a list of str) – Metadata can often become contaminated with attributes that are no longer needed or are a mismatch with the data. A type example is the bundle algorithm, which takes three TimeSeries objects and produces a single Seismogram from them. That process can, and usually does, leave things like seed channel names and orientation attributes (hang and vang) from one of the components as extraneous baggage. Use this list of keys to prevent such attributes from being written to the output documents. Note if the data being saved lack these keys nothing happens, so it is safer, albeit slower, to have the list be as large as necessary to eliminate any potential debris.

  • data_tag (str) – a user specified “data_tag” key. See above and the User’s manual for guidance on the use of this option.

  • post_elog – boolean controlling how error log messages are handled. When False (default) error log messages get posted in single transactions with MongoDB to the “elog” collection. When set True error log entries will be posted as subdocuments to the wf collection entry for each datum. Setting post_elog True is most useful if you anticipate a run will generate a large number of errors that could throttle processing with a large number of one-at-a-time document saves. For normal use with a small number of errors it is easier to review error issues by inspecting the elog collection than having to query the larger wf collection. (A usage sketch combining several of these options appears after this parameter list.)

  • save_history – When set True (default is False) the writer will save any object-level history data stored within the input data objects. The related boolean post_history (described below) controls how such data are saved if this option is enabled. Note post_history is ignored unless save_history is True.

  • post_history – boolean similar to post_elog for handling object-level history data. It is, however, only handled if the related boolean save_history is set True. When post_history is set True the history data will be saved as a subdocument in the wf document saved for each live, atomic datum (note for ensembles that means all live members). When False each atomic datum will generate an insert_one transaction with MongoDB and save the history data in the “history” collection; the attribute with key “history_id” is then set to the ObjectId of the saved document. The default for this argument is True to avoid accidentally throttling workflows on large data sets. The default for save_history is False so the overall default behavior is to drop any history data.

  • cremate – boolean controlling the handling of dead data. When True dead data will be passed to the cremate method of mspasspy.util.Undertaker, which leaves nothing of them in the return. When False (default) the bury method will be called instead, which saves a skeleton (error log and Metadata content) of each dead datum in the “cemetery” collection.

  • normalizing_collections (list of strings defining collection names) – list of collection names dogmatically treated as normalizing collection names. The keywords in the list are used to always (i.e. for all modes) erase any attribute with a key name of the form collection_attribute where collection is one of the collection names in this list and attribute is any string. Attribute names with the “_” separator are saved unless the collection field matches one of the strings (e.g. “channel_vang” will be erased before saving to the wf collection while “foo_bar” will not be erased). This list should ONLY be changed if a schema other than the default mspass schema is used and different names are used for normalizing collections (e.g. if one added a “shot” collection to the schema the list would need to be changed to at least add “shot”).

  • alg_name – do not change.

  • alg_id – algorithm id for object-level history. Normally assigned by the global history manager.
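A usage sketch combining several of the options above (values illustrative):

```
wfids = write_distributed_data(
    data_bag,
    db,
    collection="wf_TimeSeries",
    storage_mode="gridfs",
    post_elog=True,   # error logs stored as wf subdocuments
    cremate=True,     # dead data are discarded with no cemetery documents
)
print(len(wfids), "live data saved")
```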