version  0.0.1
Defines the C++ API for MsPASS
mspass::utility::ProcessingHistory Class Reference

Lightweight class to preserve the processing chain of atomic objects.

#include <ProcessingHistory.h>

Inheritance diagram for mspass::utility::ProcessingHistory:
Base class: mspass::utility::BasicProcessingHistory. Derived classes: mspass::seismic::Seismogram, mspass::seismic::TimeSeries, mspass::seismic::SeismogramWGaps, mspass::seismic::TimeSeriesWGaps.

Public Member Functions

 ProcessingHistory ()
 
 ProcessingHistory (const std::string jobnm, const std::string jid)
 
 ProcessingHistory (const ProcessingHistory &parent)
 
bool is_empty () const
 
bool is_raw () const
 
bool is_origin () const
 
bool is_volatile () const
 
bool is_saved () const
 
size_t number_of_stages () override
 Return number of processing stages that have been applied to this object.
 
void set_as_origin (const std::string alg, const std::string algid, const std::string uuid, const AtomicType typ, bool define_as_raw=false)
 
std::string new_ensemble_process (const std::string alg, const std::string algid, const AtomicType typ, const std::vector< ProcessingHistory * > parents, const bool create_newid=true)
 
void add_one_input (const ProcessingHistory &data_to_add)
 Add one datum as an input for current data.
 
void add_many_inputs (const std::vector< ProcessingHistory * > &d)
 Define several data objects as inputs.
 
void merge (const ProcessingHistory &data_to_add)
 Merge the history nodes from another.
 
void accumulate (const std::string alg, const std::string algid, const AtomicType typ, const ProcessingHistory &newinput)
 Method to use with a spark reduce algorithm.
 
std::string clean_accumulate_uuids ()
 Clean up inconsistent uuids that can be produced by reduce.
 
std::string new_map (const std::string alg, const std::string algid, const AtomicType typ, const ProcessingStatus newstatus=ProcessingStatus::VOLATILE)
 Define this algorithm as a one-to-one map of same type data.
 
std::string new_map (const std::string alg, const std::string algid, const AtomicType typ, const ProcessingHistory &data_to_clone, const ProcessingStatus newstatus=ProcessingStatus::VOLATILE)
 Define this algorithm as a one-to-one map.
 
std::string map_as_saved (const std::string alg, const std::string algid, const AtomicType typ)
 Prepare the current data for saving.
 
void clear ()
 
std::multimap< std::string, mspass::utility::NodeData > get_nodes () const
 
int stage () const
 
ProcessingStatus status () const
 
std::string id () const
 
std::pair< std::string, std::string > created_by () const
 
NodeData current_nodedata () const
 
std::string newid ()
 
int number_inputs () const
 
int number_inputs (const std::string uuidstr) const
 
void set_id (const std::string newid)
 
std::list< mspass::utility::NodeData > inputs (const std::string id_to_find) const
 Return a list of data that define the inputs to a given uuid.
 
ProcessingHistory & operator= (const ProcessingHistory &parent)
 
- Public Member Functions inherited from mspass::utility::BasicProcessingHistory
 BasicProcessingHistory (const std::string jobname, const std::string jobid)
 
 BasicProcessingHistory (const BasicProcessingHistory &parent)
 
std::string jobid () const
 
void set_jobid (const std::string &newjid)
 
std::string jobname () const
 
void set_jobname (const std::string jobname)
 
BasicProcessingHistory & operator= (const BasicProcessingHistory &parent)
 

Public Attributes

ErrorLogger elog
 

Protected Attributes

std::multimap< std::string, mspass::utility::NodeData > nodes
 
- Protected Attributes inherited from mspass::utility::BasicProcessingHistory
std::string jid
 
std::string jnm
 

Detailed Description

Lightweight class to preserve the processing chain of atomic objects.

This class is intended to be used as a parent for any data object in MsPASS that should be considered atomic. It is designed to completely preserve the chain of processing algorithms applied to any atomic data to put it in its current state. It saves, during processing, the core information that can later be stored to define that state. Writers for atomic objects inheriting this class should arrange to save the data it contains to the history collection in MongoDB. Note that actually doing the inverse is a different problem; solutions are expected to be implemented as extensions of this class for use in special programs that reconstruct a data workflow and the processing chain applied to produce any final output.

The design was complicated by the need to keep the history data from causing memory bloat. A careless implementation could be prone to that problem even for modest chains, but we were particularly worried about iterative algorithms that could conceivably multiply the size of the history out of control. There was also the fundamental problem of dealing with transient data versus data stored in longer term storage instead of just in memory. Our implementation was simplified by using the concept of a unique id implemented with a Universally Unique IDentifier (UUID). Our history mechanism assumes each data object has a uuid assigned to it on creation by an implementation dependent mechanism. That is, whenever a new object is created in MsPASS using the history feature, one of these records will be created for each data object that is defined as atomic. This string defines a unique key for the object this particular record is associated with (conceptually, the object it could be connected to with the this pointer). The parents of the current object are defined by the inputs data structure below.

In the current implementation the id is the string representation of a uuid maintained by each atomic object. We use a string to maximize flexibility at a minor cost in storage.

Names used imply the following concepts:

raw - means the data is new input to MsPASS (raw data from a data center, field experiment, or simulation). That tag means no prior history can be reconstructed.

