version  0.0.1
Defines the C++ API for MsPASS
mspass::utility::ProcessingHistory Class Reference

Lightweight class to preserve processing chain of atomic objects. More...

#include <ProcessingHistory.h>

Inheritance diagram for mspass::utility::ProcessingHistory: derives from mspass::utility::BasicProcessingHistory; inherited by mspass::seismic::Seismogram, mspass::seismic::TimeSeries, mspass::seismic::SeismogramWGaps, and mspass::seismic::TimeSeriesWGaps.

Public Member Functions

 ProcessingHistory ()
 
 ProcessingHistory (const std::string jobnm, const std::string jid)
 
 ProcessingHistory (const ProcessingHistory &parent)
 
bool is_empty () const
 
bool is_raw () const
 
bool is_origin () const
 
bool is_volatile () const
 
bool is_saved () const
 
size_t number_of_stages () override
 Return number of processing stages that have been applied to this object. More...
 
void set_as_origin (const std::string alg, const std::string algid, const std::string uuid, const AtomicType typ, bool define_as_raw=false)
 
std::string new_ensemble_process (const std::string alg, const std::string algid, const AtomicType typ, const std::vector< ProcessingHistory * > parents, const bool create_newid=true)
 
void add_one_input (const ProcessingHistory &data_to_add)
 Add one datum as an input for current data. More...
 
void add_many_inputs (const std::vector< ProcessingHistory * > &d)
 Define several data objects as inputs. More...
 
void merge (const ProcessingHistory &data_to_add)
 Merge the history nodes from another. More...
 
void accumulate (const std::string alg, const std::string algid, const AtomicType typ, const ProcessingHistory &newinput)
 Method to use with a spark reduce algorithm. More...
 
std::string clean_accumulate_uuids ()
 Clean up inconsistent uuids that can be produced by reduce. More...
 
std::string new_map (const std::string alg, const std::string algid, const AtomicType typ, const ProcessingStatus newstatus=ProcessingStatus::VOLATILE)
 Define this algorithm as a one-to-one map of same type data. More...
 
std::string new_map (const std::string alg, const std::string algid, const AtomicType typ, const ProcessingHistory &data_to_clone, const ProcessingStatus newstatus=ProcessingStatus::VOLATILE)
 Define this algorithm as a one-to-one map. More...
 
std::string map_as_saved (const std::string alg, const std::string algid, const AtomicType typ)
 Prepare the current data for saving. More...
 
void clear ()
 
std::multimap< std::string, mspass::utility::NodeData > get_nodes () const
 
int stage () const
 
ProcessingStatus status () const
 
std::string id () const
 
std::pair< std::string, std::string > created_by () const
 
NodeData current_nodedata () const
 
std::string newid ()
 
int number_inputs () const
 
int number_inputs (const std::string uuidstr) const
 
void set_id (const std::string newid)
 
std::list< mspass::utility::NodeData > inputs (const std::string id_to_find) const
 Return a list of data that define the inputs to a given uuid. More...
 
ProcessingHistory & operator= (const ProcessingHistory &parent)
 
- Public Member Functions inherited from mspass::utility::BasicProcessingHistory
 BasicProcessingHistory (const std::string jobname, const std::string jobid)
 
 BasicProcessingHistory (const BasicProcessingHistory &parent)
 
std::string jobid () const
 
void set_jobid (const std::string &newjid)
 
std::string jobname () const
 
void set_jobname (const std::string jobname)
 
BasicProcessingHistory & operator= (const BasicProcessingHistory &parent)
 

Public Attributes

ErrorLogger elog
 

Protected Attributes

std::multimap< std::string, mspass::utility::NodeData > nodes
 
- Protected Attributes inherited from mspass::utility::BasicProcessingHistory
std::string jid
 
std::string jnm
 

Detailed Description

Lightweight class to preserve processing chain of atomic objects.

This class is intended to be used as a parent for any data object in MsPASS that should be considered atomic. It is designed to completely preserve the chain of processing algorithms applied to any atomic data to put it in its current state, saving during processing the core information from which that state can later be reconstructed. Writers for atomic objects inheriting this class should arrange to save the data it contains to the history collection in MongoDB. Note that actually doing the inverse is a different problem, expected to be implemented as extensions of this class used in special programs that reconstruct a data workflow and the processing chain applied to produce any final output.

The design was complicated by the need to keep the history data from causing memory bloat. A careless implementation could be prone to that problem even for modest processing chains, and we were particularly worried about iterative algorithms whose history could conceivably grow out of control. There was also the fundamental problem of distinguishing transient data from data held in longer term storage rather than just in memory. Our implementation was simplified by the concept of a Universally Unique IDentifier (UUID). Our history mechanism assumes each data object is assigned a uuid on creation by an implementation-dependent mechanism. That is, whenever a new object is created in MsPASS with the history feature enabled, one of these records is created for each data object defined as atomic. The uuid string is the unique key for the one object this particular record is associated with (conceptually, the object reachable through the this pointer). The parents of the current object are defined by the nodes data structure described below.

In the current implementation id is string representation of a uuid maintained by each atomic object. We use a string to maximize flexibility at a minor cost for storage.

Names used imply the following concepts: raw - means the data is new input to MsPASS (raw data from a data center, field experiment, or simulation). That tag means no prior history can be reconstructed. origin - the top-level ancestor of the current data. The top of a processing chain is always tagged as an origin. A top level can also be "raw", but not necessarily; in particular, readers that load partially processed data should mark the data read as an origin, but not raw. stage - all processed data objects that are volatile elements within a workflow are defined as a stage. Their existence is presumed to be known only through the ancestry preserved in the processing chain. A stage becomes a potential root only when it is saved by a writer, which marks that position as a save. We considered calling this a branch, but that does not capture the concept correctly since we require this mechanism to correctly preserve splits into multiple outputs. We preserve that cleanly for each data object. That is, the implementation makes it easy to reconstruct the history of a single final data object, but reconstructing interlinks between objects in an overall processing flow will be a challenge. That was a necessary compromise to avoid memory bloat. The history is properly viewed as a tree branching from a single root (the final output) to leaves that define all its parents.

The concepts of raw, origin, and stage are implemented with the enum class defined above called ProcessingStatus. Each history record has that as an attribute, but each call to new_stage updates a copy kept inside this object to simplify the python wrappers.

Constructor & Destructor Documentation

◆ ProcessingHistory() [1/3]

mspass::utility::ProcessingHistory::ProcessingHistory ( )

Default constructor.

 : elog()
{
  current_status=ProcessingStatus::UNDEFINED;
  current_id="UNDEFINED";
  current_stage=-1; //illegal value that could be used as signal for uninitialized
  mytype=AtomicType::UNDEFINED;
  algorithm="UNDEFINED";
  algid="UNDEFINED";
}

◆ ProcessingHistory() [2/3]

mspass::utility::ProcessingHistory::ProcessingHistory ( const std::string  jobnm,
const std::string  jid 
)

Construct and fill in BasicProcessingHistory job attributes.

Parameters
jobnm - set as jobname
jid - set as jobid

◆ ProcessingHistory() [3/3]

mspass::utility::ProcessingHistory::ProcessingHistory ( const ProcessingHistory &  parent)

Standard copy constructor.

 : BasicProcessingHistory(parent),elog(parent.elog),nodes(parent.nodes),
   algorithm(parent.algorithm),algid(parent.algid)
{
  current_status=parent.current_status;
  current_id=parent.current_id;
  current_stage=parent.current_stage;
  mytype=parent.mytype;
}

Member Function Documentation

◆ accumulate()

void mspass::utility::ProcessingHistory::accumulate ( const std::string  alg,
const std::string  algid,
const AtomicType  typ,
const ProcessingHistory &  newinput 
)

Method to use with a spark reduce algorithm.

A reduce operator in spark utilizes a binary function where two inputs are used to generate a single output object. Because the inputs could be scattered on multiple processor nodes this operation must be associative. The new_ensemble_process method does not satisfy that constraint so this method was necessary to handle that type of algorithm correctly.

This algorithm fundamentally branches on two cases: (1) initialization, which is detected by testing whether the node data map is empty, and (2) secondary calls. This should work even if multiple inputs are combined at the end of the reduce operation because the copies being merged will not be empty. Note an empty input will create a complaint entry in the error log.

{
  ProcessingHistory newinput(ni);
  if((newinput.algorithm != algin) || (newinput.algid != algidin)
     || (newinput.jid != newinput.jobid()) || (newinput.jnm != newinput.jobname()))
  {
    NodeData nd;
    nd=newinput.current_nodedata();
    newinput.newid();
    pair<string,NodeData> pn(newinput.current_id,nd);
    newinput.nodes.insert(pn);
    newinput.jid=newinput.jobid();
    newinput.jnm=newinput.jobname();
    newinput.algorithm=algin;
    newinput.algid=algidin;
    newinput.current_status=ProcessingStatus::VOLATILE;
    newinput.current_stage=nd.stage+1;
    newinput.mytype=typ;
  }
  /* We have to detect an initialization condition without losing the
  stored history. There are two conditions we need to handle. First,
  if we create an empty container to hold the accumulator and put it on the
  left hand side we will want to clear the history chain or we will
  accumulate random junk. The second condition is if we accumulate in
  a way where the left hand side is some existing data where we do want to
  preserve the history. For the is_empty logic: we just copy the
  newinput's history and make its current node data the connection
  backward - i.e. we have to make a new uuid and add an entry. */
  if(this->is_empty())
  {
    this->newid();
    nodes=ni.get_nodes();
    NodeData nd;
    nd=ni.current_nodedata();
    pair<string,NodeData> pn(current_id,nd);
    this->nodes.insert(pn);
    this->set_jobid(ni.jobid());
    this->set_jobname(ni.jobname());
    algorithm=algin;
    algid=algidin;
    current_status=ProcessingStatus::VOLATILE;
    current_stage=nd.stage+1;
    mytype=typ;
  }
  /* This is the condition for a left hand side that is not empty but not
  yet initialized. We detect this condition by a mismatch in all the unique
  names and ids that mark the current process defining this reduce operation*/
  else if((this->algorithm != algin) || (this->algid != algidin)
     || (this->jid != newinput.jobid()) || (this->jnm != newinput.jobname()))
  {
    /* This is similar to the block above, but the key difference here is we
    have to push this's history data to convert its current data to define an input.
    That means getting a new uuid and pushing current node data to the nodes map
    as an input */
    NodeData nd;
    nd=this->current_nodedata();
    this->newid();
    pair<string,NodeData> pn(current_id,nd);
    this->nodes.insert(pn);
    this->jid=newinput.jobid();
    this->jnm=newinput.jobname();
    this->algorithm=algin;
    this->algid=algidin;
    this->current_status=ProcessingStatus::VOLATILE;
    this->current_stage=nd.stage+1;
    this->mytype=typ;
    this->merge(newinput);
  }
  else
  {
    this->merge(newinput);
  }
}

References current_nodedata(), and newid().

◆ add_many_inputs()

void mspass::utility::ProcessingHistory::add_many_inputs ( const std::vector< ProcessingHistory * > &  d)

Define several data objects as inputs.

This method acts like add_one_input in that it alters only the inputs chain. In fact it is nothing more than a loop over the components of the vector calling add_one_input for each component.

Parameters
d - is the vector of data to define as inputs
{
  vector<ProcessingHistory*>::const_iterator dptr;
  for(dptr=d.begin();dptr!=d.end();++dptr)
  {
    ProcessingHistory *ptr;
    ptr=(*dptr);
    this->add_one_input(*ptr);
  }
}

References add_one_input().

◆ add_one_input()

void mspass::utility::ProcessingHistory::add_one_input ( const ProcessingHistory &  data_to_add)

Add one datum as an input for current data.

This method MUST ONLY be called after a call to new_ensemble_process, in the situation where additional inputs need to be defined that were not available at the time new_ensemble_process was called. An example might be a stack that was created within the scope of "algorithm" and then used in some way to create the output data. In any case, it differs fundamentally from new_ensemble_process in that it does not touch attributes that define the current state of "this". It simply says this is another input to the data "this" contains.

Parameters
data_to_add - is the ProcessingHistory of the data object to be defined as input. Note the type of the data to which it is linked will be saved as the base of the input chain from data_to_add. It can be different from the type of "this".
{
  if(data_to_add.is_empty())
  {
    stringstream ss;
    ss<<"Data with uuid="<<data_to_add.id()<<" has an empty history chain"<<endl
      << "At best this will leave ProcessingHistory incomplete"<<endl;
    elog.log_error("ProcessingHistory::add_one_input",ss.str(),
      ErrorSeverity::Complaint);
  }
  else
  {
    multimap<string,NodeData>::iterator nptr;
    multimap<string,NodeData> newhistory = data_to_add.get_nodes();
    multimap<string,NodeData>::iterator nl,nu;
    /* As above this one needs to check for duplicates and only add
    a node if the data are unique. This is simple compared to new_ensemble_process
    because we just have to check one object's history at a time. */
    for(nptr=newhistory.begin();nptr!=newhistory.end();++nptr)
    {
      string key(nptr->first);
      if(this->nodes.count(key)>0)
      {
        nl=this->nodes.lower_bound(key);
        nu=this->nodes.upper_bound(key);
        for(auto ptr=nl;ptr!=nu;++ptr)
        {
          NodeData ndtest(ptr->second);
          if(ndtest != (nptr->second))
          {
            this->nodes.insert(*nptr);
          }
        }
      }
      else
      {
        this->nodes.insert(*nptr);
      }
    }
    /* Don't forget head node data*/
    NodeData nd=data_to_add.current_nodedata();
    NodeData ndhere=this->current_nodedata();
    pair<string,NodeData> pnd(current_id,nd);
    this->nodes.insert(pnd);
  }
}

References id(), is_empty(), and mspass::utility::ErrorLogger::log_error().

◆ clean_accumulate_uuids()

string mspass::utility::ProcessingHistory::clean_accumulate_uuids ( )

Clean up inconsistent uuids that can be produced by reduce.

In a spark reduce operation it is possible to create multiple uuid keys for inputs to the same algorithm instance. That happens because the mechanism used by ProcessingHistory to define the process history tree is not associative. When a reduce gets sprayed across multiple nodes, multiple initializations can occur that create artificial, inconsistent uuids. This method should normally be called after a reduce operator if history is being preserved, or the history chain may be foobarred - not invalid, just messed up with extra branches in the processing tree.

A VERY IMPORTANT limitation of the algorithm used by this method is that the combination of algorithm and algid in "this" MUST be unique for a given job run when a reduce is called. i.e. if an earlier workflow had used alg and algid but with a different jobid and jobname, the distinction cannot be detected with this algorithm. This means our global history handling must guarantee algid is unique for each run.

Returns
unique uuid for alg,algid match set in the history chain. Note if there are no duplicates it simply returns the only one it finds. If there are duplicates it returns the lexically smallest (first in alphabetic order) uuid. Most importantly if there is no match or if history is empty it returns the string UNDEFINED.
{
  /* Return undefined immediately if the history chain is empty */
  if(this->is_empty()) return string("UNDEFINED");
  NodeData ndthis=this->current_nodedata();
  string alg(ndthis.algorithm);
  string algidtest(ndthis.algid);
  /* The algorithm here finds all entries for which algorithm is alg and
  algid matches algidtest. We build a list of uuids (keys) linked to that unique
  algorithm. We then use the id in ndthis as the master*/
  set<string> matching_ids;
  matching_ids.insert(ndthis.uuid);
  /* this approach of pushing iterators to this list that match seemed to
  be the only way I could make this work correctly. Not sure why, but
  the added cost over handling this correctly in the loops is small. */
  std::list<multimap<string,NodeData>::iterator> need_to_erase;
  for(auto nptr=this->nodes.begin();nptr!=this->nodes.end();++nptr)
  {
    /* this copy operation is somewhat inefficient, but the cost is small
    compared to how obscure the code would look if we directly manipulated the
    second value */
    NodeData nd(nptr->second);
    /* this depends upon the distinction between set and multiset. i.e. an insert
    of a duplicate does nothing*/
    if((alg==nd.algorithm) && (algidtest==nd.algid))
    {
      matching_ids.insert(nd.uuid);
      need_to_erase.push_back(nptr);
    }
  }
  // handle no match situation gracefully
  if(matching_ids.empty())
    return string("UNDEFINED");
  /* Nothing more to do but return the uuid if there is only one*/
  if(matching_ids.size()==1)
    return *(matching_ids.begin());
  else
  {
    for(auto sptr=need_to_erase.begin();sptr!=need_to_erase.end();++sptr)
    {
      nodes.erase(*sptr);
    }
    need_to_erase.clear();
  }
  /* Here is the complicated case. We use the uuid from ndthis as the master
  and change all the others. This operation works ONLY because in a multimap
  erase only invalidates the iterator it points to and others remain valid.
  */
  string master_uuid=ndthis.uuid;
  for(auto sptr=matching_ids.begin();sptr!=matching_ids.end();++sptr)
  {
    /* Note this test is necessary to skip the master_uuid - no else needed*/
    if((*sptr)!=master_uuid)
    {
      multimap<string,NodeData>::iterator nl,nu;
      nl=this->nodes.lower_bound(*sptr);
      nu=this->nodes.upper_bound(*sptr);
      for(auto nptr=nl;nptr!=nu;++nptr)
      {
        NodeData nd;
        nd=(nptr->second);
        need_to_erase.push_back(nptr);
        nodes.insert(pair<string,NodeData>(master_uuid,nd));
      }
    }
  }
  for(auto sptr=need_to_erase.begin();sptr!=need_to_erase.end();++sptr)
  {
    nodes.erase(*sptr);
  }

  return master_uuid;
}

References mspass::utility::NodeData::algid, mspass::utility::NodeData::algorithm, current_nodedata(), is_empty(), and mspass::utility::NodeData::uuid.

◆ clear()

void mspass::utility::ProcessingHistory::clear ( )

Clear this history chain - use with caution.

{
  nodes.clear();
  current_status=ProcessingStatus::UNDEFINED;
  current_stage=0;
  mytype=AtomicType::UNDEFINED;
  algorithm="UNDEFINED";
  algid="UNDEFINED";
}

◆ created_by()

std::pair<std::string,std::string> mspass::utility::ProcessingHistory::created_by ( ) const
inline

Return the algorithm name and id that created current node.

{
  std::pair<std::string,std::string> result(algorithm,algid);
  return result;
}

◆ current_nodedata()

NodeData mspass::utility::ProcessingHistory::current_nodedata ( ) const

Return all the attributes of current.

This is a convenience method strictly for the C++ interface (it is too nonpythonic to be useful to wrap for python). It returns a NodeData class containing the attributes of the head of the chain. Like the getters above, it is needed to save that data.

{
  NodeData nd;
  nd.status=current_status;
  nd.uuid=current_id;
  nd.type=mytype;
  nd.stage=current_stage;
  nd.algorithm=algorithm;
  nd.algid=algid;
  return nd;
}

References mspass::utility::NodeData::algid, mspass::utility::NodeData::algorithm, mspass::utility::NodeData::stage, mspass::utility::NodeData::status, mspass::utility::NodeData::type, and mspass::utility::NodeData::uuid.

◆ get_nodes()

multimap< string, NodeData > mspass::utility::ProcessingHistory::get_nodes ( ) const

Retrieve the nodes multimap that defines the tree structure branches.

This method does more than just get the protected multimap called nodes. It copies the map and then pushes the "current" contents to the copy before returning it. This allows the data defined as current to not be pushed into the tree until they are needed.

{
  /* Return empty map if it has no data - necessary or the logic
  below will insert an empty head to the chain. */
  if(this->is_empty())
    return nodes; // a way to return an empty container
  /* This is wrong, I think, but retained to test before removing.
  remove this once current idea is confirmed. Note if that
  proves true we can also remove the two lines above as they do
  nothing useful*/
  /*
  NodeData nd;
  nd=this->current_nodedata();
  pair<string,NodeData> pn(current_id,nd);
  multimap<string,NodeData> result(this->nodes);
  result.insert(pn);
  return result;
  */
  return nodes;
}

References is_empty().

◆ id()

std::string mspass::utility::ProcessingHistory::id ( ) const
inline

Return the id of this object set for this history chain.

We maintain the uuid for a data object inside this class. This method fetches the string representation of the uuid of this data object.

{
  return current_id;
};

◆ inputs()

list< NodeData > mspass::utility::ProcessingHistory::inputs ( const std::string  id_to_find) const

Return a list of data that define the inputs to a given uuid.

This low level getter returns the NodeData objects that define the inputs to the uuid of some piece of data that was used as input at some stage for the current object.

Parameters
id_to_find - is the uuid for which input data is desired.
Returns
list of NodeData that define the inputs. Will silently return empty list if the key is not found.
{
  list<NodeData> result;
  // Return empty list immediately if key not found
  if(nodes.count(id_to_find)<=0) return result;
  /* Note these have to be const_iterators because method is tagged const*/
  multimap<string,NodeData>::const_iterator upper,lower;
  lower=nodes.lower_bound(id_to_find);
  upper=nodes.upper_bound(id_to_find);
  multimap<string,NodeData>::const_iterator mptr;
  for(mptr=lower;mptr!=upper;++mptr)
  {
    result.push_back(mptr->second);
  }
  return result;
};

◆ is_empty()

bool mspass::utility::ProcessingHistory::is_empty ( ) const

Return true if the processing chain is empty.

This method provides a standard test for an invalid, empty processing chain. Constructors except the copy constructor will all put this object in an invalid state that will cause this method to return true. Only if the chain is initialized properly with a call to set_as_origin will this method return false.

{
  if( (current_status==ProcessingStatus::UNDEFINED)
     && (nodes.empty()) ) return true;
  return false;
}

◆ is_origin()

bool mspass::utility::ProcessingHistory::is_origin ( ) const

Return true if the current data is in state defined as "origin" - see class description

{
  if(current_status==ProcessingStatus::RAW || current_status==ProcessingStatus::ORIGIN)
    return true;
  else
    return false;
}

◆ is_raw()

bool mspass::utility::ProcessingHistory::is_raw ( ) const

Return true if the current data is in state defined as "raw" - see class description

{
  if(current_status==ProcessingStatus::RAW)
    return true;
  else
    return false;
}

◆ is_saved()

bool mspass::utility::ProcessingHistory::is_saved ( ) const

Return true if the current data is in state defined as "saved" - see class description

{
  if(current_status==ProcessingStatus::SAVED)
    return true;
  else
    return false;
}

◆ is_volatile()

bool mspass::utility::ProcessingHistory::is_volatile ( ) const

Return true if the current data is in state defined as "volatile" - see class description

{
  if(current_status==ProcessingStatus::VOLATILE)
    return true;
  else
    return false;
}

◆ map_as_saved()

string mspass::utility::ProcessingHistory::map_as_saved ( const std::string  alg,
const std::string  algid,
const AtomicType  typ 
)

Prepare the current data for saving.

Saving data is treated as a special form of map operation. That is because a save, by our definition, is always a one-to-one operation with an index entry for each atomic object. This method pushes a new entry in the history chain tagged by the algorithm/algid fields for the writer. It differs from new_map in the important sense that the uuid is not changed. The record this sets in the nodes multimap will then have the same uuid for the key as that in NodeData. That, along with the status set to SAVED, can be used downstream to recognize save records.

It is VERY IMPORTANT for users of this method to realize it saves nothing. It only preps the history chain data so the calls that follow will retrieve the right information to reconstruct the full history chain. Writers should follow this sequence:

  1. call map_as_saved with the writer name for algorithm definition
  2. save the data and history chain to MongoDB.
  3. be sure you have a copy of the uuid string of the data just saved and call the clear method.
  4. call the set_as_origin method using the saved uuid with the same algorithm/id as used for the earlier call to map_as_saved. This puts ProcessingHistory in a state identical to that produced by a reader.
Parameters
alg - is the algorithm name to assign to the output. This would normally be the name defining the writer.
algid - is an id designator to uniquely define an instance of algorithm. Note that algid must itself be a unique keyword or the history chains will get scrambled. alg is mostly carried as baggage to make output more easily comprehended without additional lookups. Note one model to distinguish records of an actual save and redefinition of the data as an origin (see above) is to use a different id for the call to map_as_saved and the later call to set_as_origin. This code doesn't care, but that is an implementation detail in how this will work with MongoDB.
typ - defines the data type (C++ class) that was just saved.
{
  if(this->is_empty())
  {
    stringstream ss;
    ss << "Attempt to call this method on an empty history chain for uuid="
      << this->id()<<endl
      << "Cannot preserve history for writer="<<alg<<" with id="<<algid<<endl;
    elog.log_error("ProcessingHistory::map_as_saved",ss.str(),ErrorSeverity::Complaint);
    return current_id;
  }
  /* This is essentially pushing current data to the end of the history chain
  but using a special id that may or may not be saved by the caller.
  We use a fixed keyword defined in ProcessingHistory.h assuming saves
  are always a one-to-one operation (definition of atomic really)*/
  NodeData nd(this->current_nodedata());
  pair<string,NodeData> pn(SAVED_ID_KEY,nd);
  this->nodes.insert(pn);
  /* Now we reset current to define it as the saver. Then calls to the
  getters for the multimap will properly insert this data as the end of the
  chain. Note a key difference from new_map is we don't create a new uuid.
  I don't think that will cause an ambiguity, but it might be better to
  just create a new one here - will do it this way unless that proves a problem
  as the equality of the two might be a useful test for other purposes */
  algorithm=alg;
  algid=algid_in;
  current_status=ProcessingStatus::SAVED;
  current_id=SAVED_ID_KEY;
  if(current_stage>=0)
    ++current_stage;
  else
  {
    elog.log_error("ProcessingHistory::map_as_saved",
      "current_stage on entry had not been initialized\nImproper usage will create an invalid history chain that may cause downstream problems",
      ErrorSeverity::Complaint);
    current_stage=0;
  }
  mytype=typ;
  return current_id;
}

References id(), is_empty(), and mspass::utility::ErrorLogger::log_error().

◆ merge()

void mspass::utility::ProcessingHistory::merge ( const ProcessingHistory &  data_to_add)

Merge the history nodes from another.

Parameters
data_to_add - is the ProcessingHistory of the data object to be merged.
{
  if(data_to_add.is_empty())
  {
    stringstream ss;
    ss<<"Data with uuid="<<data_to_add.id()<<" has an empty history chain"<<endl
      << "At best this will leave ProcessingHistory incomplete"<<endl;
    elog.log_error("ProcessingHistory::merge",ss.str(),
      ErrorSeverity::Complaint);
  }
  else
  {
    multimap<string,NodeData>::iterator nptr;
    multimap<string,NodeData> newhistory = data_to_add.get_nodes();
    multimap<string,NodeData>::iterator nl,nu;
    for(nptr=newhistory.begin();nptr!=newhistory.end();++nptr)
    {
      string key(nptr->first);
      /* if the data_to_add's key matches its current id,
      we merge all the nodes under the current id of *this. */
      if(key == data_to_add.current_id)
      {
        this->nodes.insert(std::make_pair(this->current_id, nptr->second));
      }
      else if(this->nodes.count(key)>0)
      {
        nl=this->nodes.lower_bound(key);
        nu=this->nodes.upper_bound(key);
        for(auto ptr=nl;ptr!=nu;++ptr)
        {
          NodeData ndtest(ptr->second);
          if(ndtest != (nptr->second))
          {
            this->nodes.insert(*nptr);
          }
        }
      }
      else
      {
        this->nodes.insert(*nptr);
      }
    }
  }
}

References id(), is_empty(), and mspass::utility::ErrorLogger::log_error().

◆ new_ensemble_process()

string mspass::utility::ProcessingHistory::new_ensemble_process ( const std::string  alg,
const std::string  algid,
const AtomicType  typ,
const std::vector< ProcessingHistory * >  parents,
const bool  create_newid = true 
)

Define history chain for an algorithm with multiple inputs in an ensemble.

Use this method to define the history chain for an algorithm that has multiple inputs for each output. Each output needs to call this method to build the connections that define how all inputs link to the new data being created by the algorithm that calls this method. Use this method for map operators that have an ensemble object as input and a single data object as output. This method should be called in creation of the output object. If the algorithm builds multiple outputs to assemble an output ensemble, call this method for each output before pushing it to the output ensemble container.

This method should not be used for a reduce operation in spark. It does not satisfy the associative rule for reduce. Use accumulate for reduce operations.

Normally, it makes sense to set the boolean create_newid true so the current_id is guaranteed to be unique. There is little cost in creating a new one if there is any doubt that the current_id might be a duplicate. The false option is there only for rare cases where the current id value needs to be preserved.

Note the vector of data passed uses raw pointers for efficiency to avoid excessive copying. For normal use this should not create memory leaks, but make sure you don't try to free what the pointers point to or problems are guaranteed. It is VERY IMPORTANT to realize that all the pointers are presumed to point to the ProcessingHistory component of a larger data object (Seismogram or TimeSeries). The parents do not all have to be of a common type; if they hold valid history data, their current type will be defined within them.

This method ALWAYS marks the status as VOLATILE.

Parameters
alg is the algorithm name to assign to the new node. This would normally be a name identifying the algorithm that makes sense to a human.
algid is an id designator to uniquely define an instance of the algorithm. Note that algid must itself be a unique keyword or the history chains will get scrambled. alg is mostly carried as baggage to make output more easily comprehended without additional lookups.
typ defines the data type (C++ class) that the algorithm generating this data will create.
parents is the vector of pointers to the ProcessingHistory components of the data objects used as inputs.
create_newid is a boolean defining how the current id is handled. As described above, if true the method will call newid and set that as the current id of this data object. If false the current value is left intact.
Returns
a string representation of the uuid of the data to which this ProcessingHistory is now attached.
{
  if(create_newid)
  {
    this->newid();
  }
  /* We need to clear the tree contents because all the parents will
     branch from this. Hence, we have to put the node data into an empty
     container */
  this->clear();
  algorithm=alg;
  algid=algid_in;
  mytype=typ;
  /* Initialize current stage but assume it will be updated as max of
     parents below */
  current_stage=0;
  multimap<string,NodeData>::const_iterator nptr,nl,nu;
  size_t i;
  /* current_stage can be ambiguous from multiple inputs. We define
     the current stage from a reduce as the largest stage value found
     in all inputs. Note we only test the stage value at the head for
     each parent */
  int max_stage(0);
  for(i=0;i<parents.size();++i)
  {
    if(parents[i]->is_empty())
    {
      stringstream ss;
      ss << "Vector member number "<<i<<" with uuid="<<parents[i]->id()
         << " has an empty history chain"<<endl
         << "At best the processing history data will be incomplete"<<endl;
      elog.log_error("ProcessingHistory::new_ensemble_process",ss.str(),
        ErrorSeverity::Complaint);
      continue;
    }
    multimap<string,NodeData> parent_node_data(parents[i]->get_nodes());
    /* We also have to get the head data with this method now */
    NodeData nd=parents[i]->current_nodedata();
    if(nd.stage>max_stage) max_stage=nd.stage;
    for(nptr=parent_node_data.begin();nptr!=parent_node_data.end();++nptr)
    {
      /* Adding to the nodes multimap has a complication. It is possible in
         some situations to have duplicate node data coming from different
         inputs. The method we use to reconstruct the processing history tree
         will be confused by such duplicates so we need to test for pure
         duplicates in NodeData values. This algorithm would not scale well
         if the number of values with a common key is large for either
         this or parent[i] */
      string key(nptr->first);
      if(this->nodes.count(key)>0)
      {
        nl=this->nodes.lower_bound(key);
        nu=this->nodes.upper_bound(key);
        for(auto ptr=nl;ptr!=nu;++ptr)
        {
          NodeData ndtest(ptr->second);
          if(ndtest != (nptr->second))
          {
            this->nodes.insert(*nptr);
          }
        }
      }
      else
      {
        /* No problem just inserting a node if there were no previous
           entries */
        this->nodes.insert(*nptr);
      }
    }
    /* Also insert the head data */
    pair<string,NodeData> pnd(current_id,nd);
    this->nodes.insert(pnd);
  }
  current_stage=max_stage;
  /* Now reset the current contents to make it the base of the history tree.
     Be careful of uninitialized current_stage */
  if(current_stage>=0)
    ++current_stage;
  else
  {
    elog.log_error("ProcessingHistory::new_ensemble_process",
      "current_stage for none of the parents was initialized\nImproper usage will create an invalid history chain that may cause downstream problems",
      ErrorSeverity::Complaint);
    current_stage=0;
  }
  algorithm=alg;
  algid=algid_in;
  // note this is output type - inputs can be variable and defined by nodes
  mytype=typ;
  current_status=ProcessingStatus::VOLATILE;
  return current_id;
}

References clear(), is_empty(), mspass::utility::ErrorLogger::log_error(), and newid().
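As a hedged usage sketch (not runnable without MsPASS installed; `ens`, `stack`, the algorithm names, and the assumption that AtomicType has a TIMESERIES member are all illustrative, not taken from this page), a many-to-one stacking operator might record its history like this:

```cpp
// Illustrative sketch only: names marked hypothetical are not MsPASS API.
std::vector<mspass::utility::ProcessingHistory*> parents;
for (auto& d : ens.member)          // hypothetical ensemble container
  parents.push_back(&d);            // each member IS-A ProcessingHistory

mspass::seismic::TimeSeries result = stack(ens);  // hypothetical algorithm
// Record that `result` was built from all ensemble members.
// Marks result's status VOLATILE and bumps the stage counter.
result.new_ensemble_process("stack", "stack_v1",
    mspass::utility::AtomicType::TIMESERIES,      // assumed enum member
    parents);
```

The call replaces result's history tree with one rooted at a fresh uuid whose node map links back to every parent.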

◆ new_map() [1/2]

std::string mspass::utility::ProcessingHistory::new_map ( const std::string  alg,
const std::string  algid,
const AtomicType  typ,
const ProcessingHistory &  data_to_clone,
const ProcessingStatus  newstatus = ProcessingStatus::VOLATILE 
)

Define this algorithm as a one-to-one map.

Many algorithms define a one-to-one map where each input data object creates one output data object. This class allows the input and output to be different data types, requiring only that one input maps to one output. It differs from the overloaded method with fewer arguments in that it should be used if you need to clear and refresh the history chain for any reason. Known examples are creating simulation waveforms for testing within a workflow that have no prior history data loaded but which clone some properties of another piece of data. This method should be used in any situation where the history chain in the current data is wrong but the contents are linked to some other processing chain. It is supplied to cover odd cases, but use will likely be rare.

Parameters
alg is the algorithm name to assign to the new node. This would normally be a name identifying the algorithm that makes sense to a human.
algid is an id designator to uniquely define an instance of the algorithm. Note that algid must itself be a unique keyword or the history chains will get scrambled. alg is mostly carried as baggage to make output more easily comprehended without additional lookups.
typ defines the data type (C++ class) that the algorithm generating this data will create.
data_to_clone is a reference to the ProcessingHistory section of a parent data object that should be used to override the existing history chain.
newstatus sets the status marking for the output. Normal (default) would be VOLATILE. This argument was included mainly for flexibility in case we want to extend the allowed entries in ProcessingStatus.

◆ new_map() [2/2]

std::string mspass::utility::ProcessingHistory::new_map ( const std::string  alg,
const std::string  algid,
const AtomicType  typ,
const ProcessingStatus  newstatus = ProcessingStatus::VOLATILE 
)

Define this algorithm as a one-to-one map of same type data.

Many algorithms define a one-to-one map where each input data object creates one output data object. This (overloaded) version of the method is most appropriate when input and output are the same type and the history chain (ProcessingHistory) of the current object is what the algorithm will alter to make the result. Use the overloaded version with a separate ProcessingHistory copy if the current object's history data are not correct. In this version the chain is simply appended with the new definition.

Parameters
alg is the algorithm name to assign to the new node. This would normally be a name identifying the algorithm that makes sense to a human.
algid is an id designator to uniquely define an instance of the algorithm. Note that algid must itself be a unique keyword or the history chains will get scrambled. alg is mostly carried as baggage to make output more easily comprehended without additional lookups.
typ defines the data type (C++ class) that the algorithm generating this data will create.
newstatus sets the status marking for the output. Normal (default) would be VOLATILE. This argument was included mainly for flexibility in case we want to extend the allowed entries in ProcessingStatus.
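A hedged sketch of the common case (same-type, one-to-one) may help; it is not runnable without MsPASS, and `d`, `detrend_in_place`, the algorithm name/id, and the AtomicType::TIMESERIES member are illustrative assumptions:

```cpp
// Illustrative sketch only.
mspass::seismic::TimeSeries d = /* ... obtained earlier ... */;
detrend_in_place(d);                       // hypothetical processing step
// Append one node to d's existing history chain; status becomes VOLATILE
// (the default) and the stage counter advances by one.
d.new_map("detrend", "detrend_0",
    mspass::utility::AtomicType::TIMESERIES);  // assumed enum member
```

Each map-style algorithm in a workflow would make one such call per datum it modifies.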

◆ newid()

string mspass::utility::ProcessingHistory::newid ( )

Create a new id.

This creates a new uuid. How it is created is an implementation detail, but here we use boost's random uuid generator, which has an absurdly small probability of generating two equal ids. It returns the string representation of the id created.

{
  boost::uuids::random_generator gen;
  boost::uuids::uuid uuidval;
  uuidval=gen();
  this->current_id=boost::uuids::to_string(uuidval);
  return current_id;
}
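For readers without boost, the shape of what newid() produces can be approximated with only the standard library. Everything below (make_uuid and its formatting) is an illustrative stand-in, not MsPASS code; it emits the canonical 36-character version-4 form "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx":

```cpp
#include <cstdint>
#include <cstdio>
#include <random>
#include <string>

// Approximate boost's random uuid generator: 128 random bits formatted
// as a canonical uuid string, with version (4) and variant (10xx) bits set.
std::string make_uuid() {
  static std::mt19937_64 gen{std::random_device{}()};
  std::uniform_int_distribution<std::uint64_t> dist;
  std::uint64_t hi = dist(gen), lo = dist(gen);
  hi = (hi & 0xffffffffffff0fffULL) | 0x0000000000004000ULL;  // version nibble = 4
  lo = (lo & 0x3fffffffffffffffULL) | 0x8000000000000000ULL;  // variant bits = 10
  char buf[37];
  std::snprintf(buf, sizeof buf, "%08llx-%04llx-%04llx-%04llx-%012llx",
      (unsigned long long)(hi >> 32),
      (unsigned long long)((hi >> 16) & 0xffff),
      (unsigned long long)(hi & 0xffff),
      (unsigned long long)(lo >> 48),
      (unsigned long long)(lo & 0xffffffffffffULL));
  return std::string(buf);
}
```

As in newid(), collisions are astronomically unlikely, so the string can serve as a unique key in the nodes multimap.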

◆ number_inputs() [1/2]

int mspass::utility::ProcessingHistory::number_inputs ( ) const

Return the number of inputs used to create current data.

In a number of contexts it can be useful to know the number of inputs defined for the current object. This returns that count.

{
  return this->number_inputs(current_id);
}

◆ number_inputs() [2/2]

int mspass::utility::ProcessingHistory::number_inputs ( const std::string  uuidstr) const

Return the number of inputs defined for any data in the process chain.

This overloaded version of number_inputs asks for the number of inputs defined for an arbitrary uuid. This is useful only when backtracing the ancestry of a child.

Parameters
uuidstr is the uuid string to check in the ancestry record.

◆ number_of_stages()

size_t mspass::utility::ProcessingHistory::number_of_stages ( )
override virtual

Return number of processing stages that have been applied to this object.

One might want to know how many processing steps have been previously applied to produce the current data. For linear algorithms that would be useful only in debugging, but for an iterative algorithm it can be essential to avoid infinite loops with a loop limit parameter. This method returns how many times something has been done to alter the associated data. It returns 0 if the data are raw.

An important note is that the number returned is the number of processing steps since the last save. Because a save operation is assumed to save the history chain and then flush it, there is no easy way at present to keep track of the total number of stages. If we really need this functionality it could easily be retrofitted with another private variable that is not reset when the clear method is called.

Reimplemented from mspass::utility::BasicProcessingHistory.

{
  return current_stage;
}
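The iterative-algorithm use case mentioned above can be sketched as follows (not runnable without MsPASS; `d`, `converged`, `iterate_once`, and `maxstages` are hypothetical names):

```cpp
// Illustrative sketch only: bound an iterative loop with the stage count.
// Note number_of_stages counts steps since the last save, which is fine
// as a per-run iteration guard.
while (!converged(d) && d.number_of_stages() < maxstages) {
  iterate_once(d);   // each pass appends one stage, e.g. via new_map
}
```

Without such a guard a non-converging iteration would grow the history chain without bound.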

◆ operator=()

ProcessingHistory & mspass::utility::ProcessingHistory::operator= ( const ProcessingHistory &  parent )

Assignment operator.

{
  if(this!=(&parent))
  {
    this->BasicProcessingHistory::operator=(parent);
    nodes=parent.nodes;
    current_status=parent.current_status;
    current_id=parent.current_id;
    current_stage=parent.current_stage;
    mytype=parent.mytype;
    algorithm=parent.algorithm;
    algid=parent.algid;
    elog=parent.elog;
  }
  return *this;
}

◆ set_as_origin()

void mspass::utility::ProcessingHistory::set_as_origin ( const std::string  alg,
const std::string  algid,
const std::string  uuid,
const AtomicType  typ,
bool  define_as_raw = false 
)

Set to define this as the top origin of a history chain.

This method should be called when a new object is created to initialize the history as an origin. Note again an origin may be raw, but not all origins are defined as raw. This interface controls that through the boolean define_as_raw (false by default). Python wrappers should define an alternate set_as_raw method that calls this method with define_as_raw set true.

It is VERY IMPORTANT to realize that the uuid argument passed to this method is of fundamental importance. That string is assumed to be a uuid that can be linked to either a parent data object read from storage and/or linked to a history chain saved by a prior run. It becomes the current_id for the data to which this object is a parent. This method also always does two things that define how the contents can be used: current_stage is ALWAYS set to 0, and we distinguish a pure origin from an intermediate save ONLY by the status value saved in the history chain. That is, only uuids with status set to RAW are viewed as guaranteed to be stored. A record marked ORIGIN is assumed to have passed through a save operation. To retrieve the history chain from multiple runs, the pieces have to be assembled from history data stored in MongoDB.

The contents of the history data structures should be empty when this method is called. That would be the norm for any constructor except those that make a deep copy. If unsure the clear method should be called before this method is called. If it isn't empty it will be cleared anyway and a complaint message will be posted to elog.

Parameters
alg is the algorithm name to assign to the origin node. This would normally be a reader name, but it could be a synthetic generator.
algid is an id designator to uniquely define an instance of the algorithm. Note that algid must itself be a unique keyword or the history chains will get scrambled.
uuid is the unique id for this data object (see note above).
typ defines the data type (C++ class) "this" points to. It might be possible to determine this dynamically, but a design choice was to only allow registered classes through this mechanism; i.e. the enum class typ accepts only a finite set of C++ classes. The type must be a child of ProcessingHistory.
define_as_raw sets status as RAW if true and ORIGIN otherwise.
Exceptions
Never throws an exception, BUT this method will post a complaint to elog if the history data structures are not empty and the clear method has to be called internally.
{
  const string base_error("ProcessingHistory::set_as_origin: ");
  if( nodes.size()>0 )
  {
    elog.log_error(alg+":"+algid_in,
      base_error + "Illegal usage. History chain was not empty. Calling clear method and continuing",
      ErrorSeverity::Complaint);
    this->clear();
  }
  if(define_as_raw)
  {
    current_status=ProcessingStatus::RAW;
  }
  else
  {
    current_status=ProcessingStatus::ORIGIN;
  }
  algorithm=alg;
  algid=algid_in;
  current_id=uuid;
  mytype=typ;
  /* Origin/raw are always defined as stage 0 even after a save. */
  current_stage=0;
}

References mspass::utility::ErrorLogger::log_error().
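A hedged sketch of the intended call pattern for a reader (not runnable without MsPASS; `d`, `read_waveform`, the algorithm name/id, the uuid placeholder, and the AtomicType::TIMESERIES member are illustrative assumptions):

```cpp
// Illustrative sketch only: initialize history on a freshly read datum.
mspass::seismic::TimeSeries d = read_waveform(db, i);  // hypothetical reader
std::string uuid = "uuid-from-db";  // placeholder: the uuid stored with the
                                    // waveform document in MongoDB
// true -> status RAW (true origin); default false -> ORIGIN (e.g. an
// intermediate save). current_stage is set to 0 either way.
d.set_as_origin("read_waveform", "reader_v1", uuid,
    mspass::utility::AtomicType::TIMESERIES,  // assumed enum member
    true);
```

Readers of raw data pass true; readers restoring previously saved intermediates leave define_as_raw false.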

◆ set_id()

void mspass::utility::ProcessingHistory::set_id ( const std::string  newid)

Set the uuid manually.

It may occasionally be necessary to create a uuid by some other mechanism. This allows that, but this method should be used with caution and only if you understand the consequences.

Parameters
newid is the string to use for the id.
{
  this->current_id=newid;
}


◆ stage()

int mspass::utility::ProcessingHistory::stage ( ) const
inline

Return the current stage count for this object.

We maintain a counter of the number of processing steps that have been applied to produce this data object. This simple method returns that counter. With this implementation this is identical to number_of_stages. We retain it in the API in the event we want to implement an accumulating counter.

{
  return current_stage;
};

◆ status()

ProcessingStatus mspass::utility::ProcessingHistory::status ( ) const
inline

Return the current status definition (an enum).

{
  return current_status;
};

The documentation for this class was generated from the following files: