# BSD 2-Clause License
#
# Copyright (c) 2021-2022, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import itertools
import sys
from os import getcwd
from shlex import split as sh_split
from warnings import simplefilter, warn
import psutil
from smartredis import Client
from smartredis.error import RedisReplyError
from .._core.config import CONFIG
from .._core.utils import db_is_active
from .._core.utils.helpers import is_valid_cmd
from .._core.utils.network import get_ip_from_host
from ..entity import DBNode, EntityList
from ..error import SmartSimError, SSConfigError, SSUnsupportedError
from ..log import get_logger
from ..settings import (
AprunSettings,
BsubBatchSettings,
CobaltBatchSettings,
JsrunSettings,
MpirunSettings,
QsubBatchSettings,
SbatchSettings,
SrunSettings,
)
from ..settings.settings import create_batch_settings, create_run_settings
from ..wlm import detect_launcher
logger = get_logger(__name__)
[docs]class Orchestrator(EntityList):
"""The Orchestrator is an in-memory database that can be launched
alongside entities in SmartSim. Data can be transferred between
entities by using one of the Python, C, C++ or Fortran clients
within an entity.
"""
def __init__(
    self,
    port=6379,
    interface="lo",
    launcher="local",
    run_command="auto",
    db_nodes=1,
    batch=False,
    hosts=None,
    account=None,
    time=None,
    alloc=None,
    single_cmd=False,
    **kwargs,
):
    """Initialize an Orchestrator reference

    Extra configurations for RedisAI
    See https://oss.redislabs.com/redisai/configuration/

    :param port: TCP/IP port, defaults to 6379
    :type port: int, optional
    :param interface: network interface, defaults to "lo"
    :type interface: str, optional
    :param launcher: "local", "slurm", "pbs", "cobalt", "lsf", or "auto"
                     to detect it with ``detect_launcher``, defaults to "local"
    :type launcher: str, optional
    :param run_command: launch binary to use; "auto" selects the first
                        usable command supported by the launcher,
                        defaults to "auto"
    :type run_command: str, optional
    :param db_nodes: number of database shards, defaults to 1
    :type db_nodes: int, optional
    :param batch: launch as a batch workload, defaults to False
    :type batch: bool, optional
    :param hosts: hosts to launch on, required when launching with ``mpirun``
    :type hosts: str | list[str], optional
    :param account: account forwarded to the batch settings
    :type account: str, optional
    :param time: walltime forwarded to the batch settings
    :type time: str, optional
    :param alloc: allocation to launch on; when given, no batch settings
                  are created
    :type alloc: str, optional
    :param single_cmd: run all shards with one (MPMD) command,
                       defaults to False
    :type single_cmd: bool, optional
    :param threads_per_queue: threads per GPU device
    :type threads_per_queue: int, optional
    :param inter_op_threads: threads across CPU operations
    :type inter_op_threads: int, optional
    :param intra_op_threads: threads per CPU operation
    :type intra_op_threads: int, optional
    :param gpus_per_shard: (lsf only) gpus per shard, defaults to 0
    :type gpus_per_shard: int, optional
    :param cpus_per_shard: (lsf only) cpus per shard, defaults to 4
    :type cpus_per_shard: int, optional
    :raises SmartSimError: if the launcher or run command is not supported,
                           if no run command can be detected, if a local
                           orchestrator is requested with ``batch=True``,
                           or if ``mpirun`` is used without ``hosts``
    :raises SSConfigError: if the Redis executable, configuration file or
                           CLI cannot be obtained from the configuration
    """
    if launcher == "auto":
        launcher = detect_launcher()

    # run commands accepted for each launcher, in order of preference
    by_launcher = {
        "slurm": ["srun", "mpirun"],
        "pbs": ["aprun", "mpirun"],
        "cobalt": ["aprun", "mpirun"],
        "lsf": ["jsrun"],
        "local": [None],
    }

    # Reject unknown launchers up front: every lookup into ``by_launcher``
    # below would otherwise fail with a bare KeyError.
    if launcher not in by_launcher:
        msg = f"Launcher {launcher} is not supported by the Orchestrator\n"
        msg += f"Supported launchers are: {list(by_launcher)}"
        raise SmartSimError(msg)

    def _detect_command(launcher):
        for cmd in by_launcher[launcher]:
            # the local launcher has no launch binary (its only entry is None)
            if launcher == "local":
                return cmd
            if is_valid_cmd(cmd):
                return cmd
        msg = f"Could not automatically detect a run command to use for launcher {launcher}"
        msg += f"\nSearched for and could not find the following commands: {by_launcher[launcher]}"
        raise SmartSimError(msg)

    if run_command == "auto":
        run_command = _detect_command(launcher)

    if run_command not in by_launcher[launcher]:
        msg = f"Run command {run_command} is not supported on launcher {launcher}\n"
        msg += f"Supported run commands for the given launcher are: {by_launcher[launcher]}"
        raise SmartSimError(msg)

    if launcher == "local" and batch:
        msg = "Local orchestrator can not be launched with batch=True"
        raise SmartSimError(msg)

    if run_command == "aprun" and batch and single_cmd:
        msg = "aprun can not launch an orchestrator with batch=True and single_cmd=True. "
        msg += "Automatically switching to single_cmd=False."
        logger.info(msg)
        single_cmd = False

    # These attributes are set before ``super().__init__`` because the
    # entity-initialization methods of this class read them.
    self.launcher = launcher
    self.run_command = run_command

    self.ports = []
    self.path = getcwd()
    self._hosts = []
    self._interface = interface
    self._check_network_interface()
    self.queue_threads = kwargs.get("threads_per_queue", None)
    self.inter_threads = kwargs.get("inter_op_threads", None)
    self.intra_threads = kwargs.get("intra_op_threads", None)
    if self.launcher == "lsf":
        gpus_per_shard = kwargs.pop("gpus_per_shard", 0)
        cpus_per_shard = kwargs.pop("cpus_per_shard", 4)
    else:
        gpus_per_shard = None
        cpus_per_shard = None

    super().__init__(
        "orchestrator",
        self.path,
        port=port,
        interface=interface,
        db_nodes=db_nodes,
        batch=batch,
        launcher=launcher,
        run_command=run_command,
        alloc=alloc,
        single_cmd=single_cmd,
        gpus_per_shard=gpus_per_shard,
        cpus_per_shard=cpus_per_shard,
        **kwargs,
    )

    # detect if we can find at least the redis binaries. We
    # don't want to force the user to launch with RedisAI so
    # it's ok if that isn't present.
    try:
        # try to obtain redis binaries needed to launch Redis
        # will raise SSConfigError if not found
        self._redis_exe
        self._redis_conf
        CONFIG.database_cli
    except SSConfigError as e:
        msg = "SmartSim not installed with pre-built extensions (Redis)\n"
        msg += "Use the `smart` cli tool to install needed extensions\n"
        msg += "or set REDIS_PATH and REDIS_CLI_PATH in your environment\n"
        msg += "See documentation for more information"
        raise SSConfigError(msg) from e

    if launcher != "local":
        self.batch_settings = self._build_batch_settings(
            db_nodes, alloc, batch, account, time, launcher=launcher, **kwargs
        )
        if hosts:
            self.set_hosts(hosts)
        elif run_command == "mpirun":
            raise SmartSimError(
                "hosts argument is required when launching Orchestrator with mpirun"
            )

    self._reserved_run_args = {}
    self._reserved_batch_args = {}
    self._fill_reserved()