origin - top-level ancestor of the current data. The top of a processing chain is always tagged as an origin. A top level can also be "raw", but not necessarily. In particular, readers that load partially processed data should mark the data read as an origin, but not raw.

stage - all processed data objects that are volatile elements within a workflow are defined as a stage. They are presumed to leave their existence known only through the ancestry preserved in the processing chain. A stage becomes a potential root only when it is saved by a writer, where the writer will mark that position as a save. We considered calling this a branch, but that doesn't capture the concept correctly since we require this mechanism to preserve splits into multiple outputs. We preserve that cleanly for each data object. That is, the implementation makes it easy to reconstruct the history of a single final data object, but reconstructing interlinks between objects in an overall processing flow will be a challenge. That was a necessary compromise to avoid memory bloat. The history is properly viewed as a tree branching from a single root (the final output) to leaves that define all its parents.

The concepts of raw, origin, and stage are implemented with the enum class defined above called ProcessingStatus. Each history record has that as an attribute, but each call to new_stage updates a copy kept inside this object to simplify the python wrappers.

Constructor & Destructor Documentation

◆ ProcessingHistory() [1/3]

mspass::utility::ProcessingHistory::ProcessingHistory ( )

Default constructor.

: elog() {
  current_status = ProcessingStatus::UNDEFINED;
  current_id = "UNDEFINED";
  current_stage = -1; // illegal value that could be used as a signal for uninitialized
  mytype = AtomicType::UNDEFINED;
  algorithm = "UNDEFINED";
  algid = "UNDEFINED";
}

◆ ProcessingHistory() [2/3]

mspass::utility::ProcessingHistory::ProcessingHistory ( const std::string  jobnm,
const std::string  jid 
)

Construct and fill in BasicProcessingHistory job attributes.

Parameters
jobnm - set as jobname
jid - set as jobid

◆ ProcessingHistory() [3/3]

mspass::utility::ProcessingHistory::ProcessingHistory ( const ProcessingHistory & parent)

Standard copy constructor.

: BasicProcessingHistory(parent), elog(parent.elog), nodes(parent.nodes),
  algorithm(parent.algorithm), algid(parent.algid) {
  current_status = parent.current_status;
  current_id = parent.current_id;
  current_stage = parent.current_stage;
  mytype = parent.mytype;
}

Member Function Documentation

◆ accumulate()

void mspass::utility::ProcessingHistory::accumulate ( const std::string  alg,
const std::string  algid,
const AtomicType  typ,
const ProcessingHistory & newinput 
)

Method to use with a spark reduce algorithm.

A reduce operator in spark utilizes a binary function where two inputs are used to generate a single output object. Because the inputs could be scattered on multiple processor nodes this operation must be associative. The new_ensemble_process method does not satisfy that constraint so this method was necessary to handle that type of algorithm correctly.

The way this algorithm works is that it fundamentally branches on two cases: (1) initialization, which is detected by testing if the node data map is empty, or (2) secondary calls. This should work even if multiple inputs are combined at the end of the reduce operation because the copies being merged will not be empty. Note an empty input will create a complaint entry in the error log.

{
  ProcessingHistory newinput(ni);
  if ((newinput.algorithm != algin) || (newinput.algid != algidin) ||
      (newinput.jid != newinput.jobid()) ||
      (newinput.jnm != newinput.jobname())) {
    NodeData nd;
    nd = newinput.current_nodedata();
    newinput.newid();
    pair<string, NodeData> pn(newinput.current_id, nd);
    newinput.nodes.insert(pn);
    newinput.jid = newinput.jobid();
    newinput.jnm = newinput.jobname();
    newinput.algorithm = algin;
    newinput.algid = algidin;
    newinput.current_status = ProcessingStatus::VOLATILE;
    newinput.current_stage = nd.stage + 1;
    newinput.mytype = typ;
  }
  /* We have to detect an initialization condition without losing the
  stored history. There are two conditions we need to handle. First,
  if we create an empty container to hold the accumulator and put it on the
  left hand side we will want to clear the history chain or we will
  accumulate random junk. The second condition is if we accumulate in
  a way where the left hand side is some existing data where we do want to
  preserve the history. For the is_empty logic: we just copy the
  newinput's history and make its current node data the connection
  backward - i.e. we have to make a new uuid and add an entry. */
  if (this->is_empty()) {
    this->newid();
    nodes = ni.get_nodes();
    NodeData nd;
    nd = ni.current_nodedata();
    pair<string, NodeData> pn(current_id, nd);
    this->nodes.insert(pn);
    this->set_jobid(ni.jobid());
    this->set_jobname(ni.jobname());
    algorithm = algin;
    algid = algidin;
    current_status = ProcessingStatus::VOLATILE;
    current_stage = nd.stage + 1;
    mytype = typ;
  }
  /* This is the condition for a left hand side that is not empty but not
  yet initialized. We detect this condition by a mismatch in all the unique
  names and ids that mark the current process defining this reduce operation */
  else if ((this->algorithm != algin) || (this->algid != algidin) ||
           (this->jid != newinput.jobid()) ||
           (this->jnm != newinput.jobname())) {
    /* This is similar to the block above, but the key difference here is we
    have to push this's history data to convert its current data to define an
    input. That means getting a new uuid and pushing current node data to the
    nodes map as an input */
    NodeData nd;
    nd = this->current_nodedata();
    this->newid();
    pair<string, NodeData> pn(current_id, nd);
    this->nodes.insert(pn);
    this->jid = newinput.jobid();
    this->jnm = newinput.jobname();
    this->algorithm = algin;
    this->algid = algidin;
    this->current_status = ProcessingStatus::VOLATILE;
    this->current_stage = nd.stage + 1;
    this->mytype = typ;
    this->merge(newinput);
  } else {
    this->merge(newinput);
  }
}

