import pymongo
def get_jobid(db):
    """
    Ask MongoDB for a valid jobid.

    All processing jobs should have a call to this function at the beginning
    of the job script.  It queries MongoDB for the largest current value of
    the key "jobid" in the history collection and returns that value plus
    one.  If the history collection is empty it returns 1 under a bias that
    a jobid of 0 is illogical.

    :param db: database handle
    :type db: top level database handle returned by a call to
        MongoClient.database
    :return: integer jobid one larger than any currently in history
    """
    hiscol = db.history
    # A single sorted find_one replaces the original count_documents +
    # find/sort/limit pair:  one round trip instead of two, and no race
    # window between counting and fetching.  -1 == pymongo.DESCENDING.
    maxdoc = hiscol.find_one(sort=[("jobid", -1)])
    if maxdoc is None:
        # Empty history collection - start numbering at 1
        return 1
    return maxdoc["jobid"] + 1
def pfbranch_to_dict(pf, key):
    """
    Recursively convert a single branch in an AntelopePf to a python dict.

    This function uses recursion to follow a chain of arbitrary length of
    branches defined in an AntelopePf object.  The result is a dict with a
    chain of dicts of the same depth, i.e. if the branch has 3 levels of
    sub-branches the dict will have 3 levels of associative arrays keyed by
    the same branch names as the Arr items in the original Pf file.  This
    should be called from the top level one branch at a time, i.e. for the
    parent AntelopePf this function should be called once for each key
    returned by pf.arr_keys().

    Note that at each level Tbl& sections of the original pf are converted
    to lists of strings, with each line of the Tbl section being one string
    in the list.

    :param pf: AntelopePf containing the branch to be converted.
    :param key: key used to access the branch requested.
    :type key: string
    :return: python dict translation of AntelopePf branch structure
    :raise: RunTime errors are possible from the ccore methods that are
        called.
    """
    # Descend into the requested branch.  (Bug fix: the original body
    # ignored the key argument entirely, so the top-level call in
    # pf_history_data converted the whole parent pf instead of branch key.)
    branch = pf.get_branch(key)
    # Simple name:value parameters at this level
    result = branch.todict()
    # Tbl& sections become lists of strings, one per line.  (Bug fix: the
    # original dropped Tbl& sections in branches with no sub-branches.)
    for k in branch.tbl_keys():
        result[k] = branch.get_tbl(k)
    # Arr& sections can be arbitrarily deep, so recurse on each sub-branch
    for k in branch.arr_keys():
        result[k] = pfbranch_to_dict(branch, k)
    return result
class basic_history_data:
    """
    Pure data object holding the parameters that define one algorithm.

    If this were written in C it could be defined as a struct:  it carries
    a jobid, an algorithm name, a tag describing the parameter format, and
    the parameter data itself.
    """

    def __init__(self, job):
        # jobid is the only value a caller must supply; the remaining
        # attributes start as placeholders until a load_* method fills them.
        self.jobid = job
        self.params = {}  # arbitrary content dumped to the collection
        self.param_type = "NONE"
        self.algorithm = "UNDEFINED"

    def load_algorithm_args(self, alg, argdict):
        """
        Load parameters defined as a set of function arguments.

        Simple algorithms without a lot of parameters often need only a
        set of argument values, which we require to be supplied as
        key:value pairs mapped to a dict.  This is treated as the lowest
        common denominator for a parameter definition, hence its place in
        the base class.

        :param alg: string naming the algorithm being registered.
        :param argdict: dict of key:value pairs defining input parameters.
            For algorithms defined at the top level by a python function
            the keys should match the names of parameters in the arg list;
            for C++ functions wrapped with pybind11 they should match the
            arg keys defined in the wrappers.  The key strings become the
            key:value pairs in BSON written to MongoDB.
        """
        self.params = argdict
        self.param_type = "dict"
        self.algorithm = alg