@property
def num_shards(self):
    """Number of database shards that make up this orchestrator.

    The value is read from ``self.db_nodes``. It can differ from the
    number of ``DBNode`` objects because a single ``DBNode`` may start
    several shards (e.g. with MPMD).

    :returns: num_shards
    :rtype: int
    """
    shard_count = self.db_nodes
    return shard_count
@property
def hosts(self):
    """Hostnames of the hosts the orchestrator instances run on.

    The list is looked up through ``_get_db_hosts`` whenever the cached
    ``self._hosts`` is empty, and the result is stored back in
    ``self._hosts``. Note that it will only be populated after the
    orchestrator has been launched by SmartSim.

    :return: hostnames
    :rtype: list[str]
    """
    cached = self._hosts
    if cached:
        return cached
    self._hosts = self._get_db_hosts()
    return self._hosts
def remove_stale_files(self):
    """Remove database files left over from a previous launch.

    Calls ``remove_stale_dbnode_files`` on every entity of the orchestrator.
    """
    for node in self.entities:
        node.remove_stale_dbnode_files()
def get_address(self):
    """Return the ``host:port`` addresses of the database.

    :return: addresses
    :rtype: list[str]
    :raises SmartSimError: if no hosts are known yet or the database
                           is not active
    """
    # Guard clauses: hosts must be known first, then the database must respond
    if not self._hosts:
        raise SmartSimError("Could not find database address")
    if not self.is_active():
        raise SmartSimError("Database is not active")
    return self._get_address()
def _get_address(self):
addresses = []
for ip, port in itertools.product(self._hosts, self.ports):
addresses.append(":".join((ip, str(port))))
return addresses
def is_active(self):
    """Check if the database is active

    Without any known hosts the database is reported as inactive;
    otherwise the answer comes from ``db_is_active``.

    :return: True if database is active, False otherwise
    :rtype: bool
    """
    known_hosts = self._hosts
    if known_hosts:
        return db_is_active(known_hosts, self.ports, self.num_shards)
    return False
@property
def _rai_module(self):
    """Build the argument string that loads the RedisAI module.

    The string starts with ``--loadmodule`` followed by ``CONFIG.redisai``.
    A ``THREADS_PER_QUEUE``, ``INTER_OP_PARALLELISM`` or
    ``INTRA_OP_PARALLELISM`` setting is appended for each corresponding
    thread option that has a truthy value.

    :return: space-separated module arguments
    :rtype: str
    """
    parts = ["--loadmodule", CONFIG.redisai]
    tunables = (
        ("THREADS_PER_QUEUE", self.queue_threads),
        ("INTER_OP_PARALLELISM", self.inter_threads),
        ("INTRA_OP_PARALLELISM", self.intra_threads),
    )
    for option, setting in tunables:
        if setting:
            parts.append(f"{option} {setting}")
    return " ".join(parts)
