# BSD 2-Clause License
#
# Copyright (c) 2021-2022, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os.path as osp
import time
from os import getcwd
from tabulate import tabulate
from tqdm import trange
from ._core import Controller, Generator, Manifest
from ._core.utils import init_default
from .database import Orchestrator
from .entity import Ensemble, Model
from .error import SmartSimError
from .log import get_logger
from .settings import settings
from .wlm import detect_launcher
logger = get_logger(__name__)
class Experiment:
"""Experiments are the Python user interface for SmartSim.
Experiment is a factory class that creates stages of a workflow
and manages their execution.
The instances created by an Experiment represent executable code
that is either user-specified, like the ``Model`` instance created
by ``Experiment.create_model``, or pre-configured, like the ``Orchestrator``
instance created by ``Experiment.create_database``.
Experiment methods that accept a variable list of arguments, such as
``Experiment.start`` or ``Experiment.stop``, accept any number of the
instances created by the Experiment.
In general, the Experiment class is designed to be initialized once
and utilized throughout runtime.
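A typical driver script, sketched below with illustrative entity
and executable names, initializes the Experiment once and then uses
it to create, launch, and monitor its instances.
.. highlight:: python
.. code-block:: python
# illustrative sketch; the executable and entity names are placeholders
exp = Experiment(name="my_exp", launcher="auto")
settings = exp.create_run_settings(exe="echo", exe_args=["hello"])
model = exp.create_model("hello_model", settings)
exp.start(model, block=True)
print(exp.summary())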
"""
def __init__(self, name, exp_path=None, launcher="local"):
"""Initialize an Experiment instance
With the default settings, the Experiment will use the
local launcher, which will start all Experiment-created
instances on the localhost.
Example of initializing an Experiment with the local launcher
.. highlight:: python
.. code-block:: python
exp = Experiment(name="my_exp", launcher="local")
SmartSim supports multiple launchers, which can also be specified
based on the type of system you are running on.
.. highlight:: python
.. code-block:: python
exp = Experiment(name="my_exp", launcher="slurm")
If you wish your driver script and Experiment to be run across
multiple systems with different schedulers (workload managers),
you can also use the `auto` argument to have the Experiment guess
which launcher to use based on the binaries and libraries installed on the system
.. highlight:: python
.. code-block:: python
exp = Experiment(name="my_exp", launcher="auto")
The Experiment path will default to the current working directory
and if the ``Experiment.generate`` method is called, a directory
with the Experiment name will be created to house the output
from the Experiment.
:param name: name for the ``Experiment``
:type name: str
:param exp_path: path to location of ``Experiment`` directory if generated
:type exp_path: str, optional
:param launcher: type of launcher being used, options are "slurm", "pbs",
"cobalt", "lsf", or "local". If set to "auto",
an attempt will be made to find an available launcher on the system.
Defaults to "local"
:type launcher: str, optional
"""
self.name = name
if exp_path:
if not isinstance(exp_path, str):
raise TypeError("exp_path argument was not of type str")
if not osp.isdir(osp.abspath(exp_path)):
raise NotADirectoryError("Experiment path provided does not exist")
exp_path = osp.abspath(exp_path)
self.exp_path = init_default(osp.join(getcwd(), name), exp_path, str)
if launcher == "auto":
launcher = detect_launcher()
self._control = Controller(launcher=launcher)
self._launcher = launcher.lower()
def start(self, *args, block=True, summary=False, kill_on_interrupt=True):
"""Start passed instances using Experiment launcher
Any instance ``Model``, ``Ensemble`` or ``Orchestrator``
instance created by the Experiment can be passed as
an argument to the start method.
.. highlight:: python
.. code-block:: python
exp = Experiment(name="my_exp", launcher="slurm")
settings = exp.create_run_settings(exe="./path/to/binary")
model = exp.create_model("my_model", settings)
exp.start(model)
Multiple instances can also be passed to the start method
at once, no matter which type of instance they are. These will
all be launched together.
.. highlight:: python
.. code-block:: python
exp.start(model_1, model_2, db, ensemble, block=True)
# alternatively
stage_1 = [model_1, model_2, db, ensemble]
exp.start(*stage_1, block=True)
If `block==True` the Experiment will poll the launched instances
at runtime until all non-database jobs have completed. Database
jobs *must* be killed by the user by passing them to
``Experiment.stop``. This allows for multiple stages of a workflow
to produce to and consume from the same Orchestrator database.
If `kill_on_interrupt=True`, then all jobs launched by this
experiment are guaranteed to be killed when ^C (SIGINT) signal is
received. If `kill_on_interrupt=False`, then it is not guaranteed
that all jobs launched by this experiment will be killed, and the
zombie processes will need to be manually killed.
:param block: block execution until all non-database
jobs are finished, defaults to True
:type block: bool, optional
:param summary: print a launch summary prior to launch,
defaults to False
:type summary: bool, optional
:param kill_on_interrupt: flag for killing jobs when ^C (SIGINT)
signal is received.
:type kill_on_interrupt: bool, optional
"""
start_manifest = Manifest(*args)
try:
if summary:
self._launch_summary(start_manifest)
self._control.start(
manifest=start_manifest,
block=block,
kill_on_interrupt=kill_on_interrupt,
)
except SmartSimError as e:
logger.error(e)
raise
def stop(self, *args):
"""Stop specific instances launched by this ``Experiment``
Instances of ``Model``, ``Ensemble`` and ``Orchestrator``
can all be passed as arguments to the stop method.
Whichever launcher was specified at Experiment initialization
will be used to stop the instance. For example, when using
the Slurm launcher, this equates to running `scancel` on the
instance.
Example
.. highlight:: python
.. code-block:: python
exp.stop(model)
# multiple
exp.stop(model_1, model_2, db, ensemble)
:raises TypeError: if wrong type
:raises SmartSimError: if stop request fails
"""
try:
stop_manifest = Manifest(*args)
for entity in stop_manifest.models:
self._control.stop_entity(entity)
for entity_list in stop_manifest.all_entity_lists:
self._control.stop_entity_list(entity_list)
except SmartSimError as e:
logger.error(e)
raise
def generate(self, *args, tag=None, overwrite=False):
"""Generate the file structure for an ``Experiment``
``Experiment.generate`` creates directories for each instance
passed, to help organize Experiments that launch many instances.
If files or directories are attached to ``Model`` objects
using ``Model.attach_generator_files()``, those files or
directories will be symlinked, copied, or configured and
written into the created directory for that instance.
Instances of ``Model``, ``Ensemble`` and ``Orchestrator``
can all be passed as arguments to the generate method.
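For example (a sketch; the entity variables are illustrative),
generation is typically called after entities are created and
before ``Experiment.start``
.. highlight:: python
.. code-block:: python
# assumes `model` and `db` were created by this Experiment
exp.generate(model, db, overwrite=True)
exp.start(model, db)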
:param tag: tag used in `to_configure` generator files
:type tag: str, optional
:param overwrite: overwrite existing folders and contents,
defaults to False
:type overwrite: bool, optional
"""
try:
generator = Generator(self.exp_path, overwrite=overwrite)
if tag:
generator.set_tag(tag)
generator.generate_experiment(*args)
except SmartSimError as e:
logger.error(e)
raise
def poll(self, interval=10, verbose=True, kill_on_interrupt=True):
"""Monitor jobs through logging to stdout.
This method should only be used if jobs were launched
with ``Experiment.start(block=False)``
The interval specified will control how often the
logging is performed, not how often the polling occurs.
By default, internal polling is set to every second for
local launcher jobs and every 10 seconds for all other
launchers.
If internal polling needs to be slower or faster based on
system or site standards, set the ``SMARTSIM_JM_INTERVAL``
environment variable to control the internal polling interval
for SmartSim.
For more verbose logging output, the ``SMARTSIM_LOG_LEVEL``
environment variable can be set to `debug`
If `kill_on_interrupt=True`, then all jobs launched by this
experiment are guaranteed to be killed when ^C (SIGINT) signal is
received. If `kill_on_interrupt=False`, then it is not guaranteed
that all jobs launched by this experiment will be killed, and the
zombie processes will need to be manually killed.
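For example (a sketch; `model` is an illustrative instance
launched by this Experiment)
.. highlight:: python
.. code-block:: python
# launch without blocking, then log status every 30 seconds
exp.start(model, block=False)
exp.poll(interval=30, verbose=True)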
:param interval: frequency (in seconds) of logging to stdout,
defaults to 10 seconds
:type interval: int, optional
:param verbose: set verbosity, defaults to True
:type verbose: bool, optional
:param kill_on_interrupt: flag for killing jobs when SIGINT is received
:type kill_on_interrupt: bool, optional
:raises SmartSimError:
"""
try:
self._control.poll(interval, verbose, kill_on_interrupt=kill_on_interrupt)
except SmartSimError as e:
logger.error(e)
raise
def finished(self, entity):
"""Query if a job has completed.
An instance of ``Model`` or ``Ensemble`` can be passed
as an argument.
Passing an ``Orchestrator`` will raise an error, as a
database deployment is never considered finished until stopped
by the user.
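For example, a driver script can wait on a non-blocking launch
(a sketch; `model` is an illustrative instance)
.. highlight:: python
.. code-block:: python
import time
# assumes `model` was launched with exp.start(model, block=False)
while not exp.finished(model):
time.sleep(5)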
:param entity: object launched by this ``Experiment``
:type entity: Model | Ensemble
:returns: True if job has completed, False otherwise
:rtype: bool
:raises SmartSimError: if entity has not been launched
by this ``Experiment``
"""
try:
return self._control.finished(entity)
except SmartSimError as e:
logger.error(e)
raise
def get_status(self, *args):
"""Query the status of launched instances
Return a smartsim.status string representing
the status of each launched instance.
.. highlight:: python
.. code-block:: python
exp.get_status(model)
As with other Experiment methods, multiple instances of
varying types can be passed, and all statuses will
be returned at once.
.. highlight:: python
.. code-block:: python
statuses = exp.get_status(model, ensemble, orchestrator)
assert all([status == smartsim.status.STATUS_COMPLETED for status in statuses])
:returns: status of the instances passed as arguments
:rtype: list[str]
:raises SmartSimError: if status retrieval fails
"""
try:
manifest = Manifest(*args)
statuses = []
for entity in manifest.models:
statuses.append(self._control.get_entity_status(entity))
for entity_list in manifest.all_entity_lists:
statuses.extend(self._control.get_entity_list_status(entity_list))
return statuses
except SmartSimError as e:
logger.error(e)
raise
def create_ensemble(
self,
name,
params=None,
batch_settings=None,
run_settings=None,
replicas=None,
perm_strategy="all_perm",
**kwargs,
):
"""Create an ``Ensemble`` of ``Model`` instances
Ensembles can be launched sequentially or as a batch
if using a non-local launcher, e.g. Slurm
Ensembles require one of the following combinations
of arguments
- ``run_settings`` and ``params``
- ``run_settings`` and ``replicas``
- ``batch_settings``
- ``batch_settings``, ``run_settings``, and ``params``
- ``batch_settings``, ``run_settings``, and ``replicas``
If given solely batch settings, an empty ensemble
will be created that models can be added to manually
through ``Ensemble.add_model()``.
The entire ensemble will launch as one batch.
If given both batch and run settings, either ``params``
or ``replicas`` must be passed, and the entire ensemble
will launch as a single batch.
If given solely run settings, either ``params``
or ``replicas`` must be passed, and the ensemble members
will each launch sequentially.
The kwargs argument can be used to pass custom input
parameters to the permutation strategy.
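For example, expanding a parameter space into an ensemble of
``Model`` members (a sketch; the parameter names, executable,
and script are illustrative)
.. highlight:: python
.. code-block:: python
params = {"learning_rate": [0.01, 0.001], "batch_size": [32, 64]}
rs = exp.create_run_settings(exe="python", exe_args=["train.py"])
ensemble = exp.create_ensemble("training-ensemble",
params=params,
run_settings=rs,
perm_strategy="all_perm")
exp.start(ensemble)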
:param name: name of the ensemble
:type name: str
:param params: parameters to expand into ``Model`` members
:type params: dict[str, Any]
:param batch_settings: describes settings for ``Ensemble`` as batch workload
:type batch_settings: BatchSettings
:param run_settings: describes how each ``Model`` should be executed
:type run_settings: RunSettings
:param replicas: number of replicas to create
:type replicas: int
:param perm_strategy: strategy for expanding ``params`` into
``Model`` instances; options are "all_perm", "stepped", "random",
or a callable function. Default is "all_perm".
:type perm_strategy: str, optional
:raises SmartSimError: if initialization fails
:return: ``Ensemble`` instance
:rtype: Ensemble
"""
try:
new_ensemble = Ensemble(
name,
params,
batch_settings=batch_settings,
run_settings=run_settings,
perm_strat=perm_strategy,
replicas=replicas,
**kwargs,
)
return new_ensemble
except SmartSimError as e:
logger.error(e)
raise
def create_model(
self, name, run_settings, params=None, path=None, enable_key_prefixing=False
):
"""Create a general purpose ``Model``
The ``Model`` class is the most general encapsulation of
executable code in SmartSim. ``Model`` instances are named
references to pieces of a workflow that can be parameterized
and executed.
``Model`` instances can be launched sequentially or as a batch
by adding them into an ``Ensemble``.
Parameters supplied in the `params` argument can be written into
configuration files supplied at runtime to the model through
``Model.attach_generator_files``. `params` can also be turned
into executable arguments by calling ``Model.params_to_args``
By default, ``Model`` instances will be executed in the
current working directory if no `path` argument is supplied.
If a ``Model`` instance is passed to ``Experiment.generate``,
a directory within the ``Experiment`` directory will be created
to house the input and output files from the model.
Example initialization of a ``Model`` instance
.. highlight:: python
.. code-block:: python
from smartsim import Experiment
exp = Experiment("my_exp", launcher="local")
run_settings = exp.create_run_settings("python", "run_pytorch_model.py")
model = exp.create_model("pytorch_model", run_settings)
# adding parameters to a model
run_settings = exp.create_run_settings("python", "run_pytorch_model.py")
train_params = {
"batch": 32,
"epoch": 10,
"lr": 0.001
}
model = exp.create_model("pytorch_model", run_settings, params=train_params)
model.attach_generator_files(to_configure="./train.cfg")
exp.generate(model)
New in 0.4.0, ``Model`` instances can be co-located with an
Orchestrator database shard through ``Model.colocate_db``. This
will launch a single ``Orchestrator`` instance on each compute
host used by the (possibly distributed) application. This is
useful for performant online inference or processing
at runtime.
:param name: name of the model
:type name: str
:param run_settings: defines how ``Model`` should be run
:type run_settings: RunSettings
:param params: model parameters for writing into configuration files
:type params: dict, optional
:param path: path to where the model should be executed at runtime
:type path: str, optional
:param enable_key_prefixing: If True, data sent to the Orchestrator
using SmartRedis from this ``Model`` will
be prefixed with the ``Model`` name.
Default is False.
:type enable_key_prefixing: bool, optional
:raises SmartSimError: if initialization fails
:return: the created ``Model``
:rtype: Model
"""
path = init_default(getcwd(), path, str)
params = init_default({}, params, dict)
try:
new_model = Model(name, params, path, run_settings)
if enable_key_prefixing:
new_model.enable_key_prefixing()
return new_model
except SmartSimError as e:
logger.error(e)
raise
def create_run_settings(
self,
exe,
exe_args=None,
run_command="auto",
run_args=None,
env_vars=None,
container=None,
**kwargs,
):
"""Create a ``RunSettings`` instance.
run_command="auto" will attempt to automatically
match a run command on the system with a RunSettings
class in SmartSim. If found, the class corresponding
to that run_command will be created and returned.
If the local launcher is being used, auto detection will
be turned off.
If a recognized run command is passed, the ``RunSettings``
instance will be a child class such as ``SrunSettings``.
If the run command is not supported by SmartSim, the base
``RunSettings`` class will be created and returned, and the
specified run_command and run_args will be evaluated literally.
Run Commands with implemented helper classes:
- aprun (ALPS)
- srun (SLURM)
- mpirun (OpenMPI)
- jsrun (LSF)
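For example, on a Slurm system (a sketch; the executable and its
arguments are illustrative)
.. highlight:: python
.. code-block:: python
# with the slurm launcher, run_command="auto" will typically resolve to srun
rs = exp.create_run_settings(exe="./my_application",
exe_args=["--input", "data.json"],
env_vars={"OMP_NUM_THREADS": "4"})
model = exp.create_model("my_model", rs)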
:param exe: executable to run
:type exe: str
:param exe_args: arguments to pass to the executable
:type exe_args: list[str], optional
:param run_command: command to run the executable
:type run_command: str, optional
:param run_args: arguments to pass to the ``run_command``
:type run_args: list[str], optional
:param env_vars: environment variables to pass to the executable
:type env_vars: dict[str, str], optional
:param container: container in which to run the workload, defaults to None
:type container: Container, optional
:return: the created ``RunSettings``
:rtype: RunSettings
"""
try:
return settings.create_run_settings(
self._launcher,
exe,
exe_args=exe_args,
run_command=run_command,
run_args=run_args,
env_vars=env_vars,
container=container,
**kwargs,
)
except SmartSimError as e:
logger.error(e)
raise
def create_batch_settings(
self, nodes=1, time="", queue="", account="", batch_args=None, **kwargs
):
"""Create a ``BatchSettings`` instance
Batch settings parameterize batch workloads. The result of this
function can be passed to the ``Ensemble`` initialization.
The `batch_args` parameter can be used to pass in a dictionary
of additional batch command arguments that aren't supported through
the SmartSim interface
.. highlight:: python
.. code-block:: python
# i.e. for Slurm
batch_args = {
"distribution": "block"
"exclusive": None
}
bs = exp.create_batch_settings(nodes=3,
time="10:00:00",
batch_args=batch_args)
bs.set_account("default")
:param nodes: number of nodes for batch job, defaults to 1
:type nodes: int, optional
:param time: length of batch job, defaults to ""
:type time: str, optional
:param queue: queue or partition (if slurm), defaults to ""
:type queue: str, optional
:param account: user account name for batch system, defaults to ""
:type account: str, optional
:param batch_args: additional batch arguments, defaults to None
:type batch_args: dict[str, str], optional
:return: a newly created BatchSettings instance
:rtype: BatchSettings
:raises SmartSimError: if batch creation fails
"""
try:
return settings.create_batch_settings(
self._launcher,
nodes=nodes,
time=time,
queue=queue,
account=account,
batch_args=batch_args,
**kwargs,
)
except SmartSimError as e:
logger.error(e)
raise
def create_database(
self,
port=6379,
db_nodes=1,
batch=False,
hosts=None,
run_command="auto",
interface="ipogif0",
account=None,
time=None,
queue=None,
single_cmd=True,
**kwargs,
):
"""Initialize an Orchestrator database
The ``Orchestrator`` database is a key-value store based
on Redis that can be launched together with other Experiment
created instances for online data storage.
When launched, ``Orchestrator`` can be used to communicate
data between Fortran, Python, C, and C++ applications.
Machine Learning models in PyTorch, TensorFlow, and ONNX (e.g. scikit-learn)
can also be stored within the Orchestrator database, where they
can be called remotely and executed on CPU or GPU where
the database is hosted.
To enable a SmartSim ``Model`` to communicate with the database,
the workload must utilize the SmartRedis clients. For more
information on the database and SmartRedis clients, see the
documentation at www.craylabs.org
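For example, a single-shard database launched locally for testing
(a sketch; the network interface is system dependent)
.. highlight:: python
.. code-block:: python
# "lo" is only suitable for local testing; on a cluster use the
# system's high-speed interface (e.g. "ipogif0")
db = exp.create_database(port=6379, db_nodes=1, interface="lo")
exp.start(db)
# ... run workloads that use the SmartRedis clients ...
exp.stop(db)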
:param port: TCP/IP port, defaults to 6379
:type port: int, optional
:param db_nodes: number of database shards, defaults to 1
:type db_nodes: int, optional
:param batch: run as a batch workload, defaults to False
:type batch: bool, optional
:param hosts: specify hosts to launch on, defaults to None
:type hosts: list[str], optional
:param run_command: specify launch binary or detect automatically, defaults to "auto"
:type run_command: str, optional
:param interface: Network interface, defaults to "ipogif0"
:type interface: str, optional
:param account: account to run batch on, defaults to None
:type account: str, optional
:param time: walltime for batch 'HH:MM:SS' format, defaults to None
:type time: str, optional
:param queue: queue to run the batch on, defaults to None
:type queue: str, optional
:param single_cmd: run all shards with one (MPMD) command, defaults to True
:type single_cmd: bool, optional
:raises SmartSimError: if detection of launcher or of run command fails
:raises SmartSimError: if user indicated an incompatible run command for the launcher
:return: Orchestrator
:rtype: Orchestrator or derived class
"""
return Orchestrator(
port=port,
db_nodes=db_nodes,
batch=batch,
hosts=hosts,
run_command=run_command,
interface=interface,
account=account,
time=time,
queue=queue,
single_cmd=single_cmd,
launcher=self._launcher,
**kwargs,
)
def reconnect_orchestrator(self, checkpoint):
"""Reconnect to a running ``Orchestrator``
This method can be used to connect to an ``Orchestrator`` deployment
that was launched by a previous ``Experiment``. This can be
helpful in the case where separate runs of an ``Experiment``
wish to use the same ``Orchestrator`` instance currently
running on a system.
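For example (a sketch; the checkpoint path is illustrative)
.. highlight:: python
.. code-block:: python
# path to the smartsim_db.dat written when the database was launched
orc = exp.reconnect_orchestrator("/path/to/prior/exp/smartsim_db.dat")
exp.stop(orc)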
:param checkpoint: the `smartsim_db.dat` file created
when an ``Orchestrator`` is launched
:type checkpoint: str
"""
try:
orc = self._control.reload_saved_db(checkpoint)
return orc
except SmartSimError as e:
logger.error(e)
raise
def summary(self, format="github"):
"""Return a summary of the ``Experiment``
The summary will show each instance that has been
launched and completed in this ``Experiment``
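Example (assuming jobs have already been launched and completed
by this Experiment; `model` is illustrative)
.. highlight:: python
.. code-block:: python
exp.start(model, block=True)
print(exp.summary())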
:param format: the style in which the summary table is formatted,
for a full list of styles see:
https://github.com/astanin/python-tabulate#table-format,
defaults to "github"
:type format: str, optional
:return: tabulate string of ``Experiment`` history
:rtype: str
"""
values = []
headers = [
"Name",
"Entity-Type",
"JobID",
"RunID",
"Time",
"Status",
"Returncode",
]
for job in self._control.get_jobs().values():
for run in range(job.history.runs + 1):
values.append(
[
job.entity.name,
job.entity.type,
job.history.jids[run],
run,
job.history.job_times[run],
job.history.statuses[run],
job.history.returns[run],
]
)
return tabulate(
values, headers, showindex=True, tablefmt=format, missingval="None"
)
def _launch_summary(self, manifest):
"""Experiment pre-launch summary of entities that will be launched
:param manifest: Manifest of deployables.
:type manifest: Manifest
"""
summary = "\n\n=== Launch Summary ===\n"
summary += f"Experiment: {self.name}\n"
summary += f"Experiment Path: {self.exp_path}\n"
summary += f"Launcher: {self._launcher}\n"
if manifest.ensembles or manifest.ray_clusters:
summary += (
f"Ensembles: {len(manifest.ensembles) + len(manifest.ray_clusters)}\n"
)
if manifest.models:
summary += f"Models: {len(manifest.models)}\n"
if self._control.orchestrator_active:
summary += f"Database Status: active\n"
elif manifest.db:
summary += f"Database Status: launching\n"
else:
summary += f"Database Status: inactive\n"
summary += f"\n{str(manifest)}"
logger.info(summary)
wait, steps = 10, 100
prog_bar = trange(
steps,
desc="Launching in...",
leave=False,
ncols=80,
mininterval=0.25,
bar_format="{desc}: {bar}| {remaining} {elapsed}",
)
for _ in prog_bar:
time.sleep(wait / steps)
def __str__(self):
return self.name