Source code for smartsim.entity.ensemble

# BSD 2-Clause License
#
# Copyright (c) 2021-2022, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from copy import deepcopy
from os import getcwd

from .._core.utils.helpers import init_default
from ..error import (
    EntityExistsError,
    SmartSimError,
    SSUnsupportedError,
    UserStrategyError,
)
from ..log import get_logger
from ..settings.base import BatchSettings, RunSettings
from .dbobject import DBModel, DBScript
from .entityList import EntityList
from .model import Model
from .strategies import create_all_permutations, random_permutations, step_values

logger = get_logger(__name__)


[docs]class Ensemble(EntityList): """``Ensemble`` is a group of ``Model`` instances that can be treated as a reference to a single instance. """ def __init__( self, name, params, params_as_args=None, batch_settings=None, run_settings=None, perm_strat="all_perm", **kwargs, ): """Initialize an Ensemble of Model instances. The kwargs argument can be used to pass custom input parameters to the permutation strategy. :param name: name of the ensemble :type name: str :param params: parameters to expand into ``Model`` members :type params: dict[str, Any] :param params_as_args: list of params which should be used as command line arguments to the ``Model`` member executables and not written to generator files :type arg_params: list[str] :param batch_settings: describes settings for ``Ensemble`` as batch workload :type batch_settings: BatchSettings, optional :param run_settings: describes how each ``Model`` should be executed :type run_settings: RunSettings, optional :param replicas: number of ``Model`` replicas to create - a keyword argument of kwargs :type replicas: int, optional :param perm_strategy: strategy for expanding ``params`` into ``Model`` instances from params argument options are "all_perm", "stepped", "random" or a callable function. Defaults to "all_perm". :type perm_strategy: str :return: ``Ensemble`` instance :rtype: ``Ensemble`` """ self.params = init_default({}, params, dict) self.params_as_args = init_default({}, params_as_args, (list, str)) self._key_prefixing_enabled = True self.batch_settings = init_default({}, batch_settings, BatchSettings) self.run_settings = init_default({}, run_settings, RunSettings) self._db_models = [] self._db_scripts = [] super().__init__(name, getcwd(), perm_strat=perm_strat, **kwargs) @property def models(self): return self.entities def _initialize_entities(self, **kwargs): """Initialize all the models within the ensemble based on the parameters passed to the ensemble and the permutation strategy given at init. :raises UserStrategyError: if user generation strategy fails """ strategy = self._set_strategy(kwargs.pop("perm_strat")) replicas = kwargs.pop("replicas", None) # if a ensemble has parameters and run settings, create # the ensemble and assign run_settings to each member if self.params: if self.run_settings: param_names, params = self._read_model_parameters() # Compute all combinations of model parameters and arguments all_model_params = strategy(param_names, params, **kwargs) if not isinstance(all_model_params, list): raise UserStrategyError(strategy) for i, param_set in enumerate(all_model_params): if not isinstance(param_set, dict): raise UserStrategyError(strategy) run_settings = deepcopy(self.run_settings) model_name = "_".join((self.name, str(i))) model = Model( model_name, param_set, self.path, run_settings=run_settings, params_as_args=self.params_as_args, ) model.enable_key_prefixing() model.params_to_args() logger.debug( f"Created ensemble member: {model_name} in {self.name}" ) self.add_model(model) # cannot generate models without run settings else: raise SmartSimError( "Ensembles without 'params' or 'replicas' argument to expand into members cannot be given run settings" ) else: if self.run_settings: if replicas: for i in range(replicas): model_name = "_".join((self.name, str(i))) model = Model( model_name, {}, self.path, run_settings=deepcopy(self.run_settings), ) model.enable_key_prefixing() logger.debug( f"Created ensemble member: {model_name} in {self.name}" ) self.add_model(model) else: raise SmartSimError( "Ensembles without 'params' or 'replicas' argument to expand into members cannot be given run settings" ) # if no params, no run settings and no batch settings, error because we # don't know how to run the ensemble elif not self.batch_settings: raise SmartSimError( "Ensemble must be provided batch settings or run settings" ) else: logger.info("Empty ensemble created for batch launch")
[docs] def add_model(self, model): """Add a model to this ensemble :param model: model instance to be added :type model: Model :raises TypeError: if model is not an instance of ``Model`` :raises EntityExistsError: if model already exists in this ensemble """ if not isinstance(model, Model): raise TypeError( f"Argument to add_model was of type {type(model)}, not Model" ) # "in" operator uses model name for __eq__ if model in self.entities: raise EntityExistsError( f"Model {model.name} already exists in ensemble {self.name}" ) if self._db_models: self._extend_entity_db_models(model, self._db_models) if self._db_scripts: self._extend_entity_db_scripts(model, self._db_scripts) self.entities.append(model)
[docs] def register_incoming_entity(self, incoming_entity): """Register future communication between entities. Registers the named data sources that this entity has access to by storing the key_prefix associated with that entity Only python clients can have multiple incoming connections :param incoming_entity: The entity that data will be received from :type incoming_entity: SmartSimEntity """ for model in self.entities: model.register_incoming_entity(incoming_entity)
[docs] def enable_key_prefixing(self): """If called, all models within this ensemble will prefix their keys with its own model name. """ for model in self.entities: model.enable_key_prefixing()
[docs] def query_key_prefixing(self): """Inquire as to whether each model within the ensemble will prefix its keys :returns: True if all models have key prefixing enabled, False otherwise :rtype: bool """ return all([model.query_key_prefixing() for model in self.entities])
[docs] def attach_generator_files(self, to_copy=None, to_symlink=None, to_configure=None): """Attach files to each model within the ensemble for generation Attach files needed for the entity that, upon generation, will be located in the path of the entity. During generation, files "to_copy" are copied into the path of the entity, and files "to_symlink" are symlinked into the path of the entity. Files "to_configure" are text based model input files where parameters for the model are set. Note that only models support the "to_configure" field. These files must have fields tagged that correspond to the values the user would like to change. The tag is settable but defaults to a semicolon e.g. THERMO = ;10; :param to_copy: files to copy, defaults to [] :type to_copy: list, optional :param to_symlink: files to symlink, defaults to [] :type to_symlink: list, optional :param to_configure: input files with tagged parameters, defaults to [] :type to_configure: list, optional """ for model in self.entities: model.attach_generator_files( to_copy=to_copy, to_symlink=to_symlink, to_configure=to_configure )
def _set_strategy(self, strategy): """Set the permutation strategy for generating models within the ensemble :param strategy: name of the strategy or callable function :type strategy: str :raises SSUnsupportedError: if str name is not supported :return: strategy function :rtype: callable """ if strategy == "all_perm": return create_all_permutations if strategy == "step": return step_values if strategy == "random": return random_permutations if callable(strategy): return strategy raise SSUnsupportedError( f"Permutation strategy given is not supported: {strategy}" ) def _read_model_parameters(self): """Take in the parameters given to the ensemble and prepare to create models for the ensemble :raises TypeError: if params are of the wrong type :return: param names and values for permutation strategy :rtype: tuple[list, list] """ if not isinstance(self.params, dict): raise TypeError( "Ensemble initialization argument 'params' must be of type dict" ) param_names = [] parameters = [] for name, val in self.params.items(): param_names.append(name) if isinstance(val, list): parameters.append(val) elif isinstance(val, (int, str)): parameters.append([val]) else: raise TypeError( "Incorrect type for ensemble parameters\n" + "Must be list, int, or string." ) return param_names, parameters
[docs] def add_ml_model( self, name, backend, model=None, model_path=None, device="CPU", devices_per_node=1, batch_size=0, min_batch_size=0, tag="", inputs=None, outputs=None, ): """A TF, TF-lite, PT, or ONNX model to load into the DB at runtime Each ML Model added will be loaded into an orchestrator (converged or not) prior to the execution of every entity belonging to this ensemble One of either model (in memory representation) or model_path (file) must be provided :param name: key to store model under :type name: str :param model: model in memory :type model: str | bytes | None :param model_path: serialized model :type model_path: file path to model :param backend: name of the backend (TORCH, TF, TFLITE, ONNX) :type backend: str :param device: name of device for execution, defaults to "CPU" :type device: str, optional :param batch_size: batch size for execution, defaults to 0 :type batch_size: int, optional :param min_batch_size: minimum batch size for model execution, defaults to 0 :type min_batch_size: int, optional :param tag: additional tag for model information, defaults to "" :type tag: str, optional :param inputs: model inputs (TF only), defaults to None :type inputs: list[str], optional :param outputs: model outupts (TF only), defaults to None :type outputs: list[str], optional """ db_model = DBModel( name=name, backend=backend, model=model, model_file=model_path, device=device, devices_per_node=devices_per_node, batch_size=batch_size, min_batch_size=min_batch_size, tag=tag, inputs=inputs, outputs=outputs, ) self._db_models.append(db_model) for entity in self: self._extend_entity_db_models(entity, [db_model])
[docs] def add_script( self, name, script=None, script_path=None, device="CPU", devices_per_node=1 ): """TorchScript to launch with every entity belonging to this ensemble Each script added to the model will be loaded into an orchestrator (converged or not) prior to the execution of every entity belonging to this ensemble Device selection is either "GPU" or "CPU". If many devices are present, a number can be passed for specification e.g. "GPU:1". Setting ``devices_per_node=N``, with N greater than one will result in the model being stored in the first N devices of type ``device``. One of either script (in memory string representation) or script_path (file) must be provided :param name: key to store script under :type name: str :param script: TorchScript code :type script: str, optional :param script_path: path to TorchScript code :type script_path: str, optional :param device: device for script execution, defaults to "CPU" :type device: str, optional :param devices_per_node: number of devices on each host :type devices_per_node: int """ db_script = DBScript( name=name, script=script, script_path=script_path, device=device, devices_per_node=devices_per_node, ) self._db_scripts.append(db_script) for entity in self: self._extend_entity_db_scripts(entity, [db_script])
[docs] def add_function(self, name, function=None, device="CPU", devices_per_node=1): """TorchScript function to launch with every entity belonging to this ensemble Each script function to the model will be loaded into a non-converged orchestrator prior to the execution of every entity belonging to this ensemble. For converged orchestrators, the :meth:`add_script` method should be used. Device selection is either "GPU" or "CPU". If many devices are present, a number can be passed for specification e.g. "GPU:1". Setting ``devices_per_node=N``, with N greater than one will result in the model being stored in the first N devices of type ``device``. :param name: key to store function under :type name: str :param script: TorchScript code :type script: str, optional :param script_path: path to TorchScript code :type script_path: str, optional :param device: device for script execution, defaults to "CPU" :type device: str, optional :param devices_per_node: number of devices on each host :type devices_per_node: int """ db_script = DBScript( name=name, script=function, device=device, devices_per_node=devices_per_node ) self._db_scripts.append(db_script) for entity in self: self._extend_entity_db_scripts(entity, [db_script])
def _extend_entity_db_models(self, model, db_models): entity_db_models = [db_model.name for db_model in model._db_models] for db_model in db_models: if not db_model.name in entity_db_models: model._append_db_model(db_model) def _extend_entity_db_scripts(self, model, db_scripts): entity_db_scripts = [db_script.name for db_script in model._db_scripts] for db_script in db_scripts: if not db_script.name in entity_db_scripts: model._append_db_script(db_script)