@property
def _redis_exe(self):
    """Database executable as reported by ``CONFIG.database_exe``."""
    exe = CONFIG.database_exe
    return exe
@property
def _redis_conf(self):
    """Database configuration file as reported by ``CONFIG.database_conf``."""
    conf = CONFIG.database_conf
    return conf
def set_cpus(self, num_cpus):
    """Set the number of CPUs available to each database shard

    This effectively will determine how many cpus can be used for
    compute threads, background threads, and network I/O.

    When launching as a batch, the batch settings are updated for the
    "pbs", "cobalt" and "slurm" launchers. The run settings of every
    database node (and of its MPMD components) are always updated.

    :param num_cpus: number of cpus to set
    :type num_cpus: int
    """
    if self.batch:
        if self.launcher in ("pbs", "cobalt"):
            self.batch_settings.set_ncpus(num_cpus)
        if self.launcher == "slurm":
            self.batch_settings.set_cpus_per_task(num_cpus)

    for dbnode in self:
        settings = dbnode.run_settings
        settings.set_cpus_per_task(num_cpus)
        if dbnode._mpmd:
            for component in settings.mpmd:
                component.set_cpus_per_task(num_cpus)
def set_walltime(self, walltime):
    """Set the batch walltime of the orchestrator

    Note: This only affects orchestrators launched as a batch

    :param walltime: amount of time e.g. 10 hours is 10:00:00
    :type walltime: str
    :raises SmartSimError: if orchestrator isn't launching as batch
    """
    if self.batch:
        self.batch_settings.set_walltime(walltime)
        return
    raise SmartSimError("Not running as batch, cannot set walltime")
def set_hosts(self, host_list):
    """Specify the hosts for the ``Orchestrator`` to launch on

    A single string is stripped and treated as a one-element list. The
    list is forwarded to the batch settings when launching as a batch.
    For the "lsf" launcher every database node receives the full list;
    otherwise hosts are paired with the entities one by one.

    :param host_list: list of host (compute node names)
    :type host_list: str, list[str]
    :raises TypeError: if wrong type
    """
    if isinstance(host_list, str):
        host_list = [host_list.strip()]
    if not isinstance(host_list, list):
        raise TypeError("host_list argument must be a list of strings")
    if any(not isinstance(host, str) for host in host_list):
        raise TypeError("host_list argument must be list of strings")
    # TODO check length
    if self.batch:
        self.batch_settings.set_hostlist(host_list)

    if self.launcher == "lsf":
        for db in self:
            db.set_hosts(host_list)
        return

    for host, db in zip(host_list, self.entities):
        settings = db.run_settings
        # aprun run settings only receive a host when not launching as a batch
        if not (isinstance(settings, AprunSettings) and self.batch):
            settings.set_hostlist([host])
        if db._mpmd:
            # the i-th MPMD component is given the host at index i + 1
            for idx, component in enumerate(settings.mpmd, start=1):
                component.set_hostlist(host_list[idx])
def set_batch_arg(self, arg, value):
    """Set a batch argument the orchestrator should launch with

    Some commonly used arguments such as --job-name are used
    by SmartSim and will not be allowed to be set; a warning is
    logged for those and the batch settings are left unchanged.

    :param arg: batch argument to set e.g. "exclusive"
    :type arg: str
    :param value: batch param - set to None if no param value
    :type value: str | None
    :raises SmartSimError: if orchestrator not launching as batch
    """
    if not self.batch:
        raise SmartSimError("Not running as batch, cannot set batch_arg")

    reserved = self._reserved_batch_args[type(self.batch_settings)]
    if arg not in reserved:
        self.batch_settings.batch_args[arg] = value
        return
    logger.warning(
        f"Can not set batch argument {arg}: it is a reserved keyword in Orchestrator"
    )
def set_run_arg(self, arg, value):
    """Set a run argument the orchestrator should launch
    each node with (it will be passed to the run command)

    Some commonly used arguments are used
    by SmartSim and will not be allowed to be set.
    For example, "n", "N", etc. A warning is logged for those
    and the run settings are left unchanged.

    :param arg: run argument to set
    :type arg: str
    :param value: run parameter - set to None if no parameter value
    :type value: str | None
    """
    settings_type = type(self.entities[0].run_settings)
    # Run-settings types without an entry in the reserved table have no
    # reserved arguments; a plain index lookup would raise KeyError for them.
    reserved = self._reserved_run_args.get(settings_type, ())
    if arg in reserved:
        logger.warning(
            f"Can not set run argument {arg}: it is a reserved keyword in Orchestrator"
        )
        return

    for db in self.entities:
        db.run_settings.run_args[arg] = value
        if db._mpmd:
            for mpmd in db.run_settings.mpmd:
                mpmd.run_args[arg] = value
