import os
import yaml
import pymongo
import collections
import json
try:
import dask.bag as daskbag
except ImportError:
pass
try:
import pyspark
except ImportError:
pass
from bson.objectid import ObjectId
from mspasspy.ccore.utility import MsPASSError, AntelopePf
from mspasspy.util.converter import AntelopePf2dict
from datetime import datetime
from dill.source import getsource
import functools
import mspasspy.algorithms.signals as signals
from mspasspy.global_history.ParameterGTree import ParameterGTree, parameter_to_GTree
[docs]def mspass_map(
data,
func,
global_history=None,
object_history=False,
alg_id=None,
alg_name=None,
parameters=None,
):
"""
This decorator method performs map function in Python. Instead of performing the normal map function,
if user provides global history manager, alg_id(optional), alg_name(optional) and parameters(optional)
as input, the global history manager will log down the usage of the algorithm. Also, if user set
object_history to be True, then each mspass object in this map function will save the object level history.
:param data: a iterable which is to be mapped.
:param func: target function to which map passes each element of given iterable.
:param global_history: a user specified global history manager
:type global_history: :class:`GlobalHistoryManager`
:param object_history: save the each object's history in the map when True
:param alg_id: a user specified alg_id for the map operation
:type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
:param alg_name: a user specified alg_name for the map operation
:type alg_name: :class:`str`
:param parameters: a user specified parameters for the map operation
:type parameters: :class:`str`
:return: mapped objects.
"""
if parameters:
parameterGTree = parameter_to_GTree(parameters_str=parameters)
else:
new_kwargs = {}
new_kwargs["object_history"] = object_history
if alg_name:
new_kwargs["alg_name"] = alg_name
if alg_id:
new_kwargs["alg_id"] = alg_id
parameterGTree = parameter_to_GTree(**new_kwargs)
parameters_json = json.dumps(parameterGTree.asdict())
if not alg_name:
alg_name = func.__name__
# get the alg_id if exists, else create a new one
if not alg_id:
# get the alg_id if exists
if global_history:
alg_id = global_history.get_alg_id(alg_name, parameters_json)
# else create a new one
if not alg_id:
alg_id = ObjectId()
# save the global history
if global_history:
global_history.logging(alg_id, alg_name, parameters_json)
# save the object history
if object_history:
return map(
lambda wf: func(
wf,
object_history=object_history,
alg_name=alg_name,
alg_id=str(alg_id),
),
data,
)
return map(lambda wf: func(wf, object_history=object_history), data)
[docs]def mspass_reduce(
data,
func,
global_history=None,
object_history=False,
alg_id=None,
alg_name=None,
parameters=None,
):
"""
This method performs reduce function using functools. Instead of performing the normal reduce function,
if user provides global history manager, alg_id(optional), alg_name(optional) and parameters(optional)
as input, the global history manager will log down the usage of the algorithm. Also, if user set
object_history to be True, then each mspass object in this reduce function will save the object level history.
:param data: data to be processed, it needs to be a iterable. Apply func of two arguments cumulatively
to the items of iterable, from left to right, so as to reduce the iterable to a single value.
:param func: target function
:param global_history: a user specified global history manager
:type global_history: :class:`GlobalHistoryManager`
:param object_history: save the each object's history in the reduce when True
:param alg_id: a user specified alg_id for the reduce operation
:type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
:param alg_name: a user specified alg_name for the reduce operation
:type alg_name: :class:`str`
:param parameters: a user specified parameters for the reduce operation
:type parameters: :class:`str`
:return: reduced objects.
"""
if parameters:
parameterGTree = parameter_to_GTree(parameters_str=parameters)
else:
new_kwargs = {}
new_kwargs["object_history"] = object_history
if alg_name:
new_kwargs["alg_name"] = alg_name
if alg_id:
new_kwargs["alg_id"] = alg_id
parameterGTree = parameter_to_GTree(**new_kwargs)
parameters_json = json.dumps(parameterGTree.asdict())
if not alg_name:
alg_name = func.__name__
# get the alg_id if exists, else create a new one
if not alg_id:
# get the alg_id if exists
if global_history:
alg_id = global_history.get_alg_id(alg_name, parameters_json)
# else create a new one
if not alg_id:
alg_id = ObjectId()
# save the global history
if global_history:
global_history.logging(alg_id, alg_name, parameters_json)
# save the object history
if object_history:
return functools.reduce(
lambda a, b: func(
a,
b,
object_history=object_history,
alg_name=alg_name,
alg_id=str(alg_id),
),
data,
)
return functools.reduce(
lambda a, b: func(a, b, object_history=object_history), data
)
[docs]def mspass_spark_map(
self,
func,
*args,
global_history=None,
object_history=False,
alg_id=None,
alg_name=None,
parameters=None,
**kwargs,
):
"""
This decorator method add more functionaliy on the standard spark map method and be a part of member functions
in the spark RDD library. Instead of performing the normal map function, if user provides global history manager,
alg_id(optional), alg_name(optional) and parameters(optional) as input, the global history manager will log down
the usage of the algorithm. Also, if user set object_history to be True, then each mspass object in this map function
will save the object level history.
:param func: target function
:param global_history: a user specified global history manager
:type global_history: :class:`GlobalHistoryManager`
:param object_history: save the each object's history in the map when True
:param alg_id: a user specified alg_id for the map operation
:type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
:param alg_name: a user specified alg_name for the map operation
:type alg_name: :class:`str`
:param parameters: a user specified parameters for the map operation
:type parameters: :class:`str`
:return: a spark `RDD` format of objects.
"""
if parameters:
parameterGTree = parameter_to_GTree(parameters_str=parameters)
else:
new_kwargs = kwargs.copy()
new_kwargs["object_history"] = object_history
if alg_name:
new_kwargs["alg_name"] = alg_name
if alg_id:
new_kwargs["alg_id"] = alg_id
parameterGTree = parameter_to_GTree(*args, **new_kwargs)
parameters_json = json.dumps(parameterGTree.asdict())
if not alg_name:
alg_name = func.__name__
# get the alg_id if exists, else create a new one
if not alg_id:
# get the alg_id if exists
alg_id = global_history.get_alg_id(alg_name, parameters_json)
# else create a new one
if not alg_id:
alg_id = ObjectId()
# save the global history
if global_history:
global_history.logging(alg_id, alg_name, parameters_json)
# read_data method
if alg_name.rfind("read_data") != -1 and alg_name.rfind("read_data") + 9 == len(
alg_name
):
if global_history:
return self.map(
lambda wf: func(
wf, *args, alg_name=alg_name, alg_id=str(alg_id), **kwargs
)
)
else:
return self.map(lambda wf: func(wf, *args, **kwargs))
# save_data method
if alg_name.rfind("save_data") != -1 and alg_name.rfind("save_data") + 9 == len(
alg_name
):
# (return_code, mspass_object) is return for save_data, otherwise the original mspass_object is unchanged
if global_history:
return self.map(
lambda wf: (
func(wf, *args, alg_name=alg_name, alg_id=str(alg_id), **kwargs),
wf,
)
)
else:
return self.map(lambda wf: (func(wf, *args, **kwargs), wf))
# save the object history
if object_history:
return self.map(
lambda wf: func(
wf,
*args,
object_history=object_history,
alg_name=alg_name,
alg_id=str(alg_id),
**kwargs,
)
)
return self.map(lambda wf: func(wf, *args, object_history=object_history, **kwargs))
[docs]def mspass_dask_map(
self,
func,
*args,
global_history=None,
object_history=False,
alg_id=None,
alg_name=None,
parameters=None,
**kwargs,
):
"""
This decorator method add more functionaliy on the standard dask map method and be a part of member functions
in the dask bag library. Instead of performing the normal map function, if user provides global history manager,
alg_id(optional), alg_name(optional) and parameters(optional) as input, the global history manager will log down
the usage of the algorithm. Also, if user set object_history to be True, then each mspass object in this map function
will save the object level history.
:param func: target function
:param global_history: a user specified global history manager
:type global_history: :class:`GlobalHistoryManager`
:param object_history: save the each object's history in the map when True
:param alg_id: a user specified alg_id for the map operation
:type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
:param alg_name: a user specified alg_name for the map operation
:type alg_name: :class:`str`
:param parameters: a user specified parameters for the map operation
:type parameters: :class:`str`
:return: a dask `bag` format of objects.
"""
if parameters:
parameterGTree = parameter_to_GTree(parameters_str=parameters)
else:
new_kwargs = kwargs.copy()
new_kwargs["object_history"] = object_history
if alg_name:
new_kwargs["alg_name"] = alg_name
if alg_id:
new_kwargs["alg_id"] = alg_id
parameterGTree = parameter_to_GTree(*args, **new_kwargs)
parameters_json = json.dumps(parameterGTree.asdict())
if not alg_name:
alg_name = func.__name__
# get the alg_id if exists, else create a new one
if not alg_id:
# get the alg_id if exists
alg_id = global_history.get_alg_id(alg_name, parameters_json)
# else create a new one
if not alg_id:
alg_id = ObjectId()
# save the global history
if global_history:
global_history.logging(alg_id, alg_name, parameters_json)
# read_data method
if alg_name.rfind("read_data") != -1 and alg_name.rfind("read_data") + 9 == len(
alg_name
):
if global_history:
return self.map(
lambda wf: func(
wf, *args, alg_name=alg_name, alg_id=str(alg_id), **kwargs
)
)
else:
return self.map(lambda wf: func(wf, *args, **kwargs))
# save_data method
if alg_name.rfind("save_data") != -1 and alg_name.rfind("save_data") + 9 == len(
alg_name
):
# (return_code, mspass_object) is return for save_data, otherwise the original mspass_object is unchanged
if global_history:
return self.map(
lambda wf: (
func(wf, *args, alg_name=alg_name, alg_id=str(alg_id), **kwargs),
wf,
)
)
else:
return self.map(lambda wf: (func(wf, *args, **kwargs), wf))
# save the object history
if object_history:
return self.map(
func,
*args,
object_history=object_history,
alg_name=alg_name,
alg_id=str(alg_id),
**kwargs,
)
return self.map(func, *args, object_history=object_history, **kwargs)
[docs]def mspass_spark_reduce(
self,
func,
*args,
global_history=None,
object_history=False,
alg_id=None,
alg_name=None,
parameters=None,
**kwargs,
):
"""
This decorator method add more functionaliy on the standard spark reduce method and be a part of member functions
in the spark RDD library. Instead of performing the normal reduce function, if user provides global history manager,
alg_id(optional), alg_name(optional) and parameters(optional) as input, the global history manager will log down
the usage of the algorithm. Also, if user set object_history to be True, then each mspass object in this reduce function
will save the object level history.
:param func: target function
:param global_history: a user specified global history manager
:type global_history: :class:`GlobalHistoryManager`
:param object_history: save the each object's history in the reduce when True
:param alg_id: a user specified alg_id for the reduce operation
:type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
:param alg_name: a user specified alg_name for the reduce operation
:type alg_name: :class:`str`
:param parameters: a user specified parameters for the reduce operation
:type parameters: :class:`str`
:return: a spark `RDD` format of objects.
"""
if parameters:
parameterGTree = parameter_to_GTree(parameters_str=parameters)
else:
new_kwargs = kwargs.copy()
new_kwargs["object_history"] = object_history
if alg_name:
new_kwargs["alg_name"] = alg_name
if alg_id:
new_kwargs["alg_id"] = alg_id
parameterGTree = parameter_to_GTree(*args, **new_kwargs)
parameters_json = json.dumps(parameterGTree.asdict())
if not alg_name:
alg_name = func.__name__
# get the alg_id if exists, else create a new one
if not alg_id:
# get the alg_id if exists
alg_id = global_history.get_alg_id(alg_name, parameters_json)
# else create a new one
if not alg_id:
alg_id = ObjectId()
# save the global history
if global_history:
global_history.logging(alg_id, alg_name, parameters_json)
# save the object history
if object_history:
return self.reduce(
lambda a, b: func(
a,
b,
*args,
object_history=object_history,
alg_name=alg_name,
alg_id=str(alg_id),
**kwargs,
)
)
return self.reduce(
lambda a, b: func(a, b, *args, object_history=object_history, **kwargs)
)
[docs]def mspass_dask_fold(
self,
func,
*args,
global_history=None,
object_history=False,
alg_id=None,
alg_name=None,
parameters=None,
**kwargs,
):
"""
This decorator method add more functionaliy on the standard dask fold method and be a part of member functions
in the dask bag library. Instead of performing the normal fold function, if user provides global history manager,
alg_id(optional), alg_name(optional) and parameters(optional) as input, the global history manager will log down
the usage of the algorithm. Also, if user set object_history to be True, then each mspass object in this fold function
will save the object level history.
:param func: target function
:param global_history: a user specified global history manager
:type global_history: :class:`GlobalHistoryManager`
:param object_history: save the each object's history in the fold when True
:param alg_id: a user specified alg_id for the fold operation
:type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
:param alg_name: a user specified alg_name for the fold operation
:type alg_name: :class:`str`
:param parameters: a user specified parameters for the fold operation
:type parameters: :class:`str`
:return: a dask `bag` format of objects.
"""
if parameters:
parameterGTree = parameter_to_GTree(parameters_str=parameters)
else:
new_kwargs = kwargs.copy()
new_kwargs["object_history"] = object_history
if alg_name:
new_kwargs["alg_name"] = alg_name
if alg_id:
new_kwargs["alg_id"] = alg_id
parameterGTree = parameter_to_GTree(*args, **new_kwargs)
parameters_json = json.dumps(parameterGTree.asdict())
if not alg_name:
alg_name = func.__name__
# get the alg_id if exists, else create a new one
if not alg_id:
# get the alg_id if exists
alg_id = global_history.get_alg_id(alg_name, parameters_json)
# else create a new one
if not alg_id:
alg_id = ObjectId()
# save the global history
if global_history:
global_history.logging(alg_id, alg_name, parameters_json)
# save the object history
if object_history:
return self.fold(
lambda a, b: func(
a,
b,
*args,
object_history=object_history,
alg_name=alg_name,
alg_id=str(alg_id),
**kwargs,
)
)
return self.fold(
lambda a, b: func(a, b, *args, object_history=object_history, **kwargs)
)
[docs]class GlobalHistoryManager:
"""
A Global History Mananger handler.
This is a handler used in the mspass_client, normally user should not directly create
a Global History Manager by his own. Instead, user should get the Global History Manager
through mspass client's methods.
"""
def __init__(self, database_instance, job_name, collection=None):
self.job_name = job_name
# generate an bson UUID for this job, should be unique on the application level
self.job_id = ObjectId()
self.history_db = database_instance
self.collection = collection
if not self.collection:
# use the `history` collection defined in database schema
self.collection = self.history_db.database_schema.default_name(
"history_global"
)
# create unique index -> (alg_name, parameters)
self.history_db[self.collection].create_index(
[("alg_name", pymongo.TEXT), ("parameters", pymongo.TEXT)],
)
# modify pyspark/dask map to our defined map
try:
daskbag.Bag.mspass_map = mspass_dask_map
except NameError:
pass
try:
pyspark.RDD.mspass_map = mspass_spark_map
except NameError:
pass
# modify pyspark/dask reduce to our defined reduce
try:
daskbag.Bag.mspass_reduce = mspass_dask_fold
except NameError:
pass
try:
pyspark.RDD.mspass_reduce = mspass_spark_reduce
except NameError:
pass
[docs] def logging(self, alg_id, alg_name, parameters):
"""
Save the usage of the algorithm in the map/reduce operation
:param alg_id: the UUID of the combination of algorithm_name and parameters
:type alg_id: :class:`bson.objectid.ObjectId`
:param alg_name: the name of the algorithm
:type alg_name: :class:`str`
:param parameters: the parameters of the algorithm
:type parameters: :class:`str`
"""
# current timestamp when logging into database
timestamp = datetime.utcnow().timestamp()
self.history_db[self.collection].insert_one(
{
"time": timestamp,
"job_id": self.job_id,
"job_name": self.job_name,
"alg_name": alg_name,
"alg_id": alg_id,
"parameters": parameters,
}
)
[docs] def get_alg_id(self, alg_name, parameters):
"""
Save the usage of the algorithm in the map/reduce operation
:param alg_name: the name of the algorithm
:type alg_name: :class:`str`
:param alg_id: the UUID of the combination of algorithm_name and parameters
:type alg_id: :class:`bson.objectid.ObjectId`
:param parameters: the parameters of the algorithm
:type parameters: :class:`str`
"""
# no alg_name and parameters combination in the database
if not self.history_db[self.collection].count_documents(
{"alg_name": alg_name, "parameters": parameters}
):
return None
doc = self.history_db[self.collection].find_one(
{"alg_name": alg_name, "parameters": parameters}
)
return doc["alg_id"]
[docs] def get_alg_list(self, job_name, job_id=None):
"""
Get a list of history records by job name(and job_id)
:param job_name: the name of the job
:type job_name: :class:`str`
:param job_id: the UUID of the job
:type job_id: :class:`bson.objectid.ObjectId`
"""
query = {"job_name": job_name}
if job_id:
query["job_id"] = job_id
alg_list = []
docs = self.history_db[self.collection].find(query)
for doc in docs:
alg_list.append(doc)
return alg_list
[docs] def set_alg_name_and_parameters(self, alg_id, alg_name, parameters):
"""
Set the alg_name and parameters by a user specified alg_id
:param alg_id: the UUID of the combination of algorithm_name and parameters, used to find the records
:type alg_id: :class:`bson.objectid.ObjectId`
:param alg_name: the name of the algorithm user would like to set
:type alg_name: :class:`str`
:param parameters: the parameters of the algorithm user would like to set
:type parameters: :class:`str`
"""
doc = self.history_db[self.collection].find_one({"alg_id": alg_id})
if not doc:
raise MsPASSError("No such history record with alg_id = " + alg_id, "Fatal")
update_dict = {}
update_dict["alg_name"] = alg_name
update_dict["parameters"] = parameters
self.history_db[self.collection].update_many(
{"alg_id": alg_id}, {"$set": update_dict}
)