import pickle
import numpy as np
from mspasspy.ccore.seismic import TimeSeries, TimeSeriesEnsemble
from mspasspy.ccore.utility import MsPASSError, ErrorSeverity
from obspy import UTCDateTime
class ApplyCalibEngine:
"""
    A special case of response correction is a simple scalar multiply by a
    constant used to convert data from raw counts to the (now) standard units
    of nm/s. For many applications using broadband data that correction
    is sufficient to assure ensembles of signals are on a common amplitude
    scale. A basic rule is that if the passband of the processing is inside
    the instrument passband, a full response correction is not always
    necessary. A case in point is receiver function processing, where the
    passband of the actual data is inside the passband of most "broadband
    sensors".

    This class can be used to process TimeSeries or TimeSeriesEnsemble
    objects to convert data from raw counts to units of nm/s.
    The effort required to extract the conversion factor from the
    arcane response formats of SEED and StationXML is not trivial.
    We rely here on the conversion of station metadata downloaded from
    FDSN sources via web services and obspy's downloading methods.
    Obspy converts that arcane data to what it calls an Inventory
    object. In MsPASS we disaggregate the complicated Inventory object
    into a set of MongoDB documents with one entry for each
    SEED time period for each channel of data. Inside each such document
    is an attribute with the tag "serialized_channel_data" that contains
    the detailed response data serialized with pickle. The constructor
    for this object runs pickle.loads on that data to yield an obspy
    Channel object and extracts only the "sensitivity" value of the
    embedded Response. A major complication is unit restrictions and
    invalid Response objects. These are handled by the constructor as
    described below. The current implementation can only handle Response
    objects with "input units" of meters per second and "output units"
    of counts. Note the "input" and "output" qualifiers appear because
    the Response object uses sensitivity = 1/calib, i.e. sensitivity is
    the scale to convert physical units to counts, which is backward from
    what this class is designed for - converting counts to physical units
    of nm/s.
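
    A minimal usage sketch, assuming a handle created with the MsPASS
    DBClient and a channel collection previously built with
    `save_inventory` (the database name and the variable d, a TimeSeries
    with channel_id defined, are hypothetical)::

        from mspasspy.db.client import DBClient

        db = DBClient().get_database("mydatabase")
        engine = ApplyCalibEngine(db)  # builds the calib cache from channel
        d = engine.apply_calib(d)      # d amplitudes are now in nm/s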
"""
def __init__(
self,
db,
query=None,
collection="channel",
ounits=["counts", "COUNTS", "count", "COUNT"],
iunits=["m/s", "M/S"],
response_data_key="serialized_channel_data",
verbose=False,
):
"""
        Constructor for this object loads the serialized response data
        and builds an internal cache to cross-reference channel_id with
        calib values.

        :param db: MongoDB Database handle assumed to contain a
          channel collection created from an Inventory object with
          the MsPASS database method `save_inventory`.
        :param query: optional MongoDB query operator to apply to the
          collection before using it. Size, for example, might be reduced
          by using a time range query like the one shown below.
        :type query: python dictionary. Default is None, which is taken
          to mean use the entire collection without a query.
        :param collection: optional alternative collection name.
          Rarely if ever should this be changed as the code here depends
          on MsPASS specific keys and the way we disaggregate obspy's
          Inventory object.
        :type collection: str (default "channel")
        :param ounits: list of acceptable "output unit" definitions.
          The default is various permutations of "counts".
        :type ounits: list of strings defining acceptable units.
        :param iunits: list of acceptable names for "input units" in
          the Response data structure. The default is known to work with
          most FDSN data for velocity instruments. You would likely
          change this list only if you wanted to use accelerometer data
          or some other nonstandard unit data.
          Note "input units" in FDSN jargon is a bit confusing in this
          context as the purpose of this object is to convert raw data to
          physical units, so this object's output is actually in the
          units of iunits.
        :type iunits: list of string names that must match the units
          posted in the xml data. The default should almost always be
          sufficient.
        :param response_data_key: key used to access the serialized
          response data in each MongoDB document parsed by this
          constructor.
        :type response_data_key: str (default is the key used in MsPASS
          and should not normally be changed.)
        :param verbose: When set True (default is False) the constructor
          will print error messages when it finds response data it cannot
          handle. Turned off by default because a database with a few
          thousand channels can generate a lot of errors if you mix up
          acceleration data with normal velocity sensor data. This class
          will not handle acceleration channels as converting acceleration
          data to velocity data is a true response correction, not a
          simple calib correction.
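
        Example of a time range query to reduce the cache size (the
        epoch times shown are hypothetical)::

            t0 = UTCDateTime("2011-01-01").timestamp
            t1 = UTCDateTime("2012-01-01").timestamp
            # match channel epochs that overlap the interval [t0, t1]
            query = {"starttime": {"$lt": t1}, "endtime": {"$gt": t0}}
            engine = ApplyCalibEngine(db, query=query)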
"""
if query:
cursor = db[collection].find(query)
else:
cursor = db[collection].find({})
self.calib = dict()
for doc in cursor:
if response_data_key in doc:
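                # serialized_channel_data holds a pickled obspy Channel
                # object; its response attribute is the obspy Response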
chandata = pickle.loads(doc[response_data_key])
resp = chandata.response
sens = resp.instrument_sensitivity
if sens is None and verbose:
stastr = self._parse_stadata(doc)
print(stastr, " invalid instrument response")
print("pickle.loads returned this: ", str(resp))
elif sens:
if verbose:
message = self._parse_stadata(doc) + ": "
if sens.input_units and sens.output_units:
if sens.input_units in iunits:
if sens.output_units in ounits:
if sens.value is None:
if verbose:
message += "units ok but sensitivity value is undefined"
print(message)
else:
                                    doc_id = doc["_id"]
                                    # inventory saves a "sensitivity" value
                                    # which is the reciprocal of calib
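                                    # the 1e9 factor rescales m/s to nm/s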
                                    self.calib[str(doc_id)] = 1e9 / sens.value
elif verbose:
                                message += "Illegal output_units value="
message += sens.output_units + "\n"
print(message)
elif verbose:
message += "Illegal input_units value="
message += sens.input_units + "\n"
print(message)
elif verbose:
message += (
"sensitivity data is undefined in this response object"
)
print(message)
elif verbose:
stastr = self._parse_stadata(doc)
print(
stastr,
" does not contain pickled response data - keey=",
response_data_key,
)
if len(self.calib) == 0:
message = (
"ApplyCalibEngine construtor: Database has no valid response data"
)
raise MsPASSError(message, ErrorSeverity.Invalid)
    def apply_calib(self, d, id_key="channel_id", kill_if_undefined=True):
"""
        Use this method to apply a calib value to a TimeSeries or all the
        live members of a TimeSeriesEnsemble.

        This is the processing function used for applying calibration.
        It is a method instead of a function because the class allows the
        calib values to be cached inside the object. The input(s) must
        contain a MongoDB ObjectId that can be matched with the ids
        loaded into the cache by the constructor. By default the key
        used is "channel_id". That can be changed with the "id_key"
        argument. If a match is found, all the sample values are
        multiplied by calib AND the calib attribute is set. Note a
        complaint will be issued if calib was found to already be defined
        AND is not 1.0 (a default that is sometimes appropriate).

        This method can easily become a mass murderer. By default any
        datum with an undefined calib value in the cache of this object
        will be marked dead on return with an error message posted to its
        elog container. If the `kill_if_undefined` argument is set False
        such data will not be killed but an elog message marked as a
        complaint will be posted.

        :param d: input data to process
        :type d: `TimeSeries` or `TimeSeriesEnsemble`. This method will
          raise a ValueError exception if d is anything else.
        :param id_key: metadata key used to fetch the ObjectId that is
          matched against the cache of calib values.
        :type id_key: str (default "channel_id")
        :param kill_if_undefined: boolean that when True (default) will
          cause any datum for which a matching calib cannot be found to be
          killed. Note this operation is atomic so for ensembles only
          members that fail the match will be killed.
        :return: copy of input with amplitudes multiplied by calib factor.
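
        A minimal sketch of serial use with the standard MsPASS Database
        read and save methods (the collection name and blanket query are
        illustrative assumptions)::

            engine = ApplyCalibEngine(db)
            for doc in db.wf_TimeSeries.find({}):
                d = db.read_data(doc, collection="wf_TimeSeries")
                d = engine.apply_calib(d)
                db.save_data(d)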
"""
if isinstance(d, TimeSeries):
alg = "ApplyCalibEngine.apply_calib"
if d.live:
if id_key in d:
idstr = str(d[id_key])
if idstr in self.calib:
this_calib = self.calib[idstr]
if "calib" in d:
if not np.isclose(d["calib"], 1.0):
old_calib = d["calib"]
new_metadata_calib = old_calib * this_calib
message = "calib was already defined in this datum as {}\n".format(
d["calib"]
)
message += "Data will be multiplied by calib defined in this object={}\n".format(
this_calib
)
message += "New calib in header = {}\n".format(
new_metadata_calib
)
message += "Amplitudes may be wrong with this datum"
d.elog.log_error(alg, message, ErrorSeverity.Complaint)
d["calib"] = new_metadata_calib
else:
d["calib"] = this_calib
d.data *= this_calib
else:
message = "calib factor could not be determined\n"
message += "Response data missing or flawed"
if kill_if_undefined:
d.elog.log_error(alg, message, ErrorSeverity.Invalid)
d.kill()
else:
d.elog.log_error(alg, message, ErrorSeverity.Complaint)
else:
message = "Missing required channel_id need to get calib factor"
if kill_if_undefined:
d.elog.log_error(alg, message, ErrorSeverity.Invalid)
d.kill()
else:
d.elog.log_error(alg, message, ErrorSeverity.Complaint)
elif isinstance(d, TimeSeriesEnsemble):
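            # recurse on each member; failed matches kill only that
            # member so the ensemble itself is returned intact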
for i in range(len(d.member)):
                d.member[i] = self.apply_calib(
                    d.member[i], id_key=id_key, kill_if_undefined=kill_if_undefined
                )
else:
message = "Illegal input data type={}".format(str(type(d)))
raise ValueError(message)
return d
    def size(self):
"""
Return size of the cache of calib values stored in the object.
"""
return len(self.calib)
def _parse_stadata(self, doc, null_value="Undefined") -> str:
"""
        Internal method to cautiously parse the document doc, always
        returning a string of the form net:sta:chan:loc followed by the
        channel operating time range. Any keys not defined will be
        replaced with the null_value string.
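
        Example of a normal return (station codes and times are
        hypothetical)::

            IU:ANMO:BHZ:00 for time range 2011-01-01T00:00:00.000000Z->2012-01-01T00:00:00.000000Z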
"""
outstr = ""
count = 0
for k in ["net", "sta", "chan", "loc"]:
if k in doc:
outstr += doc[k]
else:
outstr += null_value
if count < 3:
outstr += ":"
count += 1
outstr += " for time range "
count = 0
for k in ["starttime", "endtime"]:
if k in doc:
outstr += str(UTCDateTime(doc[k]))
else:
outstr += null_value
            if count == 0:
                outstr += "->"
            count += 1
return outstr