def enable_checkpoints(self, frequency):
    """Sets the database's save configuration to save the
    DB every 'frequency' seconds given that at least one
    write operation against the DB occurred in that time.

    For example, if `frequency` is 900, then the database
    will save to disk after 900 seconds if there is at least
    1 change to the dataset.

    :param frequency: the given number of seconds before the DB saves
    :type frequency: int
    """
    # "save" takes "<seconds> <changes>"; the change threshold is fixed at 1
    save_rule = " ".join((str(frequency), "1"))
    self.set_db_conf("save", save_rule)
def set_max_memory(self, mem):
    """Sets the max memory configuration. By default there is no memory limit.
    Setting max memory to zero also results in no memory limit. Once a limit is
    surpassed, keys will be removed according to the eviction strategy. The
    specified memory size is case insensitive and supports the typical forms of:

    1k => 1000 bytes
    1kb => 1024 bytes
    1m => 1000000 bytes
    1mb => 1024*1024 bytes
    1g => 1000000000 bytes
    1gb => 1024*1024*1024 bytes

    :param mem: the desired max memory size e.g. 3gb
    :type mem: str
    :raises SmartSimError: If 'mem' is an invalid memory value
    :raises SmartSimError: If database is not active
    """
    conf_key = "maxmemory"
    self.set_db_conf(conf_key, mem)
def set_eviction_strategy(self, strategy):
    """Sets how the database will select what to remove when
    'maxmemory' is reached. The default is noeviction.

    :param strategy: The max memory policy to use e.g. "volatile-lru", "allkeys-lru", etc.
    :type strategy: str
    :raises SmartSimError: If 'strategy' is an invalid maxmemory policy
    :raises SmartSimError: If database is not active
    """
    conf_key = "maxmemory-policy"
    self.set_db_conf(conf_key, strategy)
def set_max_clients(self, clients=50_000):
    """Sets the max number of connected clients at the same time.
    When the number of DB shards contained in the orchestrator is
    more than two, then every node will use two connections, one
    incoming and another outgoing.

    :param clients: the maximum number of connected clients
    :type clients: int, optional
    """
    # the configuration value is sent to the database as a string
    limit = str(clients)
    self.set_db_conf("maxclients", limit)
def set_max_message_size(self, size=1_073_741_824):
    """Sets the database's memory size limit for bulk requests,
    which are elements representing single strings. The default
    is 1 gigabyte. Message size must be greater than or equal to 1mb.
    The specified memory size should be an integer that represents
    the number of bytes. For example, to set the max message size
    to 1gb, use 1024*1024*1024.

    :param size: maximum message size in bytes
    :type size: int, optional
    """
    # the configuration value is sent to the database as a string
    limit = str(size)
    self.set_db_conf("proto-max-bulk-len", limit)
def set_db_conf(self, key, value):
    """Set any valid configuration at runtime without the need
    to restart the database. All configuration parameters
    that are set are immediately loaded by the database and
    will take effect starting with the next command executed.

    The setting is applied to every ``ip:port`` address built from
    the orchestrator hosts and ports.

    :param key: the configuration parameter
    :type key: str
    :param value: the database configuration parameter's new value
    :type value: str
    :raises SmartSimError: if the database is not active or the
                           key-value pair is rejected by the database
    :raises TypeError: if the client rejects the argument types
    """
    if not self.is_active():
        raise SmartSimError(
            "The SmartSim Orchestrator must be active in order to set the database's configurations."
        )

    addresses = [
        ":".join([get_ip_from_host(host), str(port)])
        for host in self.hosts
        for port in self.ports
    ]

    # the client is created in cluster mode when there are more than two shards
    is_cluster = self.num_shards > 2
    client = Client(address=addresses[0], cluster=is_cluster)
    try:
        for target in addresses:
            client.config_set(key, value, target)
    except RedisReplyError:
        raise SmartSimError(
            f"Invalid CONFIG key-value pair ({key}: {value})"
        ) from None
    except TypeError:
        raise TypeError(
            "Incompatible function arguments. The key and value used for setting the database configurations must be strings."
        ) from None