References current_nodedata(), get_nodes(), is_empty(), merge(), newid(), and mspass::utility::NodeData::stage.

◆ add_many_inputs()

void mspass::utility::ProcessingHistory::add_many_inputs ( const std::vector< ProcessingHistory * > &  d)

Define several data objects as inputs.

This method acts like add_one_input in that it alters only the inputs chain. In fact it is nothing more than a loop over the components of the vector calling add_one_input for each component.

Parameters
d - is the vector of data to define as inputs
{
  vector<ProcessingHistory *>::const_iterator dptr;
  for (dptr = d.begin(); dptr != d.end(); ++dptr) {
    ProcessingHistory *ptr = (*dptr);
    this->add_one_input(*ptr);
  }
}

References add_one_input().

◆ add_one_input()

void mspass::utility::ProcessingHistory::add_one_input ( const ProcessingHistory & data_to_add)

Add one datum as an input for current data.

This method MUST ONLY be called after a call to new_ensemble_process, in the situation where additional inputs need to be defined that were not available at the time new_ensemble_process was called. An example might be a stack that was created within the scope of "algorithm" and then used in some way to create the output data. In any case it differs fundamentally from new_ensemble_process in that it does not touch attributes that define the current state of "this". It simply says this is another input to the data "this" contains.

Parameters
data_to_add - is the ProcessingHistory of the data object to be defined as input. Note the type of the data to which it is linked will be saved as the base of the input chain from data_to_add. It can be different from the type of "this".
{
  if (data_to_add.is_empty()) {
    stringstream ss;
    ss << "Data with uuid=" << data_to_add.id() << " has an empty history chain"
       << endl
       << "At best this will leave ProcessingHistory incomplete" << endl;
    elog.log_error("ProcessingHistory::add_one_input", ss.str(),
                   ErrorSeverity::Complaint);
  } else {
    multimap<string, NodeData>::iterator nptr;
    multimap<string, NodeData> newhistory = data_to_add.get_nodes();
    multimap<string, NodeData>::iterator nl, nu;
    /* As above this one needs a check for duplicates and only adds
    a node if the data are unique. This is simple compared to
    new_ensemble_process because we just have to check one object's history at a
    time. */
    for (nptr = newhistory.begin(); nptr != newhistory.end(); ++nptr) {
      string key(nptr->first);
      if (this->nodes.count(key) > 0) {
        nl = this->nodes.lower_bound(key);
        nu = this->nodes.upper_bound(key);
        for (auto ptr = nl; ptr != nu; ++ptr) {
          NodeData ndtest(ptr->second);
          if (ndtest != (nptr->second)) {
            this->nodes.insert(*nptr);
          }
        }
      } else {
        this->nodes.insert(*nptr);
      }
    }
    /* Don't forget head node data */
    NodeData nd = data_to_add.current_nodedata();
    NodeData ndhere = this->current_nodedata();
    pair<string, NodeData> pnd(current_id, nd);
    this->nodes.insert(pnd);
  }
}

References current_nodedata(), get_nodes(), id(), is_empty(), and mspass::utility::ErrorLogger::log_error().

◆ clean_accumulate_uuids()

string mspass::utility::ProcessingHistory::clean_accumulate_uuids ( )

Clean up inconsistent uuids that can be produced by reduce.

In a spark reduce operation it is possible to create multiple uuid keys for inputs to the same algorithm instance. That happens because the mechanism used by ProcessingHistory to define the process history tree is not associative. When a reduce gets sprayed across multiple nodes, multiple initializations can occur that create artificial, inconsistent uuids. This method should normally be called after a reduce operator if history is being preserved, or the history chain may be corrupted - not invalid, just messed up with extra branches in the processing tree.

A VERY IMPORTANT limitation of the algorithm used by this method is that the combination of algorithm and algid in "this" MUST be unique for a given job run when a reduce is called. i.e. if an earlier workflow had used alg and algid but with a different jobid and jobname, the distinction cannot be detected with this algorithm. This means our global history handling must guarantee algid is unique for each run.