class pf_history_data(basic_history_data):
    """
    History data container loaded from an AntelopePf object.

    mspasspy.ccore.utility defines the AntelopePf object that is an option
    for parameter inputs.  The file structure is identical to the Antelope
    Pf file syntax.  The API of an AntelopePf is not, however, the same as
    the python bindings in Antelope, as it handles Tbl and Arr sections
    completely differently - more in line with alternatives like YAML.
    This class converts the data in an AntelopePf to a python dict that can
    be dumped directly to MongoDB with pymongo's insert methods.
    Converting the MongoDB document back to a pf structure requires the
    inverse operator, which does not exist but should eventually be created
    if this approach sees extensive use.
    """

    def __init__(self, job, alg, pf):
        """
        Basic constructor for this subclass.

        This constructor applies the construction-is-initialization model
        of OOP.  The AntelopePf pointed to by pf is parsed in this
        constructor to fill the params dict and set the other attributes.

        :param job: jobid (integer); normally should be preceded by a call
            to the get_jobid function.
        :param alg: string defining a name assigned to the algorithm field
        :param pf: AntelopePf object to be parsed and posted.
        """
        # Initialize base-class attributes first, then override below.
        super().__init__(job)
        self.algorithm = alg
        self.param_type = "AntelopePf"
        # todict only returns simple name:value pair parameters; branch
        # (Arr&) and Tbl& data are loaded by the loops below.
        self.params = pf.todict()
        # Tbl& sections are simpler than Arr& since no recursion is needed;
        # each is stored as a regular python list of strings.
        for k in pf.tbl_keys():
            self.params[k] = pf.get_tbl(k)
        # Arr& sections may be arbitrarily deep, so use the recursive
        # helper defined above to translate each branch to a nested dict.
        for k in pf.arr_keys():
            self.params[k] = pfbranch_to_dict(pf, k)
class HistoryLogger:
    """
    Base class for generic, global history/provenance preservation in MsPASS.

    The main concept of this object is that a pymongo script to run a
    processing job would create this object, or one of its children, to
    preserve the global run parameters for a processing sequence.  We limit
    that to mean a sequence of processing algorithms that have a set of
    predefined parameters that control their behaviour.  The global
    parameters are preserved in a special collection in MongoDB given the
    (fixed) name "history".
    """

    def __init__(self, db, job=0):
        """
        Basic constructor.

        This is currently the only constructor for this object.  It
        creates a handle to MongoDB and sets an integer key called jobid
        that is guaranteed to be valid (never smaller than the next
        available id in the history collection).

        :param db: top level handle to a MongoDB server created by calling
            the database method of a MongoClient instance.
        :param job: can be used to manually set the jobid.  We use a simple
            high-water-mark scheme comparable to lastid in the
            Antelope/Datascope database where the next valid id is
            lastid+1.  If the input value of job is smaller than the next
            valid jobid computed from the history collection, the jobid is
            set to that computed value and a warning is printed.  (default
            of 0 automatically uses the high-water-mark method)  Users can
            get the actual value set from the jobid attribute after
            successful creation of this object.
        """
        self.history_collection = db.history
        # Check the input job id for validity and use get_jobid if needed
        if job == 0:
            self.jobid = get_jobid(db)
        else:
            jobtmp = get_jobid(db)
            # get_jobid returns the next valid id (high water mark + 1), so
            # job == jobtmp is acceptable.  (Bug fix: the original tested
            # job > jobtmp, which printed a spurious "invalid" warning when
            # the caller supplied exactly the next valid id.)
            if job >= jobtmp:
                self.jobid = job
            else:
                self.jobid = jobtmp
                print(
                    "HistoryLogger(Warning): input jobid=",
                    job,
                    " was invalid. Set jobid=",
                    jobtmp,
                )
        self.history_chain = []  # history records accumulated by register()

    def register(self, alg, partype, params):
        """
        Register an algorithm's signature to preserve processing history.

        Each algorithm in a processing chain should be registered by this
        mechanism before starting a mspass processing chain.  The register
        method should be called in the order in which the algorithms are
        applied.

        :param alg: name of the algorithm that will be run.  Assumed to be
            a string.
        :param partype: defines the format of the data defining input
            parameters to this algorithm (must be either 'dict' or
            'AntelopePf')
        :param params: the actual input data.  The type this arg references
            depends on partype (partype 'dict' means a python dict object).
        :raise: RuntimeError with a message if partype is not on the list
            of supported parameter types.
        """
        if partype == "dict":
            bhd = basic_history_data(self.jobid)
            bhd.load_algorithm_args(alg, params)
            self.history_chain.append(bhd)
        elif partype == "AntelopePf":
            pfhis = pf_history_data(self.jobid, alg, params)
            self.history_chain.append(pfhis)
        else:
            raise RuntimeError(
                "HistoryLogger (Warning): Unsupported parameter type=" + partype
            )

    def save(self):
        """
        Save the contents to the history collection.

        The document created by a save is more or less an image of the
        structure of this object translated to a python dict.
        """
        doc = {"jobid": self.jobid}
        for d in self.history_chain:
            # NOTE(review): entries are keyed by algorithm name, so
            # registering the same algorithm name twice keeps only the last
            # registration in the saved document - confirm that is intended.
            doc[d.algorithm] = {
                "algorithm": d.algorithm,
                "param_type": d.param_type,
                "params": d.params,
            }
        self.history_collection.insert_one(doc)