def _build_batch_settings(self, db_nodes, alloc, batch, account, time, **kwargs):
batch_settings = None
launcher = kwargs.pop("launcher")
# enter this conditional if user has not specified an allocation to run
# on or if user specified batch=False (alloc will be found through env)
if not alloc and batch:
batch_settings = create_batch_settings(
launcher, nodes=db_nodes, time=time, account=account, **kwargs
)
return batch_settings
def _build_run_settings(self, exe, exe_args, **kwargs):
    """Create the run settings used to launch the database.

    With ``single_cmd`` and more than one database node, ``exe_args`` is
    treated as one argument list per shard: the first entry becomes the
    main run settings and every further entry is attached to it as an
    MPMD component. Otherwise ``exe_args`` is used as-is for a single
    run settings object. For every launcher other than "local", task
    counts are pinned to one task and one task per node.

    :param exe: executable to run
    :param exe_args: executable arguments (one entry per shard for MPMD)
    :param kwargs: ``run_args``, ``db_nodes`` and ``single_cmd`` are read
                   here; everything except ``run_args`` is forwarded to
                   ``create_run_settings``
    :return: the (main) run settings
    """
    run_args = kwargs.pop("run_args", {})
    db_nodes = kwargs.get("db_nodes", 1)
    single_cmd = kwargs.get("single_cmd", True)
    on_wlm = self.launcher != "local"

    if single_cmd and db_nodes > 1:
        run_settings = create_run_settings(
            exe=exe, exe_args=exe_args[0], run_args=run_args.copy(), **kwargs
        )
        if on_wlm:
            run_settings.set_tasks(1)

        for shard_args in exe_args[1:]:
            shard_settings = create_run_settings(
                exe=exe, exe_args=shard_args, run_args=run_args.copy(), **kwargs
            )
            shard_settings.set_tasks(1)
            shard_settings.set_tasks_per_node(1)
            run_settings.make_mpmd(shard_settings)
    else:
        run_settings = create_run_settings(
            exe=exe, exe_args=exe_args, run_args=run_args.copy(), **kwargs
        )
        if on_wlm:
            run_settings.set_tasks(1)

    if on_wlm:
        run_settings.set_tasks_per_node(1)

    # Put it back in case it is needed again
    kwargs["run_args"] = run_args
    return run_settings
def _build_run_settings_lsf(self, exe, exe_args, **kwargs):
    """Create the ``jsrun`` run settings for all shards (LSF).

    One ``JsrunSettings`` object is created per entry of ``exe_args``.
    Each gets its own ERF set whose rank is the shard index and whose
    host is the shard index plus one. The first settings object is
    turned into the MPMD container and all following ones are attached
    to it.

    :param exe: executable to run
    :param exe_args: one list of executable arguments per shard
    :param kwargs: ``run_args``, ``cpus_per_shard`` and ``gpus_per_shard``
                   are read here
    :return: the MPMD run settings, or None if ``exe_args`` is empty
    """
    run_args = kwargs.pop("run_args", {})
    cpus_per_shard = kwargs.get("cpus_per_shard", None)
    gpus_per_shard = kwargs.get("gpus_per_shard", None)
    erf_rs = None

    # Every shard uses the same cpu/gpu entries in its ERF set;
    # only the rank and the host index differ.
    for shard_id, shard_args in enumerate(exe_args):
        run_args["launch_distribution"] = "packed"

        shard_settings = JsrunSettings(exe, shard_args, run_args=run_args.copy())
        shard_settings.set_binding("none")
        # This makes sure output is written to orchestrator_0.out, orchestrator_1.out, and so on
        shard_settings.set_individual_output("_%t")

        erf_sets = {
            "rank": str(shard_id),
            "host": str(1 + shard_id),
            "cpu": "{" + f"0:{cpus_per_shard}" + "}",
        }
        if gpus_per_shard > 1:  # pragma: no-cover
            erf_sets["gpu"] = "{" + f"0-{gpus_per_shard-1}" + "}"
        elif gpus_per_shard > 0:
            erf_sets["gpu"] = "{" + str(0) + "}"
        shard_settings.set_erf_sets(erf_sets)

        if erf_rs:
            erf_rs.make_mpmd(shard_settings)
        else:
            shard_settings.make_mpmd()
            erf_rs = shard_settings

    kwargs["run_args"] = run_args
    return erf_rs
def _initialize_entities(self, **kwargs):
self.db_nodes = kwargs.get("db_nodes", 1)
single_cmd = kwargs.get("single_cmd", True)
if int(self.db_nodes) == 2:
raise SSUnsupportedError("Orchestrator does not support clusters of size 2")
if self.launcher == "local" and self.db_nodes > 1:
raise ValueError(
"Local Orchestrator does not support multiple database shards"
)
mpmd_nodes = (single_cmd and self.db_nodes > 1) or self.launcher == "lsf"
if mpmd_nodes:
self._initialize_entities_mpmd(**kwargs)
else:
port = kwargs.get("port", 6379)
cluster = not bool(self.db_nodes < 3)
for db_id in range(self.db_nodes):
db_node_name = "_".join((self.name, str(db_id)))
# create the exe_args list for launching multiple databases
# per node. also collect port range for dbnode
start_script_args = self._get_start_script_args(
db_node_name, port, cluster
)
exe_args = " ".join(start_script_args)
# if only launching 1 db per command, we don't need a list of exe args lists
run_settings = self._build_run_settings(
sys.executable, exe_args, **kwargs
)
node = DBNode(
db_node_name,
self.path,
run_settings,
[port],
[db_node_name + ".out"],
)
self.entities.append(node)
self.ports = [port]
def _initialize_entities_mpmd(self, **kwargs):
    """Create the single ``DBNode`` that launches every shard (MPMD).

    One argument list per shard is generated and handed to the LSF or the
    generic run-settings builder. The resulting node is flagged as MPMD
    and records the number of shards it starts.

    :param kwargs: ``port`` is read here; all entries are forwarded to
                   the run-settings builder
    """
    port = kwargs.get("port", 6379)
    # cluster arguments are only added for three or more nodes
    cluster = self.db_nodes >= 3

    shard_names = ["_".join((self.name, str(db_id))) for db_id in range(self.db_nodes)]
    # join then shell-split so every shard ends up with a flat token list
    exe_args_mpmd = [
        sh_split(" ".join(self._get_start_script_args(shard_name, port, cluster)))
        for shard_name in shard_names
    ]

    if self.launcher == "lsf":
        run_settings = self._build_run_settings_lsf(
            sys.executable, exe_args_mpmd, **kwargs
        )
        # one output file per shard
        output_files = [
            "_".join((self.name, str(db_id))) + ".out"
            for db_id in range(self.db_nodes)
        ]
    else:
        run_settings = self._build_run_settings(
            sys.executable, exe_args_mpmd, **kwargs
        )
        output_files = [self.name + ".out"]

    mpmd_node = DBNode(self.name, self.path, run_settings, [port], output_files)
    mpmd_node._mpmd = True
    mpmd_node._num_shards = self.db_nodes
    self.entities.append(mpmd_node)

    self.ports = [port]
