Source code for mspasspy.client

import os
import pymongo

from mspasspy.db.client import DBClient
from mspasspy.db.database import Database
from mspasspy.global_history.manager import GlobalHistoryManager

try:
    from pyspark import SparkConf, SparkContext

    _mspasspy_has_pyspark = True
except ImportError:
    _mspasspy_has_pyspark = False

try:
    from pyspark.sql import SparkSession
except ImportError:
    _mspasspy_has_pyspark = False

try:
    from dask.distributed import Client as DaskClient

    _mspasspy_has_dask_distributed = True
except ImportError:
    _mspasspy_has_dask_distributed = False

from mspasspy.ccore.utility import MsPASSError


[docs]class Client: """ A client-side representation of MSPASS. This is the only client users should use in MSPASS. The client manages all the other clients or instances. It creates and manages a Database client. It creates and manages a Global Hisotry Manager. It creates and manages a scheduler(spark/dask) For the address and port of each client/instances, we first check the user specified parameters, if not then serach the environment varibales values, if not againm then use the default settings. """ def __init__( self, database_host=None, scheduler=None, scheduler_host=None, job_name="mspass", database_name="mspass", schema=None, collection=None, ): # job_name should be a string if database_host is not None and not type(database_host) is str: raise MsPASSError( "database_host should be a string but " + str(type(database_host)) + " is found.", "Fatal", ) if scheduler is not None and scheduler != "dask" and scheduler != "spark": raise MsPASSError( "scheduler should be either dask or spark but " + str(scheduler) + " is found.", "Fatal", ) if scheduler_host is not None and not type(scheduler_host) is str: raise MsPASSError( "scheduler_host should be a string but " + str(type(scheduler_host)) + " is found.", "Fatal", ) if job_name is not None and not type(job_name) is str: raise MsPASSError( "job_name should be a string but " + str(type(job_name)) + " is found.", "Fatal", ) if database_name is not None and not type(database_name) is str: raise MsPASSError( "database_name should be a string but " + str(type(database_name)) + " is found.", "Fatal", ) # collection should be a string if collection is not None and type(collection) is not str: raise MsPASSError( "collection should be a string but " + str(type(collection)) + " is found.", "Fatal", ) # check env variables MSPASS_DB_ADDRESS = os.environ.get("MSPASS_DB_ADDRESS") MONGODB_PORT = os.environ.get("MONGODB_PORT") MSPASS_SCHEDULER = os.environ.get("MSPASS_SCHEDULER") MSPASS_SCHEDULER_ADDRESS = os.environ.get("MSPASS_SCHEDULER_ADDRESS") DASK_SCHEDULER_PORT = os.environ.get("DASK_SCHEDULER_PORT") SPARK_MASTER_PORT = os.environ.get("SPARK_MASTER_PORT") # create a database client # priority: parameter -> env -> default database_host_has_port = False if database_host: database_address = database_host # check if database_host contains port number already if ":" in database_address: database_host_has_port = True elif MSPASS_DB_ADDRESS: database_address = MSPASS_DB_ADDRESS else: database_address = "127.0.0.1" # add port if not database_host_has_port and MONGODB_PORT: database_address += ":" + MONGODB_PORT try: self._db_client = DBClient(database_address) self._db_client.server_info() except Exception as err: raise MsPASSError( "Runntime error: cannot create a database client with: " + database_address, "Fatal", ) # set default database name self._default_database_name = database_name self._default_schema = schema self._default_collection = collection # create a Global History Manager if schema: global_history_manager_db = Database( self._db_client, database_name, db_schema=schema ) else: global_history_manager_db = Database(self._db_client, database_name) self._global_history_manager = GlobalHistoryManager( global_history_manager_db, job_name, collection=collection ) # set scheduler if scheduler: self._scheduler = scheduler elif MSPASS_SCHEDULER: self._scheduler = MSPASS_SCHEDULER else: if _mspasspy_has_dask_distributed: self._scheduler = "dask" elif _mspasspy_has_pyspark: self._scheduler = "spark" else: self._scheduler = None # scheduler configuration if self._scheduler == "spark": scheduler_host_has_port = False if scheduler_host: self._spark_master_url = scheduler_host # add spark:// prefix if not exist if "spark://" not in scheduler_host: self._spark_master_url = "spark://" + self._spark_master_url # check if spark host address contains port number already if self._spark_master_url.count(":") == 2: scheduler_host_has_port = True elif MSPASS_SCHEDULER_ADDRESS: self._spark_master_url = MSPASS_SCHEDULER_ADDRESS # add spark:// prefix if not exist if "spark://" not in MSPASS_SCHEDULER_ADDRESS: self._spark_master_url = "spark://" + self._spark_master_url else: self._spark_master_url = "local" # add port number # 1. not the default 'local' # 2. scheduler_host and does not contain port number # 3. SPARK_MASTER_PORT exists if ( (scheduler_host or MSPASS_SCHEDULER_ADDRESS) and not scheduler_host_has_port and SPARK_MASTER_PORT ): self._spark_master_url += ":" + SPARK_MASTER_PORT # sanity check try: spark = ( SparkSession.builder.appName("mspass") .master(self._spark_master_url) .getOrCreate() ) self._spark_context = spark.sparkContext except Exception as err: raise MsPASSError( "Runntime error: cannot create a spark configuration with: " + self._spark_master_url, "Fatal", ) elif self._scheduler == "dask": # if no defind scheduler_host and no MSPASS_SCHEDULER_ADDRESS, use local cluster to create a client if not scheduler_host and not MSPASS_SCHEDULER_ADDRESS: self._dask_client = DaskClient() else: scheduler_host_has_port = False # set host if scheduler_host: self._dask_client_address = scheduler_host # check if scheduler_host contains port number already if ":" in scheduler_host: scheduler_host_has_port = True else: self._dask_client_address = MSPASS_SCHEDULER_ADDRESS # add port if not scheduler_host_has_port and DASK_SCHEDULER_PORT: self._dask_client_address += ":" + DASK_SCHEDULER_PORT else: # use to port 8786 by default if not specified self._dask_client_address += ":8786" # sanity check try: self._dask_client = DaskClient(self._dask_client_address) except Exception as err: raise MsPASSError( "Runntime error: cannot create a dask client with: " + self._dask_client_address, "Fatal", ) else: print("There is no spark or dask installed, this client has no scheduler")
[docs] def get_database_client(self): """ Get the database client in the global history manager :return: :class:`mspasspy.db.database.Database` """ return self._db_client
[docs] def get_database(self, database_name=None): """ Get a database by database_name, if database_name is not specified, use the default one :param database_name: the name of database :type database_name: :class:`str` :return: :class:`mspasspy.db.database.Database` """ if not database_name: return Database(self._db_client, self._default_database_name) return Database(self._db_client, database_name)
[docs] def get_global_history_manager(self): """ Get the global history manager with this client :return: :class:`mspasspy.global_history.manager.GlobalHistoryManager` """ return self._global_history_manager
[docs] def get_scheduler(self): """ Get the scheduler(spark/dask) with this client :return: :class:`pyspark.SparkContext`/:class:`dask.distributed.Client`/None """ if self._scheduler == "spark": return self._spark_context elif self._scheduler == "dask": return self._dask_client else: print( "There is no spark or dask installed, this client has no scheduler, returned None" ) return None
[docs] def set_database_client(self, database_host, database_port=None): """ Set a database client by database_host(and database_port) :param database_host: the host address of database client :type database_host: :class:`str` :param database_port: the port of database client :type database_port: :class:`str` """ database_host_has_port = False database_address = database_host # check if port is already in the database_host address if ":" in database_host: database_host_has_port = True # add port if not database_host_has_port and database_port: database_address += ":" + database_port # sanity check temp_db_client = self._db_client try: self._db_client = DBClient(database_address) self._db_client.server_info() except Exception as err: # restore the _db_client self._db_client = temp_db_client raise MsPASSError( "Runntime error: cannot create a database client with: " + database_address, "Fatal", )
[docs] def set_global_history_manager(self, history_db, job_name, collection=None): """ Set a global history manager by history_db, job_name(and collection) :param history_db: the database will be set in the global history manager :type history_db: :class:`mspasspy.db.database.Database` :param job_name: the job name will be set in the global history manager :type job_name: :class:`str` :param collection: the collection name will be set in the history_db :type collection: :class:`str` """ if not isinstance(history_db, Database): raise TypeError( "history_db should be a mspasspy.db.Database but " + str(type(history_db)) + " is found." ) if not type(job_name) is str: raise TypeError( "job_name should be a string but " + str(type(job_name)) + " is found." ) if collection is not None and type(collection) is not str: raise TypeError( "collection should be a string but " + str(type(collection)) + " is found." ) self._global_history_manager = GlobalHistoryManager( history_db, job_name, collection=collection )
[docs] def set_scheduler(self, scheduler, scheduler_host, scheduler_port=None): """ Set a scheduler by scheduler type, scheduler_host(and scheduler_port) :param scheduler: the scheduler type, should be either dask or spark :type scheduler: :class:`str` :param scheduler_host: the host address of scheduler :type scheduler_host: :class:`str` :param scheduler_port: the port of scheduler :type scheduler_port: :class:`str` """ if scheduler != "dask" and scheduler != "spark": raise MsPASSError( "scheduler should be either dask or spark but " + str(scheduler) + " is found.", "Fatal", ) prev_scheduler = self._scheduler self._scheduler = scheduler if scheduler == "spark": scheduler_host_has_port = False self._spark_master_url = scheduler_host # add spark:// prefix if not exist if "spark://" not in scheduler_host: self._spark_master_url = "spark://" + self._spark_master_url # check if spark host address contains port number already if self._spark_master_url.count(":") == 2: scheduler_host_has_port = True # add port if not scheduler_host_has_port and scheduler_port: self._spark_master_url += ":" + scheduler_port # sanity check prev_spark_context = None prev_spark_conf = None if hasattr(self, "_spark_context"): prev_spark_context = self._spark_context prev_spark_conf = self._spark_context.getConf() try: if hasattr(self, "_spark_context") and isinstance( self._spark_context, SparkContext ): # update the confinguration spark_conf = self._spark_context._conf.setMaster( self._spark_master_url ) else: spark_conf = ( SparkConf() .setAppName("mspass") .setMaster(self._spark_master_url) ) # stop the previous spark context # FIXME if the new context does not start, we shouldn't stop the previous here. # if prev_spark_context: # prev_spark_context.stop() # create a new spark context -> might cause error so that execute exception code spark = SparkSession.builder.config(conf=spark_conf).getOrCreate() self._spark_context = spark.sparkContext except Exception as err: # restore the spark context by the previous spark configuration if prev_spark_conf: self._spark_context = SparkContext.getOrCreate(conf=prev_spark_conf) # restore the scheduler type if self._scheduler == "spark" and prev_scheduler == "dask": self._scheduler = prev_scheduler raise MsPASSError( "Runntime error: cannot create a spark configuration with: " + self._spark_master_url, "Fatal", ) # close previous dask client if success if hasattr(self, "_dask_client"): del self._dask_client elif scheduler == "dask": scheduler_host_has_port = False self._dask_client_address = scheduler_host # check if scheduler_host contains port number already if ":" in scheduler_host: scheduler_host_has_port = True # add port if not scheduler_host_has_port: if scheduler_port: self._dask_client_address += ":" + scheduler_port else: # use to port 8786 by default if not specified self._dask_client_address += ":8786" # sanity check prev_dask_client = None if hasattr(self, "_dask_client"): prev_dask_client = self._dask_client try: # create a new dask client self._dask_client = DaskClient(self._dask_client_address) except Exception as err: # restore the dask client if exists if prev_dask_client: self._dask_client = prev_dask_client # restore the scheduler type if self._scheduler == "dask" and prev_scheduler == "spark": self._scheduler = prev_scheduler raise MsPASSError( "Runntime error: cannot create a dask client with: " + self._dask_client_address, "Fatal", ) # remove previous spark context if success setting new dask client if hasattr(self, "_spark_context"): del self._spark_context