Returns
unique uuid for alg,algid match set in the history chain. Note if there are no duplicates it simply returns the only one it finds. If there are duplicates it returns the lexically smallest (first in alphabetic order) uuid. Most importantly if there is no match or if history is empty it returns the string UNDEFINED.
{
  /* Return undefined immediately if the history chain is empty */
  if (this->is_empty())
    return string("UNDEFINED");
  NodeData ndthis = this->current_nodedata();
  string alg(ndthis.algorithm);
  string algidtest(ndthis.algid);
  /* The algorithm here finds all entries for which algorithm is alg and
  algid matches algidtest. We build a list of uuids (keys) linked to that
  unique algorithm. We then use the id in ndthis as the master */
  set<string> matching_ids;
  matching_ids.insert(ndthis.uuid);
  /* this approach of pushing iterators that match to this list seemed to
  be the only way I could make this work correctly. Not sure why, but
  the added cost over handling this correctly in the loops is small. */
  std::list<multimap<string, NodeData>::iterator> need_to_erase;
  for (auto nptr = this->nodes.begin(); nptr != this->nodes.end(); ++nptr) {
    /* this copy operation is somewhat inefficient, but the cost is small
    compared to how obscure the code will look if we directly manipulate the
    second value */
    NodeData nd(nptr->second);
    /* this depends upon the distinction between set and multiset. i.e. an
    insert of a duplicate does nothing */
    if ((alg == nd.algorithm) && (algidtest == nd.algid)) {
      matching_ids.insert(nd.uuid);
      need_to_erase.push_back(nptr);
    }
  }
  // handle no match situation gracefully
  if (matching_ids.empty())
    return string("UNDEFINED");
  /* Nothing more to do but return the uuid if there is only one */
  if (matching_ids.size() == 1)
    return *(matching_ids.begin());
  else {
    for (auto sptr = need_to_erase.begin(); sptr != need_to_erase.end();
         ++sptr) {
      nodes.erase(*sptr);
    }
    need_to_erase.clear();
  }
  /* Here is the complicated case. We use the uuid from ndthis as the master
  and change all the others. This operation works ONLY because in a multimap
  erase only invalidates the iterator it points to and others remain valid.
  */
  string master_uuid = ndthis.uuid;
  for (auto sptr = matching_ids.begin(); sptr != matching_ids.end(); ++sptr) {
    /* Note this test is necessary to skip the master_uuid - no else needed */
    if ((*sptr) != master_uuid) {
      multimap<string, NodeData>::iterator nl, nu;
      nl = this->nodes.lower_bound(*sptr);
      nu = this->nodes.upper_bound(*sptr);
      for (auto nptr = nl; nptr != nu; ++nptr) {
        NodeData nd;
        nd = (nptr->second);
        need_to_erase.push_back(nptr);
        nodes.insert(pair<string, NodeData>(master_uuid, nd));
      }
    }
  }
  for (auto sptr = need_to_erase.begin(); sptr != need_to_erase.end(); ++sptr) {
    nodes.erase(*sptr);
  }

  return master_uuid;
}

References mspass::utility::NodeData::algid, mspass::utility::NodeData::algorithm, current_nodedata(), is_empty(), and mspass::utility::NodeData::uuid.

◆ clear()

void mspass::utility::ProcessingHistory::clear ( )

Clear this history chain - use with caution.

{
  nodes.clear();
  current_status = ProcessingStatus::UNDEFINED;
  current_stage = 0;
  mytype = AtomicType::UNDEFINED;
  algorithm = "UNDEFINED";
  algid = "UNDEFINED";
}

◆ created_by()

std::pair< std::string, std::string > mspass::utility::ProcessingHistory::created_by ( ) const
inline

Return the algorithm name and id that created current node.

{
  std::pair<std::string, std::string> result(algorithm, algid);
  return result;
}

◆ current_nodedata()

NodeData mspass::utility::ProcessingHistory::current_nodedata ( ) const

Return all the attributes of the current node.

This is a convenience method strictly for the C++ interface (it is too nonpythonic to be useful to wrap for python). It returns a NodeData class containing the attributes of the head of the chain. Like the getters above, it is needed to save that data.

{
  NodeData nd;
  nd.status = current_status;
  nd.uuid = current_id;
  nd.type = mytype;
  nd.stage = current_stage;
  nd.algorithm = algorithm;
  nd.algid = algid;
  return nd;
}

References mspass::utility::NodeData::algid, mspass::utility::NodeData::algorithm, mspass::utility::NodeData::stage, mspass::utility::NodeData::status, mspass::utility::NodeData::type, and mspass::utility::NodeData::uuid.

◆ get_nodes()

multimap< string, NodeData > mspass::utility::ProcessingHistory::get_nodes ( ) const

Retrieve the nodes multimap that defines the tree structure branches.

This method does more than just get the protected multimap called nodes. It copies the map and then pushes the "current" contents to the map before returning the copy. This allows the data defined as current to not be pushed into the tree until they are needed.

{
  /* Return empty map if it has no data - necessary or the logic
  below will insert an empty head to the chain. */
  if (this->is_empty())
    return nodes; // a way to return an empty container
  /* This is wrong, I think, but retained to test before removing.
  remove this once current idea is confirmed. Note if that
  proves true we can also remove the two lines above as they do
  nothing useful */
  /*
  NodeData nd;
  nd=this->current_nodedata();
  pair<string,NodeData> pn(current_id,nd);
  multimap<string,NodeData> result(this->nodes);
  result.insert(pn);
  return result;
  */
  return nodes;
}

References is_empty().

◆ id()

std::string mspass::utility::ProcessingHistory::id ( ) const
inline

Return the id of this object set for this history chain.

We maintain the uuid for a data object inside this class. This method fetches the string representation of the uuid of this data object.

{ return current_id; };

◆ inputs()

list< NodeData > mspass::utility::ProcessingHistory::inputs ( const std::string  id_to_find) const

Return a list of data that define the inputs to a given uuid.

This low level getter returns the NodeData objects that define the inputs to the uuid of some piece of data that was used as input at some stage for the current object.

Parameters
id_to_find - is the uuid for which input data is desired.
Returns
list of NodeData that define the inputs. Will silently return empty list if the key is not found.
{
  list<NodeData> result;
  // Return empty list immediately if key not found
  if (nodes.count(id_to_find) <= 0)
    return result;
  /* Note these have to be const_iterators because method is tagged const */
  multimap<string, NodeData>::const_iterator upper, lower;
  lower = nodes.lower_bound(id_to_find);
  upper = nodes.upper_bound(id_to_find);
  multimap<string, NodeData>::const_iterator mptr;
  for (mptr = lower; mptr != upper; ++mptr) {
    result.push_back(mptr->second);
  }
  return result;
};