@staticmethod
def _get_cluster_args(name, port):
"""Create the arguments necessary for cluster creation"""
cluster_conf = "".join(("nodes-", name, "-", str(port), ".conf"))
db_args = ["--cluster-enabled yes", "--cluster-config-file", cluster_conf]
return db_args
def _get_start_script_args(self, name, port, cluster):
start_script_args = [
"-m",
"smartsim._core.entrypoints.redis", # entrypoint
f"+ifname={self._interface}", # pass interface to start script
"+command", # command flag for argparser
self._redis_exe, # redis-server
self._redis_conf, # redis6.conf file
self._rai_module, # redisai.so
"--port", # redis port
str(port), # port number
]
if cluster:
start_script_args += self._get_cluster_args(name, port)
return start_script_args
def _get_db_hosts(self):
hosts = []
for dbnode in self.entities:
if not dbnode._mpmd:
hosts.append(dbnode.host)
else:
hosts.extend(dbnode.hosts)
return hosts
def _check_network_interface(self):
    """Warn if the configured network interface is not found on this node.

    The interface "lo" never triggers a warning. Nothing is raised: the
    check only logs, together with the interfaces that were found.
    """
    net_if_addrs = psutil.net_if_addrs()
    if self._interface in net_if_addrs or self._interface == "lo":
        return

    available = list(net_if_addrs.keys())
    logger.warning(
        f"{self._interface} is not a valid network interface on this node. \n"
        "This could be because the head node doesn't have the same networks, if so, ignore this."
    )
    logger.warning(f"Found network interfaces are: {available}")
def _fill_reserved(self):
    """Fill the reserved batch and run arguments dictionaries

    Both dictionaries are keyed by settings class; the values list the
    argument names that ``set_run_arg`` / ``set_batch_arg`` refuse to set.
    """
    reserved_run = {
        MpirunSettings: [
            "np", "N", "c", "output-filename", "n", "wdir", "wd", "host",
        ],
        SrunSettings: [
            "nodes", "N", "ntasks", "n", "ntasks-per-node",
            "output", "o", "error", "e", "job-name", "J",
            "jobid", "multi-prog", "w", "chdir", "D",
        ],
        AprunSettings: [
            "pes", "n", "pes-per-node", "N", "l",
            "pes-per-numa-node", "S", "wdir",
        ],
        JsrunSettings: [
            "chdir", "h",
            "stdio_stdout", "o",
            "stdio_stderr", "k",
            "tasks_per_rs", "a",
            "np", "p",
            "cpu_per_rs", "c",
            "gpu_per_rs", "g",
            "latency_priority", "l",
            "memory_per_rs", "m",
            "nrs", "n",
            "rs_per_host", "r",
            "rs_per_socket", "K",
            "appfile", "f",
            "allocate_only", "A",
            "launch_node_task", "H",
            "use_reservation", "J",
            "use_resources",
            "bind", "b",
            "launch_distribution", "d",
        ],
    }
    reserved_batch = {
        SbatchSettings: [
            "nodes", "N", "ntasks", "n", "ntasks-per-node",
            "output", "o", "error", "e", "job-name", "J",
            "jobid", "multi-prog", "w", "chdir", "D",
        ],
        CobaltBatchSettings: [
            "cwd", "error", "e", "output", "o",
            "outputprefix", "N", "l", "jobname",
        ],
        QsubBatchSettings: ["e", "o", "N", "l"],
        BsubBatchSettings: ["J", "o", "e", "m", "n", "nnodes"],
    }
    self._reserved_run_args.update(reserved_run)
    self._reserved_batch_args.update(reserved_batch)
