Source code for mspasspy.io.distributed

import os
import pandas as pd
import json


from mspasspy.db.database import (
    Database,
    md2doc,
    doc2md,
    elog2doc,
    history2doc,
)
from mspasspy.db.normalize import BasicMatcher

# name collision here requires this alias
from mspasspy.db.normalize import normalize as normalize_function
from mspasspy.util.Undertaker import Undertaker
from mspasspy.db.client import DBClient


from mspasspy.ccore.utility import (
    ErrorLogger,
)

try:
    import dask

    _mspasspy_has_dask = True
except ImportError:
    _mspasspy_has_dask = False
try:
    import pyspark

    _mspasspy_has_pyspark = True
except ImportError:
    _mspasspy_has_pyspark = False

if not _mspasspy_has_dask and not _mspasspy_has_pyspark:
    message = "{} requires either dask or pyspark module. Please install dask or pyspark".format(
        __name__
    )
    raise ModuleNotFoundError(message)


def read_ensemble_parallel(
    query,
    db,
    collection="wf_TimeSeries",
    mode="promiscuous",
    normalize=None,
    load_history=False,
    exclude_keys=None,
    data_tag=None,
    sort_clause=None,
    aws_access_key_id=None,
    aws_secret_access_key=None,
):
    """
    Special function used in read_distributed_data to handle ensembles.

    Ensembles need to be read via a cursor, which is not serializable.  Here we query a
    Database class, which is serializable, and call its read_data method to construct an
    ensemble that it returns.  Defined as a function instead of a lambda largely because of
    the complexity of the argument list passed to read_data.

    Arguments are all passed directly from values set within read_distributed_data.
    See that function for parameter descriptions.
    """
    if sort_clause:
        cursor = db[collection].find(query).sort(sort_clause)
    else:
        cursor = db[collection].find(query)
    ensemble = db.read_data(
        cursor,
        collection=collection,
        mode=mode,
        normalize=normalize,
        load_history=load_history,
        exclude_keys=exclude_keys,
        data_tag=data_tag,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
    kill_me = True
    for d in ensemble.member:
        if d.live:
            kill_me = False
            break
    if kill_me:
        ensemble.kill()
    return ensemble
def post2metadata(mspass_object, doc):
    """
    Posts the content of the key-value pair container doc to the Metadata container of
    mspass_object.

    The operation only requires that doc be iterable over keys and that both doc and
    mspass_object act like associative arrays.  In MsPASS that means doc can be either a
    dict or a Metadata container.  Note the contents of doc[k] will overwrite any existing
    content defined by mspass_object[k].  It returns the edited copy of mspass_object to
    mesh with map operator usage in read_distributed_data.

    Note that Metadata "is a" fundamental component of all atomic data.  Ensemble objects
    contain a special instance of Metadata we normally refer to as "ensemble metadata".
    When handling ensembles the contents of doc are pushed to the ensemble metadata and
    member attributes are not altered by this function.

    This function is the default for the read_distributed_data argument
    container_merge_function, which provides the means to push a list of Metadata or dict
    containers held in a congruent bag/RDD with a set of seismic data objects.  Because of
    that usage it has no safeties, for efficiency.
    """
    for k in doc:
        mspass_object[k] = doc[k]
    return mspass_object
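The default above also documents the contract any custom `container_merge_function` must satisfy:
two positional arguments (the seismic datum and the matching component of `container_to_merge`)
and a return of the edited datum.  A minimal sketch of a custom replacement; the function name
and the "arrival_time" key are hypothetical:

```
def post_arrival_time(mspass_object, doc):
    # copy a single attribute instead of the whole container
    mspass_object["arrival_time"] = doc["arrival_time"]
    return mspass_object
```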
def read_distributed_data(
    data,
    db=None,
    query=None,
    scratchfile=None,
    collection="wf_TimeSeries",
    mode="promiscuous",
    normalize=None,
    normalize_ensemble=None,
    load_history=False,
    exclude_keys=None,
    scheduler="dask",
    npartitions=None,
    spark_context=None,
    data_tag=None,
    sort_clause=None,
    container_merge_function=post2metadata,
    container_to_merge=None,
    aws_access_key_id=None,
    aws_secret_access_key=None,
):
    """
    Parallel data reader for seismic data objects.

    In MsPASS seismic data objects need to be loaded into a Spark RDD or Dask bag for parallel
    processing.  This function abstracts that process, parallelizing the read operation where it
    can do so.

    In MsPASS all data objects are created by constructors driven by one or more documents stored
    in a MongoDB database collection we will refer to here as a "waveform collection".  Atomic
    data are built from single documents that may or may not be extended by "normalization"
    (option defined by the normalize parameter).  The constructors can get the sample data
    associated with a waveform document by a variety of storage methods that are abstracted to
    be "under the hood".  That is relevant to understanding this function because an absolutely
    required input is a handle to the waveform db collection OR an image of it in another format.
    Furthermore, the other absolute is that the output is ALWAYS a parallel container; a spark
    RDD or a Dask bag.  This reader is the standard way to load a dataset into one of these
    parallel containers.

    A complexity arises because of two properties of any database including MongoDB.  First,
    queries to any database are almost always very slow compared to processor speed.  Initiating
    a workflow with a series of queries is possible, but we have found it ill advised as it can
    represent a throttle on speed for many workflows where the time for the query is large
    compared to the processing time for a single object.  Second, all database engines, in
    contrast, can work linearly through a table (collection in MongoDB) very fast because they
    cache blocks of records sent from the server to the client.  In MongoDB that means a
    "cursor" returned by a "find" operation can be iterated very fast compared to one-by-one
    queries of the same data.  Why all that is relevant is that arg0 of this function is
    required to be one of three things to work well within these constraints:

    1.  An instance of a mspass `Database` handle (:class:`mspasspy.db.database.Database`).
        Default with this input is to read an entire collection.  Use the query parameter to
        limit input.  This mode also implicitly implies the result will be a bag/RDD of atomic
        data.  Note that for efficiency when run in this mode the first step the algorithm does
        is load the entire set of documents defined by query (or the default of all documents)
        into a pandas DataFrame.  If that approach causes memory issues use the `scratchfile`
        option to buffer the large table into a scratch file and then construct a dask or spark
        DataFrame that are designed as containers that can overflow memory.
    2.  One of several implementations of a "Dataframe" that are an image of or a substitute for
        a MongoDB waveform collection.  A Dataframe, for example, is a natural output from any
        sql (or, for seismologists, an Antelope) database.  This reader supports input through a
        pandas Dataframe, a Dask Dataframe, or a pyspark Dataframe.  THE KEY POINT about
        dataframe input, however, is that the attribute names must match schema constraints on
        the MongoDB database that is required as an auxiliary input when reading directly from a
        dataframe.  Note also that Dataframe input only makes sense for atomic data with each
        tuple mapping to one atomic data object to be created by the reader.
    3.  Ensembles represent a fundamentally different problem in this context.  An ensemble, by
        definition, is a group of atomic data with an optional set of common Metadata.  The data
        model for this function for ensembles is that it needs to return an RDD/bag containing a
        dataset organized into ensembles.  The approach used here is that a third type of input
        for arg0 (data) is a list of python dict containers that are ASSUMED to be a set of
        queries that define the grouping of the ensembles.  For example, a data set you want to
        process as a set of common source gathers (ensembles) might be created using a list
        something like this:

        [{"source_id" : ObjectId('64b917ce9aa746564e8ecbfd')},
         {"source_id" : ObjectId('64b917d69aa746564e8ecbfe')},
         ... ]

    This function is more-or-less three algorithms that are run for each of the three cases
    above.  In the same order as above they are:

    1.  With a Database input the function first iterates through the entire set of records
        defined for the specified collection (passed via the collection argument) constrained by
        the (optional) query argument.  Note that processing is run serial but fast because
        working through a cursor is optimized in MongoDB and the processing is little more than
        reformatting the data.
    2.  The dataframe is passed through a map operator (dask or spark depending on the setting
        of the scheduler argument) that constructs the output bag/RDD in parallel.  The atomic
        operation is a call to db.read_data, and the output is a bag/RDD of atomic mspass data
        objects.  That means reads will be parallel with one reader per worker.
    3.  Ensembles are read in parallel with granularity defined by a partitioning of the list of
        queries set by the npartitions parameter.  Parallelism is achieved by calling the
        companion function `read_ensemble_parallel` in a map operator.  That function queries
        the database using the query derived from arg0 of this function and uses the return to
        call the `Database.read_data` method.

    Reading all types in a parallel context has a more subtle complexity that arises in a number
    of situations.  That is, many algorithms are driven by external lists of attributes with one
    item in the list for each datum to be constructed and posted to the parallel container
    output by this function.  An example is a list of arrival times used to create a dataset of
    waveform segments windowed relative to the list of arrival times from continuous data or a
    dataset built from longer time windows (e.g. extracting P wave windows from hour long
    segments created for teleseismic data).  That requires a special type of matching that is
    very inefficient (i.e. slow) to implement with MongoDB (it requires a query for every
    datum.)  One-to-one matching attributes can be handled by this function with proper use of
    the two arguments `container_to_merge` and `container_merge_function`.  `container_to_merge`
    must be either a dask bag or pyspark RDD depending on the setting of the argument
    `spark_context` (i.e. if `spark_context` is defined the function requires the container be
    an RDD while a dask bag is the default.)  The related argument, `container_merge_function`,
    is an optional function to use to merge the content of the components of
    `container_to_merge`.  The default assumes `container_to_merge` is a bag/RDD of `Metadata`
    or python dict containers that are to be copied (overwriting) into the Metadata container of
    each result bag/RDD component.  Note that for atomic data that means the Metadata container
    for each `TimeSeries` or `Seismogram` constructed by the reader, while for ensembles the
    attributes are posted to the ensemble's `Metadata` container.  A user supplied function to
    replace the default must have two arguments where arg0 is the seismic datum to receive the
    edits defined by `container_to_merge` while arg1 should contain the comparable component of
    `container_to_merge`.  Note this capability is not at all limited to `Metadata`.  This
    function can contain anything that provides input for applying an algorithm that alters the
    datum given a set of parameters passed through the `container_to_merge`.

    Normalization with normalizing collections like source, site, and channel is possible
    through the normalize argument.  Normalizers using data cached to memory can be used but are
    likely better handled after a bag/rdd is created with this function via one or more map
    calls following this reader.  Database-driven normalizers are (likely) best done through
    this function to reduce unnecessary serialization of the database handle essential to this
    function (i.e. the handle is already in the workspace of this function).  Avoid the form of
    normalize used prior to version 2.0 that allowed the use of a list of collection names.  It
    was retained for backward compatibility but is slow.

    A final complexity users need to be aware of is how this reader handles any errors that
    happen during construction of the objects in the output bag/rdd.  All errors that create
    invalid data objects produce what we call "abortions" in the User Manual and the docstring
    for the :class:`mspasspy.util.Undertaker.Undertaker` class.  Invalid, atomic data will be
    the same type as the other bag/rdd components but will have the following properties that
    can be used to distinguish them:

    1.  They will have the Metadata field "is_live" set False.
    2.  The data object itself will have the internal attribute "live" set False.
    3.  The Metadata field with key "is_abortion" will be defined and set True.
    4.  The sample array will be zero length (datum.npts==0)

    Ensembles are still ensembles but they may contain dead data with the properties of atomic
    data noted above EXCEPT that the "is_live" attribute will not be set - that is used only
    inside this function.  An ensemble return will be marked dead only if all its members are
    found to be marked dead.

    :param data:  variable type argument used to drive construction as described above.  See
      above for how this argument drives the function's behavior.  Note when set as a Database
      handle the cursor argument must be set.  Otherwise it is ignored.
    :type data:  :class:`mspasspy.db.database.Database` or :class:`pandas.DataFrame` or
      :class:`dask.dataframe.core.DataFrame` or :class:`pyspark.sql.dataframe.DataFrame` for
      atomic data.  List of python dicts defining queries to read a dataset of ensembles.
    :param db:  Database handle for loading data.  Required input if reading from a dataframe
      or with ensemble reading via a list of queries.  Ignored if the "data" parameter is a
      Database handle.
    :type db:  :class:`mspasspy.db.Database`.  Can be None (default) only if the data parameter
      contains the database handle.  Other uses require this argument to be set.
    :param query:  optional query to apply to the input collection when using a
      :class:`mspasspy.db.Database` as input.  Ignored for dataframe or list input.  Default is
      None which means no query is used.
    :type query:  python dict defining a valid MongoDB query.
    :param scratchfile:  This argument is referenced only when input is driven by a
      :class:`mspasspy.db.Database` handle.  For very large datasets loading the entire set of
      documents that define the dataset into memory can be an issue on a system with smaller
      "RAM" memory available.  This optional argument makes this function scalable to the
      largest conceivable seismic data sets.  When defined the documents retrieved from the
      database are reformatted and pushed to a scratch file with the name defined by this
      argument.  The contents of the file are then reloaded into a dask or spark DataFrame that
      allows the same data to be handled within a more limited memory footprint.  Note use of
      this feature is rare and should never be necessary in an HPC or cloud cluster.  The
      default is None, which means that database input is loaded directly into memory to
      initiate construction of the parallel container output.
    :type scratchfile:  str
    :param collection:  waveform collection name for reading.  Default is "wf_TimeSeries".
    :type collection:  string
    :param mode:  reading mode that controls how the function interacts with the schema
      definition for the data type.  Must be one of ['promiscuous','cautious','pedantic'].  See
      the User's manual for a detailed description of what the modes mean.  Default is
      'promiscuous' which turns off all schema checks and loads all attributes defined for each
      object read.
    :type mode:  :class:`str`
    :param normalize:  List of normalizers.  This parameter is passed directly to the
      `Database.read_data` method internally.  See the docstring for that method for how this
      parameter is handled.  For atomic data each component is used with the `normalize`
      function to apply one or more normalization operations to each datum.  For ensembles, the
      same operation is done in a loop over all ensemble members (i.e. the member objects are
      atomic and normalized in a loop.).  Use `normalize_ensemble` to set values in the
      ensemble Metadata container.
    :type normalize:  must be a python list of subclasses of the abstract class
      :class:`mspasspy.db.normalize.BasicMatcher` that can be used as the normalization operator
      in the `normalize` function.
    :param normalize_ensemble:  This parameter should be used to apply normalization to
      ensemble Metadata (attributes common to the entire ensemble.)  It will be ignored if
      reading atomic data.  Otherwise it behaves like normalize and is assumed to be a list of
      subclasses of :class:`BasicMatcher` objects.  If using this option you must also specify
      a valid value for the `container_to_merge` argument.  The reason is that currently the
      only efficient way to post any Metadata components to an ensemble's Metadata container is
      via the algorithm used when the `container_to_merge` option is used.  This feature was
      designed with ids in mind where the ids would link to a collection that contains defining
      properties for what the ensemble is.  For example, if the ensemble is a "common-source
      gather" the `container_to_merge` could be a bag/RDD of ObjectIds defining the `source_id`
      attribute.  Then the normalize_ensemble list could contain an instance of
      :class:`mspasspy.db.normalize.ObjectIdMatcher` created to match and load source data.
    :type normalize_ensemble:  a :class:`list` of :class:`BasicMatcher`.  The
      :class:`BasicMatcher` instances are applied sequentially with the `normalize` function
      with the list of attributes loaded defined by each instance.
    :param load_history:  boolean (True or False) switch used to enable or disable the object
      level history mechanism.  When set True each datum will be tagged with its origin id that
      defines the leaf nodes of a history G-tree.  See the User's manual for additional details
      of this feature.  Default is False.
    :param exclude_keys:  Sometimes it is helpful to remove one or more attributes stored in
      the database from the data's Metadata (header) so they will not cause problems in
      downstream processing.
    :type exclude_keys:  a :class:`list` of :class:`str`
    :param scheduler:  Set the format of the parallel container to define the dataset.  Must be
      either "spark" or "dask" or the job will abort immediately with a ValueError exception.
    :type scheduler:  :class:`str`
    :param spark_context:  If using spark this argument is required.  Spark defines the concept
      of a "context" that is a global control object that manages scheduling.  See online Spark
      documentation for details on this concept.
    :type spark_context:  :class:`pyspark.SparkContext`
    :param npartitions:  The number of desired partitions for Dask or the number of slices for
      Spark.  By default Dask will use 100 and Spark will determine it automatically based on
      the cluster.  If using this parameter and a container_to_merge make sure the number used
      here matches the partitioning of container_to_merge.  If specified and container_to_merge
      is defined this function will test them for consistency and throw a ValueError exception
      if they don't match.  If not set (i.e. left default of None) that test is not done and
      the function assumes container_to_merge also uses default partitioning.
    :type npartitions:  :class:`int`
    :param data_tag:  The definition of a dataset can become ambiguous when partially processed
      data are saved within a workflow.  A common example would be windowing long time blocks
      of data to shorter time windows around a particular seismic phase and saving the windowed
      data.  The windowed data can be difficult to distinguish from the original with standard
      queries.  For this reason we make extensive use of "tags" for save and read operations to
      improve the efficiency and simplify read operations.  Default turns this off by setting
      the tag null (None).
    :type data_tag:  :class:`str`
    :param sort_clause:  When reading ensembles it is sometimes helpful to apply a sort clause
      to each database query.  The type example is reading continuous data where it is
      necessary to sort the data into channels in time order.  If defined this should be a
      clause that can be inserted in the MongoDB sort method commonly applied in a line like
      this:  `cursor=db.wf_miniseed.find(query).sort(sort_clause)`.  This argument is tested
      for existence only when reading ensembles (implied with list of dict input).
    :type sort_clause:  if None (default) no sorting is invoked when reading ensembles.
      Otherwise should be a python list of tuples defining a sort order.  e.g.
      [("sta", pymongo.ASCENDING), ("time", pymongo.ASCENDING)]
    :param container_to_merge:  bag/RDD containing data packaged with one item per datum this
      reader is asked to read.  See above for details and examples of how this feature can be
      used.  Default is None which turns off this option.
    :type container_to_merge:  dask bag or pyspark RDD.  The number of partitions in the input
      must match the explicit (i.e. set with `npartitions`) or implicit (defaulted) number of
      partitions.
    :param container_merge_function:  function that defines what to do with components of the
      `container_to_merge` if it is defined.  Default is a Metadata merge function, which is
      defined internally in this module.  That function assumes `container_to_merge` is a
      bag/RDD of either `Metadata` or python dict containers that define key-value pairs to be
      posted to the output (i.e. it acts like the Metadata += operator.)  A custom function can
      be used here.  A custom function must have only two arguments with arg0 the target
      seismic datum (component the reader is creating) and arg1 defining attributes to use to
      edit the datum being created.  Note the default only alters Metadata but that is not at
      all a restriction.  The function must simply return a (normally modified) copy of the
      component it receives as arg0.
    :param aws_access_key_id:  A part of the credentials to authenticate the user
    :param aws_secret_access_key:  A part of the credentials to authenticate the user
    :return:  container defining the parallel dataset.  A spark `RDD` if scheduler is "spark"
      and a dask `bag` if scheduler is "dask"
    """
    # This is a base error message that is an initialization for
    # any thrown error.  We first do type checking of arg0
    message = "read_distributed_data:  "
    if scheduler not in ["dask", "spark"]:
        message += "Unsupported value for scheduler={}\n".format(scheduler)
        message += "Must be either 'dask' or 'spark'"
        raise ValueError(message)
    if scheduler == "spark" and not _mspasspy_has_pyspark:
        print(
            "WARNING(read_distributed_data): pyspark not found, will use dask instead. The scheduler argument is ignored."
        )
        scheduler = "dask"
    if isinstance(data, list):
        ensemble_mode = True
        for i, x in enumerate(data):
            if not isinstance(x, dict):
                message += (
                    "arg0 is a list, but component {} has illegal type={}\n".format(
                        i, str(type(x))
                    )
                )
                message += "list must contain only python dicts defining queries that define each ensemble to be loaded"
                raise TypeError(message)
        if sort_clause:
            # TODO - conflicting examples of the type of this clause
            # may have a hidden bug in TimeIntervalReader as it has usage
            # different from this restriction
            if not isinstance(sort_clause, (list, str)):
                message += "sort_clause argument is invalid\n"
                message += "Must be either a list or a single string"
                raise TypeError(message)
            if isinstance(sort_clause, list):
                for x in sort_clause:
                    if not isinstance(x, dict):
                        message += "sort_clause value = " + str(sort_clause)
                        message += " is invalid input for MongoDB"
                        raise TypeError(message)
    elif isinstance(data, Database):
        ensemble_mode = False
        dataframe_input = False
        db = data
    elif (
        isinstance(data, pd.DataFrame)
        or (_mspasspy_has_dask and isinstance(data, dask.dataframe.core.DataFrame))
        or (
            _mspasspy_has_pyspark
            and isinstance(data, pyspark.sql.dataframe.DataFrame)
        )
    ):
        ensemble_mode = False
        dataframe_input = True
        if isinstance(db, Database):
            db = db
        else:
            if db:
                message += "Illegal type={} for db argument - required with dataframe input".format(
                    str(type(db))
                )
                raise TypeError(message)
            else:
                message += "Usage error.  An instance of the Database class is required for the db argument when input is a dataframe"
                raise TypeError(message)
    else:
        message += "Illegal type={} for arg0\n".format(str(type(data)))
        message += "Must be a Dataframe (pandas, spark, or dask), Database, or a list of query dictionaries"
        raise TypeError(message)
    if normalize:
        if isinstance(normalize, list):
            for i, nrm in enumerate(normalize):
                if not isinstance(nrm, BasicMatcher):
                    message += (
                        "Illegal type={} for component {} of normalize list\n".format(
                            type(nrm), i
                        )
                    )
                    message += "Must be a subclass of BasicMatcher to allow use in the normalize function"
                    raise TypeError(message)
        else:
            message += "Illegal type for normalize argument = {}\n".format(
                type(normalize)
            )
            message += "Must be a python list of implementations of BasicMatcher"
            raise TypeError(message)
    if container_to_merge:
        if scheduler == "spark":
            if not isinstance(container_to_merge, pyspark.RDD):
                message += (
                    "container_to_merge must define a pyspark RDD with scheduler==spark"
                )
                raise TypeError(message)
            container_partitions = container_to_merge.getNumPartitions()
        else:
            if not isinstance(container_to_merge, dask.bag.core.Bag):
                message += (
                    "container_to_merge must define a dask bag with scheduler==dask"
                )
                raise TypeError(message)
            container_partitions = container_to_merge.npartitions
        if npartitions:
            # This error handler only works if npartitions is set in the
            # arg list.  Can't test the data container here as it doesn't
            # exist yet and putting it inside the logic below would be awkward
            # and could slow execution
            if container_partitions != npartitions:
                message += "container_to_merge number of partitions={}\n".format(
                    container_partitions
                )
                message += (
                    "must match value of npartitions passed to function={}".format(
                        npartitions
                    )
                )
                raise ValueError(message)
    if normalize_ensemble:
        if container_to_merge is None:
            message += "normalize_ensemble option requires specifying a bag/RDD passed via the container_to_merge argument\n"
            message += "Received a (default) None value for the container_to_merge argument"
            raise ValueError(message)
        if isinstance(normalize_ensemble, list):
            for i, nrm in enumerate(normalize_ensemble):
                if not isinstance(nrm, BasicMatcher):
                    message += "Illegal type={} for component {} of normalize_ensemble list\n".format(
                        type(nrm), i
                    )
                    message += "Must be a subclass of BasicMatcher to allow use in the normalize function"
                    raise TypeError(message)
        else:
            message += "Illegal type for normalize_ensemble argument = {}\n".format(
                type(normalize_ensemble)
            )
            message += "Must be a python list of implementations of BasicMatcher"
            raise TypeError(message)
    # This has a fundamentally different algorithm for handling
    # ensembles than atomic data.  Ensembles are driven by a list of
    # queries while atomic data are driven by a dataframe.
    # The dataframe is necessary in this context because MongoDB
    # cursors cannot be serialized while Database can.
    if ensemble_mode:
        if normalize_ensemble:
            if isinstance(normalize_ensemble, list):
                for i, nrm in enumerate(normalize_ensemble):
                    if not isinstance(nrm, BasicMatcher):
                        message += "Illegal type={} for component {} of normalize_ensemble list\n".format(
                            type(nrm), i
                        )
                        message += "Must be a subclass of BasicMatcher to allow use in the normalize function"
                        raise TypeError(message)
            else:
                message += "Illegal type for normalize_ensemble argument = {}\n".format(
                    type(normalize_ensemble)
                )
                message += "Must be a python list of implementations of BasicMatcher"
                raise TypeError(message)
        if scheduler == "spark":
            # note this works only because parallelize treats a None as default
            # and we use None as our default too - could break with version change
            plist = spark_context.parallelize(data, numSlices=npartitions)
            plist = plist.map(
                lambda q: read_ensemble_parallel(
                    q,
                    db,
                    collection=collection,
                    mode=mode,
                    normalize=normalize,
                    load_history=load_history,
                    exclude_keys=exclude_keys,
                    data_tag=data_tag,
                    sort_clause=sort_clause,
                    aws_access_key_id=aws_access_key_id,
                    aws_secret_access_key=aws_secret_access_key,
                )
            )
        else:
            # note same maintenance issue as with parallelize above
            plist = dask.bag.from_sequence(data, npartitions=npartitions)
            plist = plist.map(
                read_ensemble_parallel,
                db,
                collection=collection,
                mode=mode,
                normalize=normalize,
                load_history=load_history,
                exclude_keys=exclude_keys,
                data_tag=data_tag,
                sort_clause=sort_clause,
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key,
            )
    else:
        # Logic here gets a bit complex to handle the multiple inputs
        # possible for atomic data.  That is, multiple dataframe
        # implementations and a direct database reading mechanism.
        # The database instance is assumed to be converted to a bag/rdd of docs
        # above.  We use a set of internal variables to control the
        # input block used.
        # TODO:  clean up symbol - plist is an awful name
        if dataframe_input:
            if scheduler == "spark":
                plist = spark_context.parallelize(
                    data.to_dict("records"), numSlices=npartitions
                )
            else:
                # Seems we have to convert a pandas df to a dask
                # df to have access to the "to_bag" method of dask
                # DataFrame.  It may be better to write a small
                # converter run with a map operator row by row
                if isinstance(data, pd.DataFrame):
                    data = dask.dataframe.from_pandas(data, npartitions=npartitions)
                # format arg is essential as the default is tuple
                plist = data.to_bag(format="dict")
        else:
            # logic above should guarantee data is a Database
            # object that can be queried and used to generate the bag/rdd
            # needed to read data in parallel immediately after this block
            if query:
                fullquery = query
            else:
                fullquery = dict()
            if data_tag:
                fullquery["data_tag"] = data_tag
            cursor = data[collection].find(fullquery)
            if scratchfile:
                # here we write the documents all to a scratch file in
                # json and immediately read them back to create a bag or RDD
                with open(scratchfile, "w") as outfile:
                    for doc in cursor:
                        json.dump(doc, outfile)
                if scheduler == "spark":
                    # the only way I could find to load json data in pyspark
                    # is to use an intermediate dataframe.  This should
                    # still parallelize, but will probably be slower.
                    # if there is a more direct solution it should be done here.
                    plist = spark_context.read.json(scratchfile)
                    # this is wrong above; also don't see how to do partitions
                    # this section is broken until I (glp) can get help
                    # plist = plist.map(to_dict,"records")
                else:
                    plist = dask.bag.read_text(scratchfile).map(json.loads)
                # Intentionally omit error handler here.  Assume the
                # system will throw an error if file opens or writes fail;
                # that will be sufficient for the user to understand the problem.
                os.remove(scratchfile)
            else:
                doclist = []
                for doc in cursor:
                    doclist.append(doc)
                if scheduler == "spark":
                    plist = spark_context.parallelize(doclist, numSlices=npartitions)
                else:
                    plist = dask.bag.from_sequence(doclist, npartitions=npartitions)
                del doclist
        # Earlier logic makes plist a bag/rdd of docs - above converts dataframe to same
        if scheduler == "spark":
            plist = plist.map(
                lambda doc: db.read_data(
                    doc,
                    collection=collection,
                    normalize=normalize,
                    load_history=load_history,
                    exclude_keys=exclude_keys,
                    data_tag=data_tag,
                    aws_access_key_id=aws_access_key_id,
                    aws_secret_access_key=aws_secret_access_key,
                )
            )
        else:
            plist = plist.map(
                db.read_data,
                collection=collection,
                normalize=normalize,
                load_history=load_history,
                exclude_keys=exclude_keys,
                data_tag=data_tag,
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key,
            )
    # all cases create plist.  Run the container_merge_function
    # if requested
    if container_to_merge:
        if scheduler == "spark":
            plist = plist.zip(container_to_merge).map(
                lambda x: container_merge_function(x[0], x[1])
            )
        else:
            plist = dask.bag.map(container_merge_function, plist, container_to_merge)
    if normalize_ensemble:
        for nrm in normalize_ensemble:
            if scheduler == "spark":
                # bind nrm as a default argument to avoid the late-binding
                # closure pitfall when mapping inside a loop
                plist = plist.map(lambda d, nrm=nrm: normalize_function(d, nrm))
            else:
                plist = plist.map(normalize_function, nrm)
    return plist
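A usage sketch for the dask path of `read_distributed_data`; the database name
"getting_started" and the query construction are illustrative placeholders, and the example
assumes a running MongoDB server reachable through `DBClient` with a populated wf_TimeSeries
collection:

```
from mspasspy.db.client import DBClient
from mspasspy.io.distributed import read_distributed_data

dbclient = DBClient()
db = dbclient.get_database("getting_started")   # hypothetical database name

# Case 1:  atomic data driven directly by the Database handle
bag = read_distributed_data(db, collection="wf_TimeSeries", scheduler="dask")

# Case 3:  ensembles driven by a list of queries (one per common-source gather)
queries = [{"source_id": srcid} for srcid in db.wf_TimeSeries.distinct("source_id")]
ensemble_bag = read_distributed_data(
    queries,
    db=db,
    collection="wf_TimeSeries",
    scheduler="dask",
)
```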
def _partitioned_save_wfdoc(
    partition_iterator,
    db,
    collection="wf_TimeSeries",
    dbname=None,
) -> list:
    """
    Internal function used to save the core wf document data for atomic mspass data objects.

    This function is intended for internal use only as a component of the function
    write_distributed_data.  It uses a bulk write method (pymongo's insert_many method) to
    reduce database transaction delays.

    This function has a strong dependency on a method for handling dead data within
    `write_distributed_data`.  That is, each document it processes is ASSUMED (there are no
    error handlers since this is an internal function) to have a boolean value set with the
    key "live".  When True that document is pushed to MongoDB.  When False the contents of the
    document are ignored.  The return of this function is a list of MongoDB `ObjectId`s of
    live data inserted with a None in the list for any datum marked dead ("live": False).
    This causes the size of the return list to be the same as the input iterator length
    (normally the partition size, but usually truncated at the last partition).  dask and
    pyspark handle the partitioned list and concatenate the sequence of lists into a single
    long list.  Counting None values in that list provides a way to cross-check the number of
    data killed in a workflow.

    The function has a feature not used at present, but it was preserved here as an idea.  At
    present there is a workaround to provide a mechanism to serialize a mspasspy.db.Database
    class.  When the function is called with default parameters it assumes the db argument
    will serialize in dask/spark.  If db is set to None each call to the function will
    construct a :class:`mspasspy.db.Database` object for that call.  Because it is done by
    partition the cost of that algorithm scales by the number of partitions, not the number of
    data items.

    :param partition_iterator:  This parameter is an iterator that is expected to contain a
      series of python dict containers defining the translation of data Metadata to the format
      needed by pymongo (a python dict).  It could be used as a regular function with a python
      list, but normal use with dask map_partitions or pyspark's mapPartitions sends the
      function an iterable that can only be traversed ONCE.  In that situation, the iterator
      value is a component of the bag/RDD that is being handled and the entire range received
      in a single call is defined by the partition size of the bag/RDD.
    :type partition_iterator:  iterable container of python dicts translated from seismic data
      Metadata.
    :param db:  Should normally be a database handle that is to be used to save the documents
      stored in partition_iterator.  Set to None if you want to have the function use the
      feature of creating a new handle for each instance to avoid serialization.
    :type db:  :class:`mspasspy.db.Database` or None.  Type is not tested for efficiency so
      the function would likely abort with ambiguous messages when misused in a parallel
      workflow.
    :param collection:  wf collection name.  Default "wf_TimeSeries".
    :param dbname:  database name to save data into.  This parameter is referenced ONLY if db
      is set to None.
    :type dbname:  string

    :return:  a python list of ObjectIds of the inserted documents
    """
    # This makes the function more bombproof in the event a database
    # handle can't be serialized - db should normally be defined
    if db is None:
        dbclient = DBClient()
        # needs to throw an exception if both db and dbname are None
        db = dbclient.get_database(dbname)
    dbcol = db[collection]
    # test for the existence of any dead data.  Handle that case specially.
    # Very important to realize the algorithm is complicated by the fact that
    # partition_iterator can normally only be traversed once.  Hence
    # we copy documents to a new container (cleaned_doclist) and define the pattern of
    # any dead data with the boolean list lifelist
    has_bodies = False
    docarray = []
    for doc in partition_iterator:
        # clear the wfid if it exists or mongo may overwrite
        if "_id" in doc:
            doc.pop("_id")
        docarray.append(doc)
        if not doc["live"]:
            has_bodies = True
    if has_bodies:
        lifelist = []
        cleaned_doclist = []
        for doc in docarray:
            if doc["live"]:
                cleaned_doclist.append(doc)
                lifelist.append(True)
            else:
                lifelist.append(False)
        if len(cleaned_doclist) > 0:
            wfids_inserted = dbcol.insert_many(cleaned_doclist).inserted_ids
            wfids = []
            ii = 0
            for i in range(len(lifelist)):
                if lifelist[i]:
                    wfids.append(wfids_inserted[ii])
                    ii += 1
                else:
                    wfids.append(None)
        else:
            wfids = []
            for i in range(len(docarray)):
                wfids.append(None)
    else:
        # this case is much simpler
        # note plural ids.  insert_one uses "inserted_id".
        # proper english usage but potentially confusing - beware
        wfids = dbcol.insert_many(docarray).inserted_ids
    return wfids
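A small illustration of the "live" contract described in the docstring above; it assumes an
existing `Database` handle `db` and shows that dead documents produce a `None` placeholder in
the returned id list:

```
docs = [
    {"live": True, "sta": "AAK", "npts": 100},   # hypothetical live datum
    {"live": False, "sta": "BBK", "npts": 0},    # dead datum - skipped on insert
]
wfids = _partitioned_save_wfdoc(iter(docs), db, collection="wf_TimeSeries")
# wfids == [ObjectId(...), None]
```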
class pyspark_mappartition_interface:
    """
    Interface class required for pyspark mapPartitions.

    This class is a workaround for a limitation of pyspark's api for mapPartitions that is a
    legacy of its scala foundation.  mapPartitions only accepts a function name as arg0 and
    has no provision for any optional arguments to the function.  An alternative solution
    found in web sources was "currying" and it might have been possible to do this with just a
    function wrapper with fixed kwarg values defined within the `write_distributed_data`
    function.  In this case, however, because `write_distributed_data` handles an entire
    bag/rdd as input a class can be created at the initialization stage of that function.
    Otherwise the overhead of a wrapper would likely be smaller.  Perhaps the feature most
    useful to make this a class is the safety valve if db is a None.  (see below)

    The method `partitioned_save_wfdoc` is just an alias for the file scope function
    `_partitioned_save_wfdoc` with the parameters for that function defined by self content.
    """

    def __init__(
        self,
        db,
        collection,
        dbname=None,
    ):
        """
        Constructor - called in write_distributed_data.

        :param db:  database handle to use for saving wf documents by partition.  If passed a
          None for this value the constructor will attempt to create a default connection to
          MongoDB and use the dbname argument to fetch the database to use.
        :param collection:  name of the MongoDB collection where documents are to be written.
        """
        if db is None:
            if dbname is None:
                message = "pyspark_mappartition_interface constructor:  invalid parameter combination\n"
                message += "Both db (arg0) and dbname (arg2) values are None.  One or the other must be defined"
                raise ValueError(message)
            dbclient = DBClient()
            self.db = dbclient.get_database(dbname)
        else:
            self.db = db
        self.collection = collection

    def partitioned_save_wfdoc(self, iterator):
        """
        Method used as a wrapper to pass on to pyspark's mapPartitions operator.  iterator is
        assumed to be the iterator passed to the function per partition by mapPartitions.
        """
        wfidlist = _partitioned_save_wfdoc(
            iterator, self.db, collection=self.collection
        )
        return wfidlist
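A sketch of how this interface class is used with pyspark; `rdd` is assumed to already be an
RDD of wf documents (python dicts carrying the "live" flag) produced by
`_atomic_extract_wf_document`:

```
interface = pyspark_mappartition_interface(db, "wf_TimeSeries")
rdd = rdd.mapPartitions(interface.partitioned_save_wfdoc)
wfids = rdd.collect()
```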
def post_error_log(d, doc, other_elog=None, elog_key="error_log") -> dict:
    """
    Posts error log data from the arg0 datum (symbol d) as a subdocument in the arg1 document
    (symbol doc).

    write_distributed_data has a "post_elog" boolean.  By default elog entries for any atomic
    seismic datum will be posted one-at-a-time with insert_one calls to the "elog" collection.
    If a workflow generates a large number of error log entries that high database traffic can
    be a bottleneck.  This function is used within `write_distributed_data` to avoid that by
    posting the same data to subdocuments attached to wf documents saved with live data.

    Note dead data are never handled by this mechanism.  They always end up in either the
    abortions or cemetery collections.
    """
    elog = ErrorLogger()
    if d.elog.size() > 0:
        elog = d.elog
    elif other_elog:
        if other_elog.size() > 0:
            elog += other_elog
    if elog.size() > 0:
        elogdoc = elog2doc(elog)
        doc[elog_key] = elogdoc
    return doc
def _save_ensemble_wfdocs(
    ensemble_data,
    db,
    save_schema,
    exclude_keys,
    mode,
    undertaker,
    normalizing_collections,
    cremate=False,
    post_elog=True,
    save_history=False,
    post_history=False,
    history_key="history_data",
    data_tag=None,
):
    """
    Internal function that saves wf documents for all live members of ensemble objects.

    Ensembles have a built-in efficiency in database transactions not possible with atomic
    data.  That is, bulk inserts defined by the number of live members in the ensemble are a
    natural way to save the documents extracted from ensemble member Metadata.

    Ensembles have a complexity in handling data marked dead.  First, if an ensemble itself is
    marked dead the function assumes earlier logic marked all the members dead.  That
    assumption is true in this context as this is an internal function, but be cautious if
    this code is reused elsewhere.  For normal use, the Undertaker is called in a way that
    separates the living and the dead (an application of the best python joke ever:
    bring_out_your_dead).  The dead are buried by default.  If cremate is set True dead
    members are vaporized leaving no trace in the database.

    The booleans post_elog and post_history can impact performance of this function.  Default
    is the fastest mode where post_elog is set True and save_history is off (False).  When
    post_elog is set False, any error log entries will cause error log data to be saved to the
    "elog" collection one document at a time.  Similarly, if save_history is set True and the
    history feature is enabled every datum will generate a call to save a document in the
    "history" collection.  The idea is one would not do that unless the dataset is fairly
    small or other steps are a bigger throttle on the throughput.  For large data sets with
    history, one should also set post_history True.  Then the history data will be posted as a
    subdocument with the wf documents saved by this function for each live ensemble member.

    :param ensemble_data:  ensemble object containing data to be saved.
    :type ensemble_data:  assumed to be a TimeSeriesEnsemble or SeismogramEnsemble.  Because
      this is an internal function there are no safeties to test that assumption.
    :param db:  Database handle for all database saves
    :param save_schema:  schema object - passed directly to md2doc.  See that function's
      docstring for a description.
    :param exclude_keys:  list of metadata keys to discard when translating metadata to a
      python dict (document).  Sent verbatim to md2doc.
    :param mode:  one of "promiscuous", "cautious", or "pedantic" used to define handling of
      mismatches between schema definitions defined by the save_schema argument and Metadata.
      See the Database docstring and User's Manual for a description of this common argument.
    :param undertaker:  Instance of :class:`mspasspy.util.Undertaker` to handle dead data
      (see above)
    :param normalizing_collections:  see the docstring for `write_distributed_data`.
    :param cremate:  tells the Undertaker how to handle dead data (see above)
    :param post_elog:  see above
    :param save_history:  see above
    :param post_history:  see above
    :param history_key:  name to use for posting history data if post_history is set True.
      Ignored if False.
    :param data_tag:  Most data saved with `write_distributed_data` should use a data tag
      string to define the state of processing of that data.  Default is None, but normal use
      should set it as an appropriate string defining what this dataset is.
    """
    if cremate:
        ensemble_data = undertaker.cremate(ensemble_data)
    else:
        ensemble_data, bodies = undertaker.bring_out_your_dead(ensemble_data)
        undertaker.bury(bodies)
        del bodies
    # Need to handle empty ensembles.  Undertaker removes dead bodies
    # for both bury and cremate from ensembles so we can end up
    # with an empty ensemble.  Normal return will be an empty list
    # for this case
    if len(ensemble_data.member) == 0 or ensemble_data.dead():
        wfids = []
    else:
        doclist = []
        # we don't have to test for dead data in the loop below; the above removes
        # them so we just use md2doc.  We do, however, have to handle
        # md2doc failure signaled with aok False
        for d in ensemble_data.member:
            doc, aok, elog = md2doc(
                d,
                save_schema=save_schema,
                exclude_keys=exclude_keys,
                mode=mode,
                normalizing_collections=normalizing_collections,
            )
            if aok:
                if data_tag:
                    doc["data_tag"] = data_tag
                if "_id" in doc:
                    doc.pop("_id")
                # Handle the error log if it is not empty.
                # Either post it to the doc or push the entry to the database
                if d.elog.size() > 0 or elog.size() > 0:
                    doc = post_error_log(d, doc, other_elog=elog)
                else:
                    if elog.size() > 0:
                        d.elog += elog
                        elog_id = db._save_elog(d)
                        doc["elog_id"] = elog_id
                if save_history:
                    if post_history:
                        # is_empty is part of ProcessingHistory
                        if not d.is_empty():
                            history_doc = history2doc(d)
                            doc[history_key] = history_doc
                    else:
                        history_id = db._save_history(d)
                        doc["history_id"] = history_id
                if data_tag:
                    doc["data_tag"] = data_tag
                doclist.append(doc)
            else:
                d.elog += elog
                d.kill()
                if cremate:
                    d = undertaker.cremate(d)
                else:
                    d = undertaker.bury(d)
        # weird trick to get waveform collection name - borrowed from
        # :class:`mspasspy.db.Database` save_data method code
        wf_collection_name = save_schema.collection("_id")
        # note plural ids.  insert_one uses "inserted_id".
        # proper english usage but potentially confusing - beware
        if len(doclist) > 0:
            wfids = db[wf_collection_name].insert_many(doclist).inserted_ids
        else:
            wfids = []
    return wfids


def _atomic_extract_wf_document(
    d,
    db,
    save_schema,
    exclude_keys,
    mode,
    normalizing_collections,
    post_elog=True,
    elog_key="error_log",
    post_history=False,
    save_history=False,
    history_key="history_data",
    data_tag=None,
    undertaker=None,
    cremate=False,
):
    """
    This is an internal function used in a map operator to extract the Metadata contents of
    atomic MsPASS seismic data objects, returning an edited version of the contents as a
    python dictionary that write_distributed_data later passes to MongoDB to be saved as wf
    documents.

    This function does only half of the steps in the related function _save_ensemble_wfdocs.
    That is, it only does the metadata extraction and handling of dead data.  It leaves saving
    the wf documents to the partitioned save function.

    If the datum received is marked dead it is handled by the instance of
    :class:`mspasspy.util.Undertaker` passed with the undertaker argument.  If cremate is set
    True the returned contents will be minimal.  All dead data will have the attribute "live"
    set to a boolean False.  All live data will have that value set True.

    Error log and history data handling are as described in the docstring for
    `write_distributed_data` with which this function is intimately linked.  Similarly, all
    the argument descriptions can be found in that docstring.

    :return:  python dict translation of the Metadata container of the input datum d.  Note
      the return always has a boolean value associated with the key "live".  That value is
      critical downstream from this function in the partition-based writer to assure the
      contents of dead data are not stored in a wf collection.
    """
    if undertaker:
        stedronsky = undertaker
    else:
        stedronsky = Undertaker(db)
    doc, aok, elog_md2doc = md2doc(
        d,
        save_schema=save_schema,
        exclude_keys=exclude_keys,
        mode=mode,
        normalizing_collections=normalizing_collections,
    )
    # cremate or bury dead data.
    # both return an edited data object reduced to ashes or a skeleton.
    # doc and elog contents are handled separately.  When cremate is
    # true nothing will be saved in the database.  Default will
    # bury the body leaving a cemetery record.
    if d.dead() or (not aok):
        # this posts any elog content to the elog of d so bury will
        # save it
        d.elog += elog_md2doc
        d.kill()
        if cremate:
            d = stedronsky.cremate(d)
        else:
            d = stedronsky.bury(d)
        # make sure this is set as it is used in logic below
        # we use this instead of d to handle the case with aok False
        doc["live"] = False
        # d.kill()
    else:
        doc["live"] = True
        if post_elog:
            doc = post_error_log(d, doc, other_elog=elog_md2doc)
        else:
            if d.elog.size() > 0 or elog_md2doc.size() > 0:
                d.elog += elog_md2doc  # does nothing if rhs is empty
                elog_id = db._save_elog(d)
                doc["elog_id"] = elog_id
        if save_history:
            if post_history:
                # is_empty is part of ProcessingHistory
                if not d.is_empty():
                    history_doc = history2doc(d)
                    doc[history_key] = history_doc
            else:
                history_id = db._save_history(d)
                doc["history_id"] = history_id
        if data_tag:
            doc["data_tag"] = data_tag
    return doc
[docs]def write_distributed_data( data, db, data_are_atomic=True, mode="promiscuous", storage_mode="gridfs", scheduler=None, file_format=None, overwrite=False, exclude_keys=None, collection="wf_TimeSeries", data_tag=None, post_elog=False, save_history=False, post_history=False, cremate=False, normalizing_collections=["channel", "site", "source"], alg_name="write_distributed_data", alg_id="0", ) -> list: """ Parallel save function for termination of a processing script. Saving data from a parallel container (i.e. bag/rdd) is a different problem from a serial writer. Bottlenecks are likely with communication delays talking to the MonboDB server and complexities of file based io. Further, there are efficiency issues in the need to reduce transactions that cause delays with the MongoDB server. This function tries to address these issues with two approaches: 1. Since v2 it uses single function that handles writing the sample data for a datum. For atomic data that means a single array but for ensembles it is the combination of all arrays in the ensemble "member" containers. The writing functions are passed through a map operator so writing is a parallelized per worker. Note the function also abstracts how the data are written with different things done depending on the "storage_mode" attribute that can optionally be defined for each atomic object. 2. After the sample data are saved we use a MongoDB "update_many" operator with the "many" defined by the partition size. That reduces database transaction delays by 1/object_per_partition. For ensembles the partitioning is natural with bulk writes controlled by the number of ensemble members. The function also handles data marked dead in a standardized way though the use of the :class:`mspasspy.util.Undertaker` now defined within the Database class handle. The default will call the `bury` method on all dead data which leaves a document containing the datum's Metadata and and error log messages in a collection called "cemetery". If the `cremate` argument is set True dead data will be vaporized and no trace of them will appear in output. To further reduce database traffic the function has two (boolean) options called `post_elog` and `post_history`. When set True the elog and/or object-level history data will be posted as subdocuments in the output collection instead of the normal (at least as defined by the Database handle) way these data are saved (In Database the error log is saved to the "elog" collection and the history data is saved to "history".) Note post_history is ignored unless the related `save_history` argument is changed from the default False ot True. A peculiarity that is a consequence of python's "duck typing" is that the user must give the writer some hints about the type of data objects it is expected to handle. Rather than specify a type argument, the type is inferred from two arguments that are necessary anyway: (1) the `collection` argument value, and (2) the boolean `data_are_atomic`. The idea is that `collection` is used to determine of the writer is handling TimeSeries or Seismogram objects ("wf_TimeSeries" or "wf_Seismogram" values respectively)\ and the boolean is used, as the name implies, to infer if the data are atomic or ensembles. This function should only be used as the terminal step of a parallel workflow (i.e. a chain of map/reduce operators). 
This function will ALWAYS initiate a lazy computation on such a chain of operators because it calls the "compute" method for dask and the "collect" method of spark before returning. It then always returns a list of ObjectIds of live, saved data. The function is dogmatic about that because the return can never be a bag/RDD of the the data. If your workflow requires an intermediate save (i.e. saving data in an intermediate step within a chain of map/reduce opeators) the best approach at present is to use the `save_data` method of the `Database` class in a variant of the following (dask) example that also illustrates how this function is used as the terminator of a chain of map-reduce operators. ``` mybag = read_distributed_data(db,collection='wf_TimeSeries') mybag = mybag.map(detrend) # example # intermediate save mybag = mybag.map(db.save_data,collection="wf_TimeSeries") # more processing - trivial example mybag = mybag.map(filter,'lowpass',freq=1.0) # termination with this function wfids = write_distributed_data(mybag,db,collection='wf_TimeSeries') ``` The `storage_mode` argument is a constant that defines how the SAMPLE DATA are to be stored. Currently this can be "file" or "gridfs", but be aware future evolution may extend the options. "gridfs" is the default as the only complexity it has is a speed throttle by requiring the sample data to move through MongoDB and the potential to overflow the file system where the database is stored. (See User's Manual for more on this topic.). Most users, however, likely want to use the "file" option for that parameter. There are, however, some caveats in that use that users MUST be aware of before using that option with large data sets. Since V2 of MsPASS the file save process was made more robust by allowing a chain of options for how the actual file name where data is stored is set. The algorithm used here is a private method in :class:`mspasspy.db.Database` called `_save_sample_data_to_file`. When used here that that function is passed a None type for dir and dfile. The EXPECTED use is that you as a user should set the dir and dfile attribute for EVERY datum in the bag/RDD this function is asked to handle. That allows each atomic datum to define what the file destination is. For ensembles normal behavior is to require the entire ensemble content to be saved in one file defined by the dir and dfile values in the ensemble's Metadata container. THE WARNING is that to be robust the file writer will alway default a value for dir and dfile. The default dir is the run directory. The default dfile (if not set) is a unique name created by a uuid generator. Care must be taken in file writes to make sure you don't create huge numbers of files that overflow directories or similar file system errors that are all to easy to do with large data set saves. See the User Manual for examples of how to set output file names for a large data set. :param data: parallel container of data to be written :type data: :class:`dask.bag.Bag` or :class:`pyspark.RDD`. :param db: database handle to manage data saved by this function. :type db: :class:`mspasspy.db.database.Database`. :param mode: This parameter defines how attributes defined with key-value pairs in MongoDB documents are to be handled for writes. By "to be handled" we mean how strongly to enforce name and type specification in the schema for the type of object being constructed. Options are ['promiscuous','cautious','pedantic'] with 'promiscuous' being the default. 
See the User's manual for more details on the concepts and how to use this option. :type mode: :class:`str` :param storage_mode: Must be either "gridfs" or "file. When set to "gridfs" the waveform data are stored internally and managed by MongoDB. If set to "file" the data will be stored in a file system. File names are derived from attributes with the tags "dir" and "dfile" in the standard way. Any datum for which dir or dfile aren't defined will default to the behaviour of the Database class method `save_data`. See the docstring for details but the concept is it will always be bombproof even if not ideal. :type storage_mode: :class:`str` :param scheduler: name of parallel scheduler being used by this writer. MsPASS currently support pyspark and dask. If arg0 is an RDD scheduler must be "spark" and arg0 defines dask bag schduler must be "dask". The function will raise a ValueError exception of scheduler and the type of arg0 are not consistent or if the value of scheduler is illegal. Note with spark the context is not required because of how this algorithm is structured. :type scheduler: string Must be either "dask" or "spark". Default is None which is is equivalent to the value of "dask". :param file_format: the format of the file. This can be one of the `supported formats <https://docs.obspy.org/packages/autogen/obspy.core.stream.Stream.write.html#supported-formats>`__ of ObsPy writer. The default the python None which the method assumes means to store the data in its raw binary form. The default should normally be used for efficiency. Alternate formats are primarily a simple export mechanism. See the User's manual for more details on data export. Used only for "file" storage mode. :type file_format: :class:`str` :param overwrite: If true gridfs data linked to the original waveform will be replaced by the sample data from this save. Default is false, and should be the normal use. This option should never be used after a reduce operator as the parents are not tracked and the space advantage is likely minimal for the confusion it would cause. This is most useful for light, stable preprocessing with a set of map operators to regularize a data set before more extensive processing. It can only be used when storage_mode is set to gridfs. :type overwrite: boolean :param collectiion: name of wf collection where the documents derived from the data are to be saved. Standard values are "wf_TimeSeries" and "wf_Seismogram" for which a schema is defined in MsPASS. Normal use should specify one or the other. The default is "wf_TimeSeries" but normal usage should specify this argument explicitly for clarity in reuse. :type collection: :class:`str` :param exclude_keys: Metadata can often become contaminated with attributes that are no longer needed or a mismatch with the data. A type example is the bundle algorithm takes three TimeSeries objects and produces a single Seismogram from them. That process can, and usually does, leave things like seed channel names and orientation attributes (hang and vang) from one of the components as extraneous baggage. Use this of keys to prevent such attributes from being written to the output documents. Not if the data being saved lack these keys nothing happens so it is safer, albeit slower, to have the list be as large as necessary to eliminate any potential debris. :type exclude_keys: a :class:`list` of :class:`str` :param data_tag: a user specified "data_tag" key. See above and User's manual for guidance on how the use of this option. 
:type data_tag: :class:`str`
:param post_elog: boolean controlling how error log messages are handled. When False (default) error log messages get posted in single transactions with MongoDB to the "elog" collection. When set True error log entries will be posted as subdocuments to the wf collection entry for each datum. Setting post_elog True is most useful if you anticipate a run will generate a large number of errors that could throttle processing with a large number of one-at-a-time document saves. For normal use with a small number of errors it is easier to review error issues by inspecting the elog collection than having to query the larger wf collection. (A usage sketch combining this and the related options follows this parameter list.)
:param save_history: When set True (default is False) the writer will save any object-level history data stored within the input data objects. The related boolean (described below) called post_history controls how such data are saved if this option is enabled. Note post_history is ignored unless save_history is True.
:param post_history: boolean similar to post_elog for handling object-level history data. It is, however, only handled if the related boolean "save_history" is set True. When post_history is set True the history data will be saved as a subdocument in the wf document saved for each live, atomic datum (note for ensembles that means all live members). When False each atomic datum will generate an insert_one transaction with MongoDB and save the history data in the "history" collection. It then sets the attribute with key "history_id" to the ObjectId of the saved document. The default for this argument is True to avoid accidentally throttling workflows on large data sets. The default for save_history is False so the overall default behavior is to drop any history data.
:param cremate: boolean controlling handling of dead data. When True dead data will be passed to the `cremate` method of :class:`mspasspy.util.Undertaker`, which leaves nothing but ashes in the return. When False (default) the `bury` method will be called instead, which saves a skeleton (error log and Metadata content) of each dead datum in the "cemetery" collection.
:param normalizing_collections: list of collection names dogmatically treated as normalizing collection names. The keywords in the list are used to always (i.e. for all modes) erase any attribute with a key name of the form `collection_attribute` where `collection` is one of the collection names in this list and attribute is any string. Attribute names with the "_" separator are saved unless the collection field matches one of the strings (e.g. "channel_vang" will be erased before saving to the wf collection while "foo_bar" will not be erased). This list should ONLY be changed if a different schema than the default mspass schema is used and different names are used for normalizing collections. (e.g. if one added a "shot" collection to the schema the list would need to be changed to at least add "shot".)
:type normalizing_collections: a :class:`list` of :class:`str` defining collection names.
:param alg_name: do not change.
:param alg_id: algorithm id for object-level history. Normally assigned by the global history manager.
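A minimal sketch of how these bookkeeping options are commonly combined when terminating a dask chain (the parameter values are illustrative choices, not recommendations; `mybag` is assumed to be a bag of Seismogram data):

```
wfids = write_distributed_data(
    mybag,
    db,
    collection="wf_Seismogram",
    storage_mode="gridfs",
    post_elog=True,      # keep error logs inside the wf documents
    save_history=True,   # preserve object-level history
    post_history=True,   # ... as wf subdocuments rather than one-at-a-time inserts
    cremate=True,        # drop dead data with no cemetery record
)
```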
    """
    # We don't do a type check on the data argument assuming dask or
    # spark will throw errors that make the mistake clear.
    # Too awkward to use an isinstance test so for now at least we don't
    # test the type of data
    if not isinstance(db, Database):
        message = "write_distributed_data: required arg1 (db) must be an instance of mspasspy.db.Database\n"
        message += "Type of arg1 received is {}".format(str(type(db)))
        raise TypeError(message)
    if storage_mode not in ["file", "gridfs"]:
        raise TypeError(
            "write_distributed_data: Unsupported storage_mode={}".format(storage_mode)
        )
    if mode not in ["promiscuous", "cautious", "pedantic"]:
        message = "write_distributed_data: Illegal value of mode={}\n".format(mode)
        message += "Must be one of the following: promiscuous, cautious, or pedantic"
        raise ValueError(message)
    if scheduler:
        if scheduler not in ["dask", "spark"]:
            message = "write_distributed_data: Illegal value of scheduler={}\n".format(
                scheduler
            )
            message += "Must be either dask or spark"
            raise ValueError(message)
        if scheduler == "spark" and not _mspasspy_has_pyspark:
            print(
                "WARNING(write_distributed_data): pyspark not found, will use dask instead. The scheduler argument is ignored."
            )
            scheduler = "dask"
    else:
        scheduler = "dask"
    # This use of the collection name to establish the schema is
    # a bit fragile as it depends upon the mspass schema naming
    # convention.  Once tried using take(1) and probing the content of
    # the container but that has bad memory consequences at least for pyspark
    if collection is None or collection == "wf_TimeSeries":
        save_schema = db.metadata_schema.TimeSeries
    elif collection == "wf_Seismogram":
        save_schema = db.metadata_schema.Seismogram
    else:
        message = "write_distributed_data: Illegal value of collection={}\n".format(
            collection
        )
        message += "Currently must be either wf_TimeSeries, wf_Seismogram, or default that implies wf_TimeSeries"
        raise ValueError(message)
    if overwrite:
        if storage_mode != "gridfs":
            message = "write_distributed_data: overwrite mode is set True with storage_mode={}\n".format(
                storage_mode
            )
            message += "overwrite is only allowed with gridfs storage_mode"
            raise ValueError(message)
    stedronsky = Undertaker(db)
    if scheduler == "spark":
        if data_are_atomic:
            data = data.map(
                lambda d: db._save_sample_data(
                    d,
                    storage_mode=storage_mode,
                    dir=None,
                    dfile=None,
                    format=file_format,
                    overwrite=overwrite,
                )
            )
            pyspark_interface = pyspark_mappartition_interface(db, collection)
            # With atomic data, dead data in this implementation are handled
            # by the map operator that saves the wf documents.  Dead data
            # return a None instead of an id by default and leave a body in
            # the cemetery collection unless cremate is set True
            data = data.map(
                lambda d: _atomic_extract_wf_document(
                    d,
                    db,
                    save_schema,
                    exclude_keys,
                    mode,
                    normalizing_collections,
                    post_elog=post_elog,
                    save_history=save_history,
                    post_history=post_history,
                    data_tag=data_tag,
                    undertaker=stedronsky,
                    cremate=cremate,
                )
            )
            data = data.mapPartitions(pyspark_interface.partitioned_save_wfdoc)
            data = data.collect()
        else:
            # This step adds some minor overhead, but it can reduce
            # memory use at a small cost.  Ensembles are particularly
            # prone to memory problems so for now view this as worth doing.
            # Note _save_ensemble_wfdocs is assumed to handle the bodies
            # cleanly when cremate is False (note when True dead members
            # are vaporized with no trace).  The default runs bury, which
            # will create some overhead if there are a lot of bodies to handle.
            # In both cases the ensemble has the bodies removed by the undertaker.
            if cremate:
                data = data.map(stedronsky.cremate)
            else:
                data = data.map(lambda d: stedronsky.bury(d, save_history=save_history))
            # Note with ensembles we delay saving sample data until the undertaker
            # has taken care of the dead - different from atomic data.
            # Works for atomic data as it does nothing if the datum is
            # marked dead.  Similarly if an entire ensemble is marked dead this
            # will do nothing.
            data = data.map(
                lambda d: db._save_sample_data(
                    d,
                    storage_mode=storage_mode,
                    dir=None,
                    dfile=None,
                    format=file_format,
                    overwrite=overwrite,
                )
            )
            data = data.map(
                lambda d: _save_ensemble_wfdocs(
                    d,
                    db,
                    save_schema,
                    exclude_keys,
                    mode,
                    stedronsky,
                    normalizing_collections,
                    cremate=cremate,
                    post_elog=post_elog,
                    save_history=save_history,
                    post_history=post_history,
                    data_tag=data_tag,
                )
            )
            data = data.collect()
    else:
        if data_are_atomic:
            # See comment at top of spark section - this code is exactly
            # the same but in the dask dialect
            data = data.map(
                db._save_sample_data,
                storage_mode=storage_mode,
                dir=None,
                dfile=None,
                format=file_format,
                overwrite=overwrite,
            )
            data = data.map(
                _atomic_extract_wf_document,
                db,
                save_schema,
                exclude_keys,
                mode,
                normalizing_collections,
                post_elog=post_elog,
                save_history=save_history,
                post_history=post_history,
                data_tag=data_tag,
                undertaker=stedronsky,
                cremate=cremate,
            )
            data = data.map_partitions(_partitioned_save_wfdoc, db, collection=collection)
            # compute is necessary here or the map_partition function will fail
            # because it will receive a DAG structure instead of a list
            data = data.compute()
        else:
            # This step adds some minor overhead, but it can reduce
            # memory use at a small cost.  Ensembles are particularly
            # prone to memory problems so for now view this as worth doing.
            # Note _save_ensemble_wfdocs is assumed to handle the bodies
            # cleanly when cremate is False (note when True dead members
            # are vaporized with no trace).  Default is bury, which will
            # cause some overhead if the number of bodies is large.
            # In both cases the ensemble has the bodies removed by the undertaker.
            if cremate:
                data = data.map(stedronsky.cremate)
            else:
                data = data.map(stedronsky.bury, save_history=save_history)
            # see the important comment about this next line in the spark section
            data = data.map(
                db._save_sample_data,
                storage_mode=storage_mode,
                dir=None,
                dfile=None,
                format=file_format,
                overwrite=overwrite,
            )
            data = data.map(
                _save_ensemble_wfdocs,
                db,
                save_schema,
                exclude_keys,
                mode,
                stedronsky,
                normalizing_collections,
                cremate=cremate,
                post_elog=post_elog,
                save_history=save_history,
                post_history=post_history,
                data_tag=data_tag,
            )
            data = data.compute()
    return data
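The map_partitions stage in the dask branch above is what batches the per-datum documents so each partition generates a single insert_many transaction instead of one insert per datum. A simplified, stand-alone sketch of that pattern follows; the function name `save_docs_by_partition` is hypothetical and is not the private helper used above.

```
def save_docs_by_partition(docs, db, collection="wf_TimeSeries"):
    # docs is the list of documents held by one bag partition; dead data
    # are assumed to have been reduced to None entries upstream
    docs = [doc for doc in docs if doc is not None]
    if len(docs) > 0:
        # one insert_many call per partition instead of one insert per datum
        result = db[collection].insert_many(docs)
        return result.inserted_ids
    return []

# usage sketch:  docbag is a dask bag of python dicts ready to be saved
# wfids = docbag.map_partitions(save_docs_by_partition, db).compute()
```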
[docs]def read_to_dataframe(
    db,
    cursor,
    mode="promiscuous",
    normalize=None,
    load_history=False,
    exclude_keys=None,
    data_tag=None,
    alg_name="read_to_dataframe",
    alg_id="0",
    define_as_raw=False,
    retrieve_history_record=False,
):
    """
Read the documents defined by a MongoDB cursor into a pandas DataFrame.

The data structure called a pandas DataFrame is heavily used by many python users. This is a convenience function for users wanting to use that api to do pure metadata operations.

Be warned this function originated as a prototype where we experimented with using a dask or pyspark DataFrame as an intermediary for parallel readers. We developed an alternative algorithm that made the baggage of the intermediary unnecessary. The warning is that the function is not mainstream and may be prone to issues.

:param db: the database from which the data are to be read.
:type db: :class:`mspasspy.db.database.Database`
:param cursor: MongoDB cursor defining the set of wf documents to be loaded into the output DataFrame. Normally the return of a find operation on the wf collection of interest.
:type cursor: :class:`pymongo.cursor.CursorType`
:param mode: reading mode that controls how the function interacts with the schema definition for the data type. Must be one of ['promiscuous','cautious','pedantic']. See the User's manual for a detailed description of what the modes mean. Default is 'promiscuous' which turns off all schema checks and loads all attributes defined for each object read.
:type mode: :class:`str`
:param normalize: list of collections that are to be used for data normalization. (see the User's manual and MongoDB documentation for details on this concept) Briefly, normalization means common metadata like source and receiver geometry are defined in separate smaller collections that are linked through this mechanism during reads. Default uses no normalization.
:type normalize: a :class:`list` of :class:`str`
:param load_history: boolean (True or False) switch used to enable or disable the object-level history mechanism. When set True each datum will be tagged with its origin id that defines the leaf nodes of a history G-tree. See the User's manual for additional details of this feature. Default is False.
:param exclude_keys: Sometimes it is helpful to remove one or more attributes stored in the database from the data's Metadata (header) so they will not cause problems in downstream processing.
:type exclude_keys: a :class:`list` of :class:`str`
:param collection: Specify an alternate collection name to use for reading the data. The default sets the collection name based on the data type and automatically loads the correct schema. The collection listed must be defined in the schema and satisfy the expectations of the reader. This is an advanced option that is intended only to simplify extensions to the reader.
:param data_tag: The definition of a dataset can become ambiguous when partially processed data are saved within a workflow. A common example would be windowing long time blocks of data to shorter time windows around a particular seismic phase and saving the windowed data. The windowed data can be difficult to distinguish from the original with standard queries. For this reason we make extensive use of "tags" for save and read operations to improve the efficiency and simplify read operations. Default turns this off by setting the tag null (None).
:type data_tag: :class:`str`
:param alg_name: the name of this algorithm recorded when preserving object-level history.
:type alg_name: :class:`str`
:param alg_id: a unique id recording this use of the function when preserving object-level history.
:type alg_id: :class:`bson.objectid.ObjectId`
:param define_as_raw: a boolean controlling whether set_as_origin is called when loading processing history.
:type define_as_raw: :class:`bool`
:param retrieve_history_record: a boolean controlling whether the processing history record is loaded.
:type retrieve_history_record: :class:`bool`
    """
    collection = cursor.collection.name
    dbschema = db.database_schema
    mdschema = db.metadata_schema
    this_elog = ErrorLogger()
    md_list = list()
    for doc in cursor:
        # Use the database module function doc2md that standardizes
        # handling of schema constraints and exclude_keys
        md, aok, elog = doc2md(
            doc,
            dbschema,
            mdschema,
            collection,
            exclude_keys,
            mode,
        )
        if aok:
            md_list.append(md)
        else:
            this_elog += elog
    if this_elog.size() > 0:
        print(
            "WARNING(read_to_dataframe): ",
            this_elog.size(),
            " errors were handled during dataframe construction",
        )
        print(
            "All data associated with these errors were dropped. Error log entries from doc2md follow"
        )
        errorlist = this_elog.get_error_log()
        for entry in errorlist:
            print(entry.algorithm, entry.message, entry.badness)
    # convert the metadata list to a dataframe
    return pd.json_normalize(map(lambda cur: cur.todict(), md_list))
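A minimal usage sketch, assuming a MongoDB database named "mydatabase" populated with wf_TimeSeries documents following the standard MsPASS schema (the database name and the attribute keys selected below are illustrative, and construction of the Database handle may differ in your environment):

```
from mspasspy.db.client import DBClient
from mspasspy.io.distributed import read_to_dataframe

dbclient = DBClient()
db = dbclient.get_database("mydatabase")
# cursor over all wf_TimeSeries documents; pass a query dict to find() to subset
cursor = db.wf_TimeSeries.find({})
df = read_to_dataframe(db, cursor)
# pure metadata operations with the pandas api
print(df[["npts", "delta", "starttime"]].describe())
```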