◆ is_empty()

bool mspass::utility::ProcessingHistory::is_empty ( ) const

Return true if the processing chain is empty.

This method provides a standard test for an invalid, empty processing chain. Constructors other than the copy constructor will all put this object in an invalid state that will cause this method to return true. Only if the chain is initialized properly with a call to set_as_origin will this method return false.

{
  if ((current_status == ProcessingStatus::UNDEFINED) && (nodes.empty()))
    return true;
  return false;
}

◆ is_origin()

bool mspass::utility::ProcessingHistory::is_origin ( ) const

Return true if the current data is in state defined as "origin" - see class description

{
  if (current_status == ProcessingStatus::RAW ||
      current_status == ProcessingStatus::ORIGIN)
    return true;
  else
    return false;
}

◆ is_raw()

bool mspass::utility::ProcessingHistory::is_raw ( ) const

Return true if the current data is in state defined as "raw" - see class description

{
  if (current_status == ProcessingStatus::RAW)
    return true;
  else
    return false;
}

◆ is_saved()

bool mspass::utility::ProcessingHistory::is_saved ( ) const

Return true if the current data is in state defined as "saved" - see class description

{
  if (current_status == ProcessingStatus::SAVED)
    return true;
  else
    return false;
}

◆ is_volatile()

bool mspass::utility::ProcessingHistory::is_volatile ( ) const

Return true if the current data is in state defined as "volatile" - see class description

{
  if (current_status == ProcessingStatus::VOLATILE)
    return true;
  else
    return false;
}

◆ map_as_saved()

string mspass::utility::ProcessingHistory::map_as_saved ( const std::string  alg,
const std::string  algid,
const AtomicType  typ 
)

Prepare the current data for saving.

Saving data is treated as a special form of map operation. That is because a save, by our definition, is always a one-to-one operation with an index entry for each atomic object. This method pushes a new entry in the history chain tagged by the algorithm/algid field for the writer. It differs from new_map in the important sense that the uuid is not changed. The record this sets in the nodes multimap will then have the same uuid for the key as that in the NodeData. That, along with the status set to SAVED, can be used downstream to recognize save records.

It is VERY IMPORTANT to realize that this method saves nothing. It only preps the history chain data so calls that follow will retrieve the right information to reconstruct the full history chain. Writers should follow this sequence:

  1. call map_as_saved with the writer name for algorithm definition
  2. save the data and history chain to MongoDB.
  3. be sure you have a copy of the uuid string of the data just saved and call the clear method.
  4. call the set_as_origin method using the saved uuid, with the same algorithm/id as used for the earlier call to map_as_saved. This puts the ProcessingHistory in a state identical to that produced by a reader.
Parameters
alg - is the algorithm name to assign to the output. This would normally be the name defining the writer.
algid - is an id designator to uniquely define an instance of algorithm. Note that algid must itself be a unique keyword or the history chains will get scrambled. alg is mostly carried as baggage to make output more easily comprehended without additional lookups. Note one model to distinguish records of an actual save from redefinition of the data as an origin (see above) is to use a different id for the call to map_as_saved and the later call to set_as_origin. This code doesn't care, but that is an implementation detail in how this will work with MongoDB.
typ - defines the data type (C++ class) that was just saved.
{
  if (this->is_empty()) {
    stringstream ss;
    ss << "Attempt to call this method on an empty history chain for uuid="
       << this->id() << endl
       << "Cannot preserve history for writer=" << alg << " with id=" << algid
       << endl;
    elog.log_error("ProcessingHistory::map_as_saved", ss.str(),
                   ErrorSeverity::Complaint);
    return current_id;
  }
  /* This is essentially pushing current data to the end of the history chain
  but using a special id that may or may not be saved by the caller.
  We use a fixed keyword defined in ProcessingHistory.h assuming saves
  are always a one-to-one operation (definition of atomic really) */
  NodeData nd(this->current_nodedata());
  pair<string, NodeData> pn(SAVED_ID_KEY, nd);
  this->nodes.insert(pn);
  /* Now we reset current to define it as the saver. Then calls to the
  getters for the multimap will properly insert this data as the end of the
  chain. Note a key difference from new_map is we don't create a new uuid.
  I don't think that will cause an ambiguity, but it might be better to
  just create a new one here - will do it this way unless that proves a problem
  as the equality of the two might be a useful test for other purposes */
  algorithm = alg;
  algid = algid_in;
  current_status = ProcessingStatus::SAVED;
  current_id = SAVED_ID_KEY;
  if (current_stage >= 0)
    ++current_stage;
  else {
    elog.log_error(
        "ProcessingHistory::map_as_saved",
        "current_stage on entry had not been initialized\nImproper usage will "
        "create an invalid history chain that may cause downstream problems",
        ErrorSeverity::Complaint);
    current_stage = 0;
  }
  mytype = typ;
  return current_id;
}

References current_nodedata(), id(), is_empty(), and mspass::utility::ErrorLogger::log_error().

◆ merge()

void mspass::utility::ProcessingHistory::merge ( const ProcessingHistory & data_to_add)

Merge the history nodes from another.