#
# Deprecated Orchestrator Classes
#
# Same functionality incorporated into the Orchestrator base class
#
class CobaltOrchestrator(Orchestrator):
    """Deprecated: use ``Orchestrator(launcher='cobalt', ...)`` instead."""

    def __init__(
        self,
        port=6379,
        db_nodes=1,
        batch=True,
        hosts=None,
        run_command="aprun",
        interface="ipogif0",
        account=None,
        queue=None,
        time=None,
        single_cmd=True,
        **kwargs,
    ):
        """Initialize an Orchestrator reference for Cobalt based systems

        The orchestrator launches as a batch by default. If batch=False,
        at launch, the orchestrator will look for an interactive
        allocation to launch on.

        The Cobalt orchestrator does not support multiple databases per node.

        A ``DeprecationWarning`` is emitted on every instantiation; all
        arguments are forwarded to ``Orchestrator`` with ``launcher="cobalt"``.

        :param port: TCP/IP port, defaults to 6379
        :type port: int
        :param db_nodes: number of database shards, defaults to 1
        :type db_nodes: int, optional
        :param batch: Run as a batch workload, defaults to True
        :type batch: bool, optional
        :param hosts: specify hosts to launch on, defaults to None. Optional if not launching with OpenMPI
        :type hosts: list[str]
        :param run_command: specify launch binary. Options are ``mpirun`` and ``aprun``, defaults to ``aprun``.
        :type run_command: str, optional
        :param interface: network interface to use, defaults to "ipogif0"
        :type interface: str, optional
        :param account: account to run batch on
        :type account: str, optional
        :param queue: queue to launch batch in
        :type queue: str, optional
        :param time: walltime for batch 'HH:MM:SS' format
        :type time: str, optional
        :param single_cmd: run all shards with one (MPMD) command, defaults to True
        :type single_cmd: bool, optional
        """
        simplefilter("once", DeprecationWarning)
        deprecation_msg = (
            "CobaltOrchestrator(...) is deprecated and will be removed in a future release.\n"
            "Please update your code to use Orchestrator(launcher='cobalt', ...)."
        )
        warn(deprecation_msg, DeprecationWarning, stacklevel=2)
        super().__init__(
            port=port,
            interface=interface,
            launcher="cobalt",
            run_command=run_command,
            db_nodes=db_nodes,
            batch=batch,
            hosts=hosts,
            account=account,
            time=time,
            single_cmd=single_cmd,
            queue=queue,
            **kwargs,
        )
class LSFOrchestrator(Orchestrator):
    """Deprecated: use ``Orchestrator(launcher='lsf', ...)`` instead."""

    def __init__(
        self,
        port=6379,
        db_nodes=1,
        cpus_per_shard=4,
        gpus_per_shard=0,
        batch=True,
        hosts=None,
        project=None,
        time=None,
        interface="ib0",
        single_cmd=True,
        **kwargs,
    ):
        """Initialize an Orchestrator reference for LSF based systems

        The orchestrator launches as a batch by default. If
        batch=False, at launch, the orchestrator will look for an interactive
        allocation to launch on.

        Each database shard is assigned a resource set with cpus and gpus
        allocated contiguously on the host:
        it is the user's responsibility to check if
        enough resources are available on each host.

        ``LSFOrchestrator`` is launched with only one ``jsrun`` command
        as launch binary, and an Explicit Resource File (ERF) which is
        automatically generated.

        A ``DeprecationWarning`` is emitted on every instantiation; the
        arguments are forwarded to ``Orchestrator`` with ``launcher="lsf"``
        and ``run_command="jsrun"``.

        :param port: TCP/IP port
        :type port: int
        :param db_nodes: number of database shards, defaults to 1
        :type db_nodes: int, optional
        :param cpus_per_shard: cpus to allocate per shard, defaults to 4
        :type cpus_per_shard: int, optional
        :param gpus_per_shard: gpus to allocate per shard, defaults to 0
        :type gpus_per_shard: int, optional
        :param batch: Run as a batch workload, defaults to True
        :type batch: bool, optional
        :param hosts: specify hosts to launch on
        :type hosts: list[str], optional
        :param project: project to run batch on
        :type project: str, optional
        :param time: walltime for batch 'HH:MM' format
        :type time: str, optional
        :param interface: network interface to use
        :type interface: str
        :param single_cmd: must be True; any other value is rejected
        :type single_cmd: bool, optional
        :raises SSUnsupportedError: if ``single_cmd`` is not True
        """
        simplefilter("once", DeprecationWarning)
        deprecation_msg = (
            "LSFOrchestrator(...) is deprecated and will be removed in a future release.\n"
            "Please update your code to use Orchestrator(launcher='lsf', ...)."
        )
        warn(deprecation_msg, DeprecationWarning, stacklevel=2)

        # equality (not identity) comparison is kept on purpose so that the
        # accepted values stay exactly the same as before
        if single_cmd != True:
            raise SSUnsupportedError(
                "LSFOrchestrator can only be run with single_cmd=True (MPMD)."
            )

        super().__init__(
            port=port,
            interface=interface,
            launcher="lsf",
            run_command="jsrun",
            db_nodes=db_nodes,
            batch=batch,
            hosts=hosts,
            time=time,
            project=project,
            cpus_per_shard=cpus_per_shard,
            gpus_per_shard=gpus_per_shard,
            **kwargs,
        )
