Source code for smartsim.entity.model

# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from __future__ import annotations

import itertools
import numbers
import re
import sys
import typing as t
import warnings
from os import getcwd
from os import path as osp

from smartsim._core.types import Device

from .._core.utils.helpers import cat_arg_and_value
from ..error import EntityExistsError, SSUnsupportedError
from ..log import get_logger
from ..settings.base import BatchSettings, RunSettings
from .dbobject import DBModel, DBScript
from .entity import SmartSimEntity
from .files import EntityFiles

logger = get_logger(__name__)


def _parse_model_parameters(params_dict: t.Dict[str, t.Any]) -> t.Dict[str, str]:
    """Convert the values in a params dict to strings
    :raises TypeError: if params are of the wrong type
    :return: param dictionary with values and keys cast as strings
    """
    param_names: t.List[str] = []
    parameters: t.List[str] = []
    for name, val in params_dict.items():
        param_names.append(name)
        if isinstance(val, (str, numbers.Number)):
            parameters.append(str(val))
        else:
            raise TypeError(
                "Incorrect type for model parameters\n"
                + "Must be numeric value or string."
            )
    return dict(zip(param_names, parameters))


[docs]class Model(SmartSimEntity): def __init__( self, name: str, params: t.Dict[str, str], run_settings: RunSettings, path: t.Optional[str] = getcwd(), params_as_args: t.Optional[t.List[str]] = None, batch_settings: t.Optional[BatchSettings] = None, ): """Initialize a ``Model`` :param name: name of the model :param params: model parameters for writing into configuration files or to be passed as command line arguments to executable. :param path: path to output, error, and configuration files :param run_settings: launcher settings specified in the experiment :param params_as_args: list of parameters which have to be interpreted as command line arguments to be added to run_settings :param batch_settings: Launcher settings for running the individual model as a batch job """ super().__init__(name, str(path), run_settings) self.params = _parse_model_parameters(params) self.params_as_args = params_as_args self.incoming_entities: t.List[SmartSimEntity] = [] self._key_prefixing_enabled = False self.batch_settings = batch_settings self._db_models: t.List[DBModel] = [] self._db_scripts: t.List[DBScript] = [] self.files: t.Optional[EntityFiles] = None @property def db_models(self) -> t.Iterable[DBModel]: """Retrieve an immutable collection of attached models :return: Return an immutable collection of attached models """ return (model for model in self._db_models) @property def db_scripts(self) -> t.Iterable[DBScript]: """Retrieve an immutable collection attached of scripts :return: Return an immutable collection of attached scripts """ return (script for script in self._db_scripts) @property def colocated(self) -> bool: """Return True if this Model will run with a colocated Orchestrator :return: Return True of the Model will run with a colocated Orchestrator """ return bool(self.run_settings.colocated_db_settings)
[docs] def register_incoming_entity(self, incoming_entity: SmartSimEntity) -> None: """Register future communication between entities. Registers the named data sources that this entity has access to by storing the key_prefix associated with that entity :param incoming_entity: The entity that data will be received from :raises SmartSimError: if incoming entity has already been registered """ if incoming_entity.name in [ in_entity.name for in_entity in self.incoming_entities ]: raise EntityExistsError( f"'{incoming_entity.name}' has already " + "been registered as an incoming entity" ) self.incoming_entities.append(incoming_entity)
[docs] def enable_key_prefixing(self) -> None: """If called, the entity will prefix its keys with its own model name""" self._key_prefixing_enabled = True
[docs] def disable_key_prefixing(self) -> None: """If called, the entity will not prefix its keys with its own model name""" self._key_prefixing_enabled = False
[docs] def query_key_prefixing(self) -> bool: """Inquire as to whether this entity will prefix its keys with its name :return: Return True if entity will prefix its keys with its name """ return self._key_prefixing_enabled
[docs] def attach_generator_files( self, to_copy: t.Optional[t.List[str]] = None, to_symlink: t.Optional[t.List[str]] = None, to_configure: t.Optional[t.List[str]] = None, ) -> None: """Attach files to an entity for generation Attach files needed for the entity that, upon generation, will be located in the path of the entity. Invoking this method after files have already been attached will overwrite the previous list of entity files. During generation, files "to_copy" are copied into the path of the entity, and files "to_symlink" are symlinked into the path of the entity. Files "to_configure" are text based model input files where parameters for the model are set. Note that only models support the "to_configure" field. These files must have fields tagged that correspond to the values the user would like to change. The tag is settable but defaults to a semicolon e.g. THERMO = ;10; :param to_copy: files to copy :param to_symlink: files to symlink :param to_configure: input files with tagged parameters """ to_copy = to_copy or [] to_symlink = to_symlink or [] to_configure = to_configure or [] # Check that no file collides with the parameter file written # by Generator. We check the basename, even though it is more # restrictive than what we need (but it avoids relative path issues) for strategy in [to_copy, to_symlink, to_configure]: if strategy is not None and any( osp.basename(filename) == "smartsim_params.txt" for filename in strategy ): raise ValueError( "`smartsim_params.txt` is a file automatically " + "generated by SmartSim and cannot be ovewritten." ) self.files = EntityFiles(to_configure, to_copy, to_symlink)
@property def attached_files_table(self) -> str: """Return a list of attached files as a plain text table :returns: String version of table """ if not self.files: return "No file attached to this model." return str(self.files)
[docs] def print_attached_files(self) -> None: """Print a table of the attached files on std out""" print(self.attached_files_table)
[docs] def colocate_db(self, *args: t.Any, **kwargs: t.Any) -> None: """An alias for ``Model.colocate_db_tcp``""" warnings.warn( ( "`colocate_db` has been deprecated and will be removed in a \n" "future release. Please use `colocate_db_tcp` or `colocate_db_uds`." ), FutureWarning, ) self.colocate_db_tcp(*args, **kwargs)
[docs] def colocate_db_uds( self, unix_socket: str = "/tmp/redis.socket", socket_permissions: int = 755, db_cpus: int = 1, custom_pinning: t.Optional[t.Iterable[t.Union[int, t.Iterable[int]]]] = None, debug: bool = False, db_identifier: str = "", **kwargs: t.Any, ) -> None: """Colocate an Orchestrator instance with this Model over UDS. This method will initialize settings which add an unsharded database to this Model instance. Only this Model will be able to communicate with this colocated database by using Unix Domain sockets. Extra parameters for the db can be passed through kwargs. This includes many performance, caching and inference settings. .. highlight:: python .. code-block:: python example_kwargs = { "maxclients": 100000, "threads_per_queue": 1, "inter_op_threads": 1, "intra_op_threads": 1, "server_threads": 2 # keydb only } Generally these don't need to be changed. :param unix_socket: path to where the socket file will be created :param socket_permissions: permissions for the socketfile :param db_cpus: number of cpus to use for orchestrator :param custom_pinning: CPUs to pin the orchestrator to. Passing an empty iterable disables pinning :param debug: launch Model with extra debug information about the colocated db :param kwargs: additional keyword arguments to pass to the orchestrator database """ if not re.match(r"^[a-zA-Z0-9.:\,_\-/]*$", unix_socket): raise ValueError( f"Invalid name for unix socket: {unix_socket}. Must only " "contain alphanumeric characters or . : _ - /" ) uds_options: t.Dict[str, t.Union[int, str]] = { "unix_socket": unix_socket, "socket_permissions": socket_permissions, # This is hardcoded to 0 as recommended by redis for UDS "port": 0, } common_options = { "cpus": db_cpus, "custom_pinning": custom_pinning, "debug": debug, "db_identifier": db_identifier, } self._set_colocated_db_settings(uds_options, common_options, **kwargs)
[docs] def colocate_db_tcp( self, port: int = 6379, ifname: t.Union[str, list[str]] = "lo", db_cpus: int = 1, custom_pinning: t.Optional[t.Iterable[t.Union[int, t.Iterable[int]]]] = None, debug: bool = False, db_identifier: str = "", **kwargs: t.Any, ) -> None: """Colocate an Orchestrator instance with this Model over TCP/IP. This method will initialize settings which add an unsharded database to this Model instance. Only this Model will be able to communicate with this colocated database by using the loopback TCP interface. Extra parameters for the db can be passed through kwargs. This includes many performance, caching and inference settings. .. highlight:: python .. code-block:: python ex. kwargs = { maxclients: 100000, threads_per_queue: 1, inter_op_threads: 1, intra_op_threads: 1, server_threads: 2 # keydb only } Generally these don't need to be changed. :param port: port to use for orchestrator database :param ifname: interface to use for orchestrator :param db_cpus: number of cpus to use for orchestrator :param custom_pinning: CPUs to pin the orchestrator to. Passing an empty iterable disables pinning :param debug: launch Model with extra debug information about the colocated db :param kwargs: additional keyword arguments to pass to the orchestrator database """ tcp_options = {"port": port, "ifname": ifname} common_options = { "cpus": db_cpus, "custom_pinning": custom_pinning, "debug": debug, "db_identifier": db_identifier, } self._set_colocated_db_settings(tcp_options, common_options, **kwargs)
def _set_colocated_db_settings( self, connection_options: t.Mapping[str, t.Union[int, t.List[str], str]], common_options: t.Dict[ str, t.Union[ t.Union[t.Iterable[t.Union[int, t.Iterable[int]]], None], bool, int, str, None, ], ], **kwargs: t.Union[int, None], ) -> None: """ Ingest the connection-specific options (UDS/TCP) and set the final settings for the colocated database """ if hasattr(self.run_settings, "mpmd") and len(self.run_settings.mpmd) > 0: raise SSUnsupportedError( "Models colocated with databases cannot be run as a mpmd workload" ) if hasattr(self.run_settings, "_prep_colocated_db"): # pylint: disable-next=protected-access self.run_settings._prep_colocated_db(common_options["cpus"]) if "limit_app_cpus" in kwargs: raise SSUnsupportedError( "Pinning app CPUs via limit_app_cpus is not supported. Modify " "RunSettings using the correct binding option for your launcher." ) # TODO list which db settings can be extras custom_pinning_ = t.cast( t.Optional[t.Iterable[t.Union[int, t.Iterable[int]]]], common_options.get("custom_pinning"), ) cpus_ = t.cast(int, common_options.get("cpus")) common_options["custom_pinning"] = self._create_pinning_string( custom_pinning_, cpus_ ) colo_db_config: t.Dict[ str, t.Union[ bool, int, str, None, t.List[str], t.Iterable[t.Union[int, t.Iterable[int]]], t.List[DBModel], t.List[DBScript], t.Dict[str, t.Union[int, None]], t.Dict[str, str], ], ] = {} colo_db_config.update(connection_options) colo_db_config.update(common_options) redis_ai_temp = { "threads_per_queue": kwargs.get("threads_per_queue", None), "inter_op_parallelism": kwargs.get("inter_op_parallelism", None), "intra_op_parallelism": kwargs.get("intra_op_parallelism", None), } # redisai arguments for inference settings colo_db_config["rai_args"] = redis_ai_temp colo_db_config["extra_db_args"] = { k: str(v) for k, v in kwargs.items() if k not in redis_ai_temp } self._check_db_objects_colo() colo_db_config["db_models"] = self._db_models colo_db_config["db_scripts"] = self._db_scripts self.run_settings.colocated_db_settings = colo_db_config @staticmethod def _create_pinning_string( pin_ids: t.Optional[t.Iterable[t.Union[int, t.Iterable[int]]]], cpus: int ) -> t.Optional[str]: """Create a comma-separated string of CPU ids. By default, ``None`` returns 0,1,...,cpus-1; an empty iterable will disable pinning altogether, and an iterable constructs a comma separated string of integers (e.g. ``[0, 2, 5]`` -> ``"0,2,5"``) """ def _stringify_id(_id: int) -> str: """Return the cPU id as a string if an int, otherwise raise a ValueError""" if isinstance(_id, int): if _id < 0: raise ValueError("CPU id must be a nonnegative number") return str(_id) raise TypeError(f"Argument is of type '{type(_id)}' not 'int'") try: pin_ids = tuple(pin_ids) if pin_ids is not None else None except TypeError: raise TypeError( "Expected a cpu pinning specification of type iterable of ints or " f"iterables of ints. Instead got type `{type(pin_ids)}`" ) from None # Deal with MacOSX limitations first. The "None" (default) disables pinning # and is equivalent to []. The only invalid option is a non-empty pinning if sys.platform == "darwin": if pin_ids: warnings.warn( "CPU pinning is not supported on MacOSX. Ignoring pinning " "specification.", RuntimeWarning, ) return None # Flatten the iterable into a list and check to make sure that the resulting # elements are all ints if pin_ids is None: return ",".join(_stringify_id(i) for i in range(cpus)) if not pin_ids: return None pin_ids = ((x,) if isinstance(x, int) else x for x in pin_ids) to_fmt = itertools.chain.from_iterable(pin_ids) return ",".join(sorted({_stringify_id(x) for x in to_fmt}))
[docs] def params_to_args(self) -> None: """Convert parameters to command line arguments and update run settings.""" if self.params_as_args is not None: for param in self.params_as_args: if not param in self.params: raise ValueError( f"Tried to convert {param} to command line argument for Model " f"{self.name}, but its value was not found in model params" ) if self.run_settings is None: raise ValueError( "Tried to configure command line parameter for Model " f"{self.name}, but no RunSettings are set." ) self.run_settings.add_exe_args( cat_arg_and_value(param, self.params[param]) )
[docs] def add_ml_model( self, name: str, backend: str, model: t.Optional[bytes] = None, model_path: t.Optional[str] = None, device: str = Device.CPU.value.upper(), devices_per_node: int = 1, first_device: int = 0, batch_size: int = 0, min_batch_size: int = 0, min_batch_timeout: int = 0, tag: str = "", inputs: t.Optional[t.List[str]] = None, outputs: t.Optional[t.List[str]] = None, ) -> None: """A TF, TF-lite, PT, or ONNX model to load into the DB at runtime Each ML Model added will be loaded into an orchestrator (converged or not) prior to the execution of this Model instance One of either model (in memory representation) or model_path (file) must be provided :param name: key to store model under :param backend: name of the backend (TORCH, TF, TFLITE, ONNX) :param model: A model in memory (only supported for non-colocated orchestrators) :param model_path: serialized model :param device: name of device for execution :param devices_per_node: The number of GPU devices available on the host. This parameter only applies to GPU devices and will be ignored if device is specified as CPU. :param first_device: The first GPU device to use on the host. This parameter only applies to GPU devices and will be ignored if device is specified as CPU. :param batch_size: batch size for execution :param min_batch_size: minimum batch size for model execution :param min_batch_timeout: time to wait for minimum batch size :param tag: additional tag for model information :param inputs: model inputs (TF only) :param outputs: model outupts (TF only) """ db_model = DBModel( name=name, backend=backend, model=model, model_file=model_path, device=device, devices_per_node=devices_per_node, first_device=first_device, batch_size=batch_size, min_batch_size=min_batch_size, min_batch_timeout=min_batch_timeout, tag=tag, inputs=inputs, outputs=outputs, ) self.add_ml_model_object(db_model)
[docs] def add_script( self, name: str, script: t.Optional[str] = None, script_path: t.Optional[str] = None, device: str = Device.CPU.value.upper(), devices_per_node: int = 1, first_device: int = 0, ) -> None: """TorchScript to launch with this Model instance Each script added to the model will be loaded into an orchestrator (converged or not) prior to the execution of this Model instance Device selection is either "GPU" or "CPU". If many devices are present, a number can be passed for specification e.g. "GPU:1". Setting ``devices_per_node=N``, with N greater than one will result in the script being stored in the first N devices of type ``device``; alternatively, setting ``first_device=M`` will result in the script being stored on nodes M through M + N - 1. One of either script (in memory string representation) or script_path (file) must be provided :param name: key to store script under :param script: TorchScript code (only supported for non-colocated orchestrators) :param script_path: path to TorchScript code :param device: device for script execution :param devices_per_node: The number of GPU devices available on the host. This parameter only applies to GPU devices and will be ignored if device is specified as CPU. :param first_device: The first GPU device to use on the host. This parameter only applies to GPU devices and will be ignored if device is specified as CPU. """ db_script = DBScript( name=name, script=script, script_path=script_path, device=device, devices_per_node=devices_per_node, first_device=first_device, ) self.add_script_object(db_script)
[docs] def add_function( self, name: str, function: t.Optional[str] = None, device: str = Device.CPU.value.upper(), devices_per_node: int = 1, first_device: int = 0, ) -> None: """TorchScript function to launch with this Model instance Each script function to the model will be loaded into a non-converged orchestrator prior to the execution of this Model instance. For converged orchestrators, the :meth:`add_script` method should be used. Device selection is either "GPU" or "CPU". If many devices are present, a number can be passed for specification e.g. "GPU:1". Setting ``devices_per_node=N``, with N greater than one will result in the model being stored in the first N devices of type ``device``. :param name: key to store function under :param function: TorchScript function code :param device: device for script execution :param devices_per_node: The number of GPU devices available on the host. This parameter only applies to GPU devices and will be ignored if device is specified as CPU. :param first_device: The first GPU device to use on the host. This parameter only applies to GPU devices and will be ignored if device is specified as CPU. """ db_script = DBScript( name=name, script=function, device=device, devices_per_node=devices_per_node, first_device=first_device, ) self.add_script_object(db_script)
def __hash__(self) -> int: return hash(self.name) def __eq__(self, other: object) -> bool: if not isinstance(other, Model): return False if self.name == other.name: return True return False def __str__(self) -> str: # pragma: no cover entity_str = "Name: " + self.name + "\n" entity_str += "Type: " + self.type + "\n" entity_str += str(self.run_settings) + "\n" if self._db_models: entity_str += "DB Models: \n" + str(len(self._db_models)) + "\n" if self._db_scripts: entity_str += "DB Scripts: \n" + str(len(self._db_scripts)) + "\n" return entity_str def add_ml_model_object(self, db_model: DBModel) -> None: if not db_model.is_file and self.colocated: err_msg = "ML model can not be set from memory for colocated databases.\n" err_msg += ( f"Please store the ML model named {db_model.name} in binary format " ) err_msg += "and add it to the SmartSim Model as file." raise SSUnsupportedError(err_msg) self._db_models.append(db_model) def add_script_object(self, db_script: DBScript) -> None: if db_script.func and self.colocated: if not isinstance(db_script.func, str): err_msg = ( "Functions can not be set from memory for colocated databases.\n" f"Please convert the function named {db_script.name} " "to a string or store it as a text file and add it to the " "SmartSim Model with add_script." ) raise SSUnsupportedError(err_msg) self._db_scripts.append(db_script) def _check_db_objects_colo(self) -> None: for db_model in self._db_models: if not db_model.is_file: err_msg = ( "ML model can not be set from memory for colocated databases.\n" f"Please store the ML model named {db_model.name} in binary " "format and add it to the SmartSim Model as file." ) raise SSUnsupportedError(err_msg) for db_script in self._db_scripts: if db_script.func: if not isinstance(db_script.func, str): err_msg = ( "Functions can not be set from memory for colocated " "databases.\nPlease convert the function named " f"{db_script.name} to a string or store it as a text" "file and add it to the SmartSim Model with add_script." ) raise SSUnsupportedError(err_msg)