Parameters
data_to_add - is the ProcessingHistory of the data object to be merged.
{
  if (data_to_add.is_empty()) {
    stringstream ss;
    ss << "Data with uuid=" << data_to_add.id() << " has an empty history chain"
       << endl
       << "At best this will leave ProcessingHistory incomplete" << endl;
    elog.log_error("ProcessingHistory::merge", ss.str(),
                   ErrorSeverity::Complaint);
  } else {
    multimap<string, NodeData>::iterator nptr;
    multimap<string, NodeData> newhistory = data_to_add.get_nodes();
    multimap<string, NodeData>::iterator nl, nu;
    for (nptr = newhistory.begin(); nptr != newhistory.end(); ++nptr) {
      string key(nptr->first);
      /* if the data_to_add's key matches its current id,
      we merge all the nodes under the current id of *this. */
      if (key == data_to_add.current_id) {
        this->nodes.insert(std::make_pair(this->current_id, nptr->second));
      } else if (this->nodes.count(key) > 0) {
        nl = this->nodes.lower_bound(key);
        nu = this->nodes.upper_bound(key);
        for (auto ptr = nl; ptr != nu; ++ptr) {
          NodeData ndtest(ptr->second);
          if (ndtest != (nptr->second)) {
            this->nodes.insert(*nptr);
          }
        }
      } else {
        this->nodes.insert(*nptr);
      }
    }
  }
}

References get_nodes(), id(), is_empty(), and mspass::utility::ErrorLogger::log_error().

◆ new_ensemble_process()

string mspass::utility::ProcessingHistory::new_ensemble_process ( const std::string  alg,
const std::string  algid,
const AtomicType  typ,
const std::vector< ProcessingHistory * >  parents,
const bool  create_newid = true 
)

Define history chain for an algorithm with multiple inputs in an ensemble.

Use this method to define the history chain for an algorithm that has multiple inputs for each output. Each output needs to call this method to build the connections that define how all inputs link to the new data being created by the algorithm that calls this method. Use this method for map operators that have an ensemble object as input and a single data object as output. This method should be called in creation of the output object. If the algorithm builds multiple outputs to form an output ensemble, call this method for each output before pushing it to the output ensemble container.

This method should not be used for a reduce operation in spark. It does not satisfy the associative rule for reduce. Use accumulate for reduce operations.

Normally it makes sense to set the boolean create_newid true so the current_id is guaranteed to be unique. There is little cost in creating a new one whenever there is any doubt that the current_id might be a duplicate. The false option is there only for rare cases where the current id value needs to be preserved.

Note the vector of data passed uses raw pointers for efficiency, to avoid excessive copying. For normal use this should not create memory leaks, but make sure you don't try to free what the pointers point to or problems are guaranteed. It is VERY IMPORTANT to realize that all the pointers are presumed to point to the ProcessingHistory component of a larger data object (Seismogram or TimeSeries). The parents do not all have to be a common type; if they contain valid history data their current type will be recorded there.

This method ALWAYS marks the status as VOLATILE.

Parameters
alg    is the algorithm name to assign to this node of the history chain. This would normally be a name defining the algorithm that makes sense to a human.
algid    is an id designator to uniquely define an instance of the algorithm. Note that algid must itself be a unique keyword or the history chains will get scrambled. alg is mostly carried as baggage to make output more easily comprehended without additional lookups.
typ    defines the data type (C++ class) the algorithm that is generating this data will create.
parents    is a vector of pointers to the ProcessingHistory components of the input data objects (see the warning above about pointer ownership).
create_newid    is a boolean defining how the current id is handled. As described above, if true the method will call newid and set that as the current id of this data object. If false the current value is left intact.
Returns
a string representation of the uuid of the data to which this ProcessingHistory is now attached.
174 {
175 if (create_newid) {
176 this->newid();
177 }
178 /* We need to clear the tree contents because all the parents will
179 branch from this. Hence, we have to put the node data into an empty
180 container */
181 this->clear();
182 algorithm = alg;
183 algid = algid_in;
184 mytype = typ;
185 /* Initialize current stage but assume it will be updated as max of
186 parents below */
187 current_stage = 0;
188 multimap<string, NodeData>::const_iterator nptr, nl, nu;
189 size_t i;
190 /* current_stage can be ambiguous from multiple inputs. We define
191 the current stage from a reduce as the largest stage value found
192 in all inputs. Note we only test the stage value at the head for
193 each parent */
194 int max_stage(0);
195 for (i = 0; i < parents.size(); ++i) {
196 if (parents[i]->is_empty()) {
197 stringstream ss;
198 ss << "Vector member number " << i << " with uuid=" << parents[i]->id()
199 << " has an empty history chain" << endl
200 << "At best the processing history data will be incomplete" << endl;
201 elog.log_error("ProcessingHistory::new_ensemble_process", ss.str(),
202 ErrorSeverity::Complaint);
203 continue;
204 }
205 multimap<string, NodeData> parent_node_data(parents[i]->get_nodes());
206 /* We also have to get the head data with this method now */
207 NodeData nd = parents[i]->current_nodedata();
208 if (nd.stage > max_stage)
209 max_stage = nd.stage;
210 for (nptr = parent_node_data.begin(); nptr != parent_node_data.end();
211 ++nptr) {
212 /*Adding to nodes multimap has a complication. It is possible in
213 some situations to have duplicate node data coming from different
214 inputs. The method we use to reconstruct the processing history tree
215 will be confused by such duplicates so we need to test for pure
216 duplicates in NodeData values. This algorithm would not scale well
217 if the number of values with a common key is large for either
218 this or parent[i]*/
219 string key(nptr->first);
220 if (this->nodes.count(key) > 0) {
221 nl = this->nodes.lower_bound(key);
222 nu = this->nodes.upper_bound(key);
223 for (auto ptr = nl; ptr != nu; ++ptr) {
224 NodeData ndtest(ptr->second);
225 if (ndtest != (nptr->second)) {
226 this->nodes.insert(*nptr);
227 }
228 }
229 } else {
230 /* No problem just inserting a node if there were no previous
231 entries*/
232 this->nodes.insert(*nptr);
233 }
234 }
235 /* Also insert the head data */
236 pair<string, NodeData> pnd(current_id, nd);
237 this->nodes.insert(pnd);
238 }
239 current_stage = max_stage;
240 /* Now reset the current contents to make it the base of the history tree.
241 Be careful of uninitialized current_stage*/
242 if (current_stage >= 0)
243 ++current_stage;
244 else {
245 elog.log_error("ProcessingHistory::new_ensemble_process",
246 "current_stage for none of the parents was "
247 "initialized\nImproper usage will create an invalid history "
248 "chain that may cause downstream problems",
249 ErrorSeverity::Complaint);
250 current_stage = 0;
251 }
252 algorithm = alg;
253 algid = algid_in;
254 // note this is output type - inputs can be variable and defined by nodes
255 mytype = typ;
256 current_status = ProcessingStatus::VOLATILE;
257 return current_id;
258}