class SlurmOrchestrator(Orchestrator):
    """Deprecated: use ``Orchestrator(launcher='slurm', ...)`` instead."""

    def __init__(
        self,
        port=6379,
        db_nodes=1,
        batch=True,
        hosts=None,
        run_command="srun",
        account=None,
        time=None,
        alloc=None,
        db_per_host=1,
        interface="ipogif0",
        single_cmd=False,
        **kwargs,
    ):
        """Initialize an Orchestrator reference for Slurm based systems

        The orchestrator launches as a batch by default. The Slurm orchestrator
        can also be given an allocation to run on. If no allocation is provided,
        and batch=False, at launch, the orchestrator will look for an interactive
        allocation to launch on.

        SlurmOrchestrator supports launching with both ``srun`` and ``mpirun``
        as launch binaries. If mpirun is used, the hosts parameter should be
        populated with length equal to that of the ``db_nodes`` argument.

        A ``DeprecationWarning`` is emitted on every instantiation; all
        arguments are forwarded to ``Orchestrator`` with ``launcher="slurm"``.

        :param port: TCP/IP port
        :type port: int
        :param db_nodes: number of database shards, defaults to 1
        :type db_nodes: int, optional
        :param batch: Run as a batch workload, defaults to True
        :type batch: bool, optional
        :param hosts: specify hosts to launch on
        :type hosts: list[str]
        :param run_command: specify launch binary. Options are "mpirun" and "srun", defaults to "srun"
        :type run_command: str, optional
        :param account: account to run batch on
        :type account: str, optional
        :param time: walltime for batch 'HH:MM:SS' format
        :type time: str, optional
        :param alloc: allocation to launch on, defaults to None
        :type alloc: str, optional
        :param db_per_host: number of database shards per system host (MPMD), defaults to 1
        :type db_per_host: int, optional
        :param interface: network interface to use, defaults to "ipogif0"
        :type interface: str, optional
        :param single_cmd: run all shards with one (MPMD) command, defaults to False
        :type single_cmd: bool
        """
        simplefilter("once", DeprecationWarning)
        deprecation_msg = (
            "SlurmOrchestrator(...) is deprecated and will be removed in a future release.\n"
            "Please update your code to use Orchestrator(launcher='slurm', ...)."
        )
        warn(deprecation_msg, DeprecationWarning, stacklevel=2)
        super().__init__(
            port=port,
            interface=interface,
            launcher="slurm",
            run_command=run_command,
            db_nodes=db_nodes,
            batch=batch,
            hosts=hosts,
            account=account,
            time=time,
            alloc=alloc,
            single_cmd=single_cmd,
            db_per_host=db_per_host,
            **kwargs,
        )
class PBSOrchestrator(Orchestrator):
    """Deprecated: use ``Orchestrator(launcher='pbs', ...)`` instead."""

    def __init__(
        self,
        port=6379,
        db_nodes=1,
        batch=True,
        hosts=None,
        run_command="aprun",
        interface="ipogif0",
        account=None,
        time=None,
        queue=None,
        single_cmd=True,
        **kwargs,
    ):
        """Initialize an Orchestrator reference for PBSPro based systems

        The ``PBSOrchestrator`` launches as a batch by default. If batch=False,
        at launch, the ``PBSOrchestrator`` will look for an interactive
        allocation to launch on.

        The PBS orchestrator does not support multiple databases per node.

        If ``mpirun`` is specified as the ``run_command``, then the ``hosts``
        argument is required.

        A ``DeprecationWarning`` is emitted on every instantiation; all
        arguments are forwarded to ``Orchestrator`` with ``launcher="pbs"``.

        :param port: TCP/IP port
        :type port: int
        :param db_nodes: number of compute nodes to span across, defaults to 1
        :type db_nodes: int, optional
        :param batch: run as a batch workload, defaults to True
        :type batch: bool, optional
        :param hosts: specify hosts to launch on, defaults to None
        :type hosts: list[str]
        :param run_command: specify launch binary. Options are ``mpirun`` and ``aprun``, defaults to "aprun"
        :type run_command: str, optional
        :param interface: network interface to use, defaults to "ipogif0"
        :type interface: str, optional
        :param account: account to run batch on
        :type account: str, optional
        :param time: walltime for batch 'HH:MM:SS' format
        :type time: str, optional
        :param queue: queue to launch batch in
        :type queue: str, optional
        :param single_cmd: run all shards with one (MPMD) command, defaults to True
        :type single_cmd: bool, optional
        """
        simplefilter("once", DeprecationWarning)
        deprecation_msg = (
            "PBSOrchestrator(...) is deprecated and will be removed in a future release.\n"
            "Please update your code to use Orchestrator(launcher='pbs', ...)."
        )
        warn(deprecation_msg, DeprecationWarning, stacklevel=2)
        super().__init__(
            port=port,
            interface=interface,
            launcher="pbs",
            run_command=run_command,
            db_nodes=db_nodes,
            batch=batch,
            hosts=hosts,
            account=account,
            time=time,
            single_cmd=single_cmd,
            queue=queue,
            **kwargs,
        )