# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from __future__ import annotations
import copy
import typing as t
from pprint import pformat
from ..error import SSUnsupportedError
from ..log import get_logger
from .base import BatchSettings, RunSettings
logger = get_logger(__name__)
[docs]class JsrunSettings(RunSettings):
def __init__(
self,
exe: str,
exe_args: t.Optional[t.Union[str, t.List[str]]] = None,
run_args: t.Optional[t.Dict[str, t.Union[int, str, float, None]]] = None,
env_vars: t.Optional[t.Dict[str, t.Optional[str]]] = None,
**_kwargs: t.Any,
) -> None:
"""Settings to run job with ``jsrun`` command
``JsrunSettings`` should only be used on LSF-based systems.
:param exe: executable
:param exe_args: executable arguments
:param run_args: arguments for run command
:param env_vars: environment vars to launch job with
"""
super().__init__(
exe,
exe_args,
run_command="jsrun",
run_args=run_args,
env_vars=env_vars,
)
# Parameters needed for MPMD run
self.erf_sets = {"host": "*", "cpu": "*", "ranks": "1"}
self.mpmd_preamble_lines: t.List[str] = []
self.mpmd: t.List[RunSettings] = []
self.individual_suffix = ""
reserved_run_args = {"chdir", "h"}
[docs] def set_num_rs(self, num_rs: t.Union[str, int]) -> None:
"""Set the number of resource sets to use
This sets ``--nrs``.
:param num_rs: Number of resource sets or `ALL_HOSTS`
"""
if isinstance(num_rs, str):
self.run_args["nrs"] = num_rs
else:
self.run_args["nrs"] = int(num_rs)
[docs] def set_cpus_per_rs(self, cpus_per_rs: int) -> None:
"""Set the number of cpus to use per resource set
This sets ``--cpu_per_rs``
:param cpus_per_rs: number of cpus to use per resource set or ALL_CPUS
"""
if self.colocated_db_settings:
db_cpus = int(t.cast(int, self.colocated_db_settings.get("db_cpus", 0)))
if not db_cpus:
raise ValueError("db_cpus must be configured on colocated_db_settings")
if cpus_per_rs < db_cpus:
raise ValueError(
f"Cannot set cpus_per_rs ({cpus_per_rs}) to less than "
+ f"db_cpus ({db_cpus})"
)
if isinstance(cpus_per_rs, str):
self.run_args["cpu_per_rs"] = cpus_per_rs
else:
self.run_args["cpu_per_rs"] = int(cpus_per_rs)
[docs] def set_gpus_per_rs(self, gpus_per_rs: int) -> None:
"""Set the number of gpus to use per resource set
This sets ``--gpu_per_rs``
:param gpus_per_rs: number of gpus to use per resource set or ALL_GPUS
"""
if isinstance(gpus_per_rs, str):
self.run_args["gpu_per_rs"] = gpus_per_rs
else:
self.run_args["gpu_per_rs"] = int(gpus_per_rs)
[docs] def set_rs_per_host(self, rs_per_host: int) -> None:
"""Set the number of resource sets to use per host
This sets ``--rs_per_host``
:param rs_per_host: number of resource sets to use per host
"""
self.run_args["rs_per_host"] = int(rs_per_host)
[docs] def set_tasks(self, tasks: int) -> None:
"""Set the number of tasks for this job
This sets ``--np``
:param tasks: number of tasks
"""
self.run_args["np"] = int(tasks)
[docs] def set_tasks_per_rs(self, tasks_per_rs: int) -> None:
"""Set the number of tasks per resource set
This sets ``--tasks_per_rs``
:param tasks_per_rs: number of tasks per resource set
"""
self.run_args["tasks_per_rs"] = int(tasks_per_rs)
[docs] def set_tasks_per_node(self, tasks_per_node: int) -> None:
"""Set the number of tasks per resource set.
This function is an alias for `set_tasks_per_rs`.
:param tasks_per_node: number of tasks per resource set
"""
self.set_tasks_per_rs(int(tasks_per_node))
[docs] def set_cpus_per_task(self, cpus_per_task: int) -> None:
"""Set the number of cpus per tasks.
This function is an alias for `set_cpus_per_rs`.
:param cpus_per_task: number of cpus per resource set
"""
self.set_cpus_per_rs(int(cpus_per_task))
[docs] def set_memory_per_rs(self, memory_per_rs: int) -> None:
"""Specify the number of megabytes of memory to assign to a resource set
This sets ``--memory_per_rs``
:param memory_per_rs: Number of megabytes per rs
"""
self.run_args["memory_per_rs"] = int(memory_per_rs)
[docs] def set_memory_per_node(self, memory_per_node: int) -> None:
"""Specify the number of megabytes of memory to assign to a resource set
Alias for `set_memory_per_rs`.
:param memory_per_node: Number of megabytes per rs
"""
self.set_memory_per_rs(int(memory_per_node))
[docs] def set_binding(self, binding: str) -> None:
"""Set binding
This sets ``--bind``
:param binding: Binding, e.g. `packed:21`
"""
self.run_args["bind"] = binding
[docs] def make_mpmd(self, settings: RunSettings) -> None:
"""Make step an MPMD (or SPMD) job.
This method will activate job execution through an ERF file.
Optionally, this method adds an instance of ``JsrunSettings`` to
the list of settings to be launched in the same ERF file.
:param settings: ``JsrunSettings`` instance
"""
if self.colocated_db_settings:
raise SSUnsupportedError(
"Colocated models cannot be run as a mpmd workload"
)
self.mpmd.append(settings)
[docs] def set_mpmd_preamble(self, preamble_lines: t.List[str]) -> None:
"""Set preamble used in ERF file. Typical lines include
`oversubscribe-cpu : allow` or `overlapping-rs : allow`.
Can be used to set `launch_distribution`. If it is not present,
it will be inferred from the settings, or set to `packed` by
default.
:param preamble_lines: lines to put at the beginning of the ERF
file.
"""
self.mpmd_preamble_lines = preamble_lines
[docs] def set_erf_sets(self, erf_sets: t.Dict[str, str]) -> None:
"""Set resource sets used for ERF (SPMD or MPMD) steps.
``erf_sets`` is a dictionary used to fill the ERF
line representing these settings, e.g.
`{"host": "1", "cpu": "{0:21}, {21:21}", "gpu": "*"}`
can be used to specify rank (or rank_count), hosts, cpus, gpus,
and memory.
The key `rank` is used to give specific ranks, as in
`{"rank": "1, 2, 5"}`, while the key `rank_count` is used to specify
the count only, as in `{"rank_count": "3"}`. If both are specified,
only `rank` is used.
:param hosts: dictionary of resources
"""
self.erf_sets = copy.deepcopy(erf_sets)
[docs] def set_individual_output(self, suffix: t.Optional[str] = None) -> None:
"""Set individual std output.
This sets ``--stdio_mode individual``
and inserts the suffix into the output name. The resulting
output name will be ``self.name + suffix + .out``.
:param suffix: Optional suffix to add to output file names,
it can contain `%j`, `%h`, `%p`, or `%t`,
as specified by `jsrun` options.
"""
self.run_args["stdio_mode"] = "individual"
if suffix:
self.individual_suffix = suffix
def __str__(self) -> str:
string = super().__str__()
if self.mpmd:
string += "\nERF settings: " + pformat(self.erf_sets)
return string
def _prep_colocated_db(self, db_cpus: int) -> None:
cpus_per_flag_set = False
for cpu_per_rs_flag in ["cpu_per_rs", "c"]:
if run_arg_value := self.run_args.get(cpu_per_rs_flag, 0):
cpus_per_flag_set = True
cpu_per_rs = int(run_arg_value)
if cpu_per_rs < db_cpus:
msg = (
f"{cpu_per_rs_flag} flag was set to {cpu_per_rs}, but "
f"colocated DB requires {db_cpus} CPUs per RS. Automatically "
f"setting {cpu_per_rs_flag} flag to {db_cpus}"
)
logger.info(msg)
self.run_args[cpu_per_rs_flag] = db_cpus
if not cpus_per_flag_set:
msg = f"Colocated DB requires {db_cpus} CPUs per RS. Automatically setting "
msg += f"--cpus_per_rs=={db_cpus}"
logger.info(msg)
self.set_cpus_per_rs(db_cpus)
rs_per_host_set = False
for rs_per_host_flag in ["rs_per_host", "r"]:
if rs_per_host_flag in self.run_args:
rs_per_host_set = True
rs_per_host = self.run_args[rs_per_host_flag]
if rs_per_host != 1:
msg = f"{rs_per_host_flag} flag was set to {rs_per_host}, "
msg += (
"but colocated DB requires running ONE resource set per host. "
)
msg += f"Automatically setting {rs_per_host_flag} flag to 1"
logger.info(msg)
self.run_args[rs_per_host_flag] = "1"
if not rs_per_host_set:
msg = "Colocated DB requires one resource set per host. "
msg += " Automatically setting --rs_per_host==1"
logger.info(msg)
self.set_rs_per_host(1)
[docs]class BsubBatchSettings(BatchSettings):
def __init__(
self,
nodes: t.Optional[int] = None,
time: t.Optional[str] = None,
project: t.Optional[str] = None,
batch_args: t.Optional[t.Dict[str, t.Optional[str]]] = None,
smts: int = 0,
**kwargs: t.Any,
) -> None:
"""Specify ``bsub`` batch parameters for a job
:param nodes: number of nodes for batch
:param time: walltime for batch job in format hh:mm
:param project: project for batch launch
:param batch_args: overrides for LSF batch arguments
:param smts: SMTs
"""
self.project: t.Optional[str] = None
if project:
kwargs.pop("account", None)
else:
project = kwargs.pop("account", None)
super().__init__(
"bsub",
batch_args=batch_args,
nodes=nodes,
account=project,
time=time,
**kwargs,
)
self.smts = 0
if smts:
self.set_smts(smts)
self.expert_mode = False
self.easy_settings = ["ln_slots", "ln_mem", "cn_cu", "nnodes"]
[docs] def set_walltime(self, walltime: str) -> None:
"""Set the walltime
This sets ``-W``.
:param walltime: Time in hh:mm format, e.g. "10:00" for 10 hours,
if time is supplied in hh:mm:ss format, seconds
will be ignored and walltime will be set as ``hh:mm``
"""
# For compatibility with other launchers, as explained in docstring
if walltime:
if len(walltime.split(":")) > 2:
walltime = ":".join(walltime.split(":")[:2])
self.walltime = walltime
[docs] def set_smts(self, smts: int) -> None:
"""Set SMTs
This sets ``-alloc_flags``. If the user sets
SMT explicitly through ``-alloc_flags``, then that
takes precedence.
:param smts: SMT (e.g on Summit: 1, 2, or 4)
"""
self.smts = smts
[docs] def set_project(self, project: str) -> None:
"""Set the project
This sets ``-P``.
:param time: project name
"""
if project:
self.project = project
[docs] def set_account(self, account: str) -> None:
"""Set the project
this function is an alias for `set_project`.
:param account: project name
"""
self.set_project(account)
[docs] def set_nodes(self, num_nodes: int) -> None:
"""Set the number of nodes for this batch job
This sets ``-nnodes``.
:param nodes: number of nodes
"""
if num_nodes:
self.batch_args["nnodes"] = str(int(num_nodes))
[docs] def set_expert_mode_req(self, res_req: str, slots: int) -> None:
"""Set allocation for expert mode. This
will activate expert mode (``-csm``) and
disregard all other allocation options.
This sets ``-csm -n slots -R res_req``
:param res_req: specific resource requirements
:param slots: number of resources to allocate
"""
self.expert_mode = True
self.batch_args["csm"] = "y"
self.batch_args["R"] = res_req
self.batch_args["n"] = str(slots)
[docs] def set_hostlist(self, host_list: t.Union[str, t.List[str]]) -> None:
"""Specify the hostlist for this job
:param host_list: hosts to launch on
:raises TypeError: if not str or list of str
"""
if isinstance(host_list, str):
host_list = [host_list.strip()]
if not isinstance(host_list, list):
raise TypeError("host_list argument must be a list of strings")
if not all(isinstance(host, str) for host in host_list):
raise TypeError("host_list argument must be list of strings")
self.batch_args["m"] = '"' + " ".join(host_list) + '"'
[docs] def set_tasks(self, tasks: int) -> None:
"""Set the number of tasks for this job
This sets ``-n``
:param tasks: number of tasks
"""
self.batch_args["n"] = str(int(tasks))
[docs] def set_queue(self, queue: str) -> None:
"""Set the queue for this job
:param queue: The queue to submit the job on
"""
if queue:
self.batch_args["q"] = queue
def _format_alloc_flags(self) -> None:
"""Format ``alloc_flags`` checking if user already
set it. Currently only adds SMT flag if missing
and ``self.smts`` is set.
"""
if self.smts:
if "alloc_flags" not in self.batch_args.keys():
self.batch_args["alloc_flags"] = f"smt{self.smts}"
else:
# Check if smt is in the flag, otherwise add it
flags: t.List[str] = []
if flags_arg := self.batch_args.get("alloc_flags", ""):
flags = flags_arg.strip('"').split()
if not any(flag.startswith("smt") for flag in flags):
flags.append(f"smt{self.smts}")
self.batch_args["alloc_flags"] = " ".join(flags)
# Check if alloc_flags has to be enclosed in quotes
if "alloc_flags" in self.batch_args.keys():
flags = []
if flags_arg := self.batch_args.get("alloc_flags", ""):
flags = flags_arg.strip('"').split()
if len(flags) > 1:
self.batch_args["alloc_flags"] = '"' + " ".join(flags) + '"'