References clear(), get_nodes(), is_empty(), mspass::utility::ErrorLogger::log_error(), newid(), and mspass::utility::NodeData::stage.

◆ new_map() [1/2]

std::string mspass::utility::ProcessingHistory::new_map ( const std::string  alg,
const std::string  algid,
const AtomicType  typ,
const ProcessingHistory data_to_clone,
const ProcessingStatus  newstatus = ProcessingStatus::VOLATILE 
)

Define this algorithm as a one-to-one map.

Many algorithms define a one-to-one map where each input data object creates one output data object. This class allows the input and output to be different data types, requiring only that one input will map to one output. It differs from the overloaded method with fewer arguments in that it should be used if you need to clear and refresh the history chain for any reason. A known example is creating simulation waveforms for testing within a workflow: such data have no prior history loaded but clone some properties of another piece of data. This method should be used in any situation where the history chain in the current data is wrong but the contents are linked to some other processing chain. It is supplied to cover odd cases, and use will likely be rare.

Parameters
alg    is the algorithm name to assign to this node of the history chain. This would normally be a name defining the algorithm that makes sense to a human.
algid    is an id designator to uniquely define an instance of the algorithm. Note that algid must itself be a unique keyword or the history chains will get scrambled. alg is mostly carried as baggage to make output more easily comprehended without additional lookups.
typ    defines the data type (C++ class) the algorithm that is generating this data will create.
data_to_clone    is a reference to the ProcessingHistory section of a parent data object that should be used to override the existing history chain.
newstatus    sets the status marking for the output. The normal (default) value is VOLATILE. This argument was included mainly for flexibility in case we want to extend the allowed entries in ProcessingStatus.

◆ new_map() [2/2]

std::string mspass::utility::ProcessingHistory::new_map ( const std::string  alg,
const std::string  algid,
const AtomicType  typ,
const ProcessingStatus  newstatus = ProcessingStatus::VOLATILE 
)

Define this algorithm as a one-to-one map of same type data.

Many algorithms define a one-to-one map where each input data object creates one output data object. This (overloaded) version of the method is most appropriate when the input and output are the same type and the history chain (ProcessingHistory) of the current object is what the algorithm will extend to record the result. Use the overloaded version with a separate ProcessingHistory copy if the current object's history data are not correct. In this version the history chain is simply appended with new definitions.

Parameters
alg    is the algorithm name to assign to this node of the history chain. This would normally be a name defining the algorithm that makes sense to a human.
algid    is an id designator to uniquely define an instance of the algorithm. Note that algid must itself be a unique keyword or the history chains will get scrambled. alg is mostly carried as baggage to make output more easily comprehended without additional lookups.
typ    defines the data type (C++ class) the algorithm that is generating this data will create.
newstatus    sets the status marking for the output. The normal (default) value is VOLATILE. This argument was included mainly for flexibility in case we want to extend the allowed entries in ProcessingStatus.

◆ newid()

string mspass::utility::ProcessingHistory::newid ( )

Create a new id.

This creates a new uuid. How is an implementation detail, but here we use boost's random uuid generator, which has an absurdly small probability of generating two equal ids. It returns the string representation of the id created.

651 {
652 boost::uuids::random_generator gen;
653 boost::uuids::uuid uuidval;
654 uuidval = gen();
655 this->current_id = boost::uuids::to_string(uuidval);
656 return current_id;
657}

◆ number_inputs() [1/2]

int mspass::utility::ProcessingHistory::number_inputs ( ) const

Return the number of inputs used to create current data.

In a number of contexts it can be useful to know the number of inputs defined for the current object. This returns that count.

648 {
649 return this->number_inputs(current_id);
650}

References number_inputs().

◆ number_inputs() [2/2]

int mspass::utility::ProcessingHistory::number_inputs ( const std::string  uuidstr) const

Return the number of inputs defined for any data in the process chain.

This overloaded version of number_inputs asks for the number of inputs defined for an arbitrary uuid. This is useful only when backtracing the ancestry of a child.

Parameters
uuidstr    is the uuid string to check in the ancestry record.

◆ number_of_stages()

size_t mspass::utility::ProcessingHistory::number_of_stages ( )
overridevirtual

Return number of processing stages that have been applied to this object.

One might want to know how many processing steps have been previously applied to produce the current data. For linear algorithms that would be useful only in debugging, but for an iterative algorithm it can be essential to avoid infinite loops with a loop limit parameter. This method returns how many times something has been done to alter the associated data. It returns 0 if the data are raw.

An important point is that the number returned is the number of processing steps since the last save. Because a save operation is assumed to save the history chain and then flush it, there is no easy way at present to keep track of the total number of stages. If we really need this functionality it could easily be retrofitted with another private variable that is not reset when the clear method is called.

Reimplemented from mspass::utility::BasicProcessingHistory.

139{ return current_stage; }

◆ operator=()

ProcessingHistory & mspass::utility::ProcessingHistory::operator= ( const ProcessingHistory parent)

Assignment operator.

687 {
688 if (this != (&parent)) {
689 this->BasicProcessingHistory::operator=(parent);
690 nodes = parent.nodes;
691 current_status = parent.current_status;
692 current_id = parent.current_id;
693 current_stage = parent.current_stage;
694 mytype = parent.mytype;
695 algorithm = parent.algorithm;
696 algid = parent.algid;
697 elog = parent.elog;
698 }
699 return *this;
700}

◆ set_as_origin()

void mspass::utility::ProcessingHistory::set_as_origin ( const std::string  alg,
const std::string  algid,
const std::string  uuid,
const AtomicType  typ,
bool  define_as_raw = false 
)

Set to define this as the top origin of a history chain.

This method should be called when a new object is created to initialize the history as an origin. Note again that an origin may be raw, but not all origins are defined as raw. This interface controls that through the boolean define_as_raw (false by default). Python wrappers should define an alternate set_as_raw method that calls this method with define_as_raw set true.

It is VERY IMPORTANT to realize that the uuid argument passed to this method is of fundamental importance. That string is assumed to be a uuid that can be linked to a parent data object read from storage and/or to a history chain saved by a prior run. It becomes the current_id for the data to which this object is a parent. This method also always does two things that define how the contents can be used. current_stage is ALWAYS set to 0. We distinguish a pure origin from an intermediate save ONLY by the status value saved in the history chain. That is, only uuids with status set to RAW are viewed as guaranteed to be stored. A record marked ORIGIN is assumed to have passed through a save operation. To retrieve the history chain from multiple runs, the pieces have to be assembled from the history data stored in MongoDB.

The contents of the history data structures should be empty when this method is called. That would be the norm for any constructor except those that make a deep copy. If unsure the clear method should be called before this method is called. If it isn't empty it will be cleared anyway and a complaint message will be posted to elog.

Parameters
alg    is the algorithm name to assign to the origin node. This would normally be a reader name, but it could be a synthetic generator.
algid    is an id designator to uniquely define an instance of the algorithm. Note that algid must itself be a unique keyword or the history chains will get scrambled.
uuid    is the unique id for this data object (see the note above).
typ    defines the data type (C++ class) "this" points to. It might be possible to determine this dynamically, but a design choice was to allow only registered classes through this mechanism; i.e. the AtomicType enum accepts a finite set of C++ classes. The type must be a child of ProcessingHistory.
define_as_raw    sets status as RAW if true and ORIGIN otherwise.
Exceptions
Never    throws an exception, BUT this method will post a complaint to elog if the history data structures are not empty and the clear method needs to be called internally.
151 {
152 const string base_error("ProcessingHistory::set_as_origin: ");
153 if (nodes.size() > 0) {
154 elog.log_error(alg + ":" + algid_in,
155 base_error + "Illegal usage. History chain was not empty. "
156 " Calling clear method and continuing",
157 ErrorSeverity::Complaint);
158 this->clear();
159 }
160 if (define_as_raw) {
161 current_status = ProcessingStatus::RAW;
162 } else {
163 current_status = ProcessingStatus::ORIGIN;
164 }
165 algorithm = alg;
166 algid = algid_in;
167 current_id = uuid;
168 mytype = typ;
169 /* Origin/raw are always defined as stage 0 even after a save. */
170 current_stage = 0;
171}

References clear(), and mspass::utility::ErrorLogger::log_error().

◆ set_id()

void mspass::utility::ProcessingHistory::set_id ( const std::string  newid)

Set the uuid manually.

It may occasionally be necessary to create a uuid by some other mechanism. This allows that, but this method should be used with caution and only if you understand the consequences.

Parameters
newidis string definition to use for the id.
658{ this->current_id = newid; }

References newid().

◆ stage()

int mspass::utility::ProcessingHistory::stage ( ) const
inline

Return the current stage count for this object.

We maintain a counter of the number of processing steps that have been applied to produce this data object. This simple method returns that counter. With this implementation this is identical to number_of_stages. We retain it in the API in the event we want to implement an accumulating counter.

571{ return current_stage; };

◆ status()

ProcessingStatus mspass::utility::ProcessingHistory::status ( ) const
inline

Return the current status definition (an enum).

573{ return current_status; };

The documentation for this class was generated from the following files: