# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
import typing as t
from shutil import which
from .._core.launcher.slurm.slurmCommands import salloc, scancel, scontrol, sinfo
from .._core.launcher.slurm.slurmParser import parse_salloc, parse_salloc_error
from .._core.launcher.util.launcherUtil import ComputeNode, Partition
from ..error import (
AllocationError,
LauncherError,
SmartSimError,
SSReservedKeywordError,
)
from ..log import get_logger
from ..settings.slurmSettings import fmt_walltime
logger = get_logger(__name__)
def get_allocation(
    nodes: int = 1,
    time: t.Optional[str] = None,
    account: t.Optional[str] = None,
    options: t.Optional[t.Dict[str, str]] = None,
) -> str:
    """Request an allocation

    This function requests an allocation with the specified arguments.
    Anything passed to the options will be processed as a Slurm
    argument and appended to the salloc command with the appropriate
    prefix (e.g. "-" or "--").

    The options can be used to pass extra settings to the
    workload manager such as the following for Slurm:

        - nodelist="nid00004"

    For arguments without a value, pass None or an empty
    string as the value. For Slurm:

        - exclusive=None

    :param nodes: number of nodes for the allocation
    :param time: wall time of the allocation, HH:MM:SS format
    :param account: account id for allocation
    :param options: additional options for the slurm wlm
    :raises LauncherError: if salloc is not available at the call site
    :raises AllocationError: if the allocation is not successful
    :return: the id of the allocation
    """
    if not which("salloc"):
        raise LauncherError(
            "Attempted slurm function without access to slurm(salloc) at the call site"
        )

    options = options or {}
    salloc_args = _get_alloc_cmd(nodes, time, account, options=options)
    debug_msg = " ".join(salloc_args[1:])
    logger.debug(f"Allocation settings: {debug_msg}")

    # salloc reports the granted job id on stderr, so parse stderr
    _, err = salloc(salloc_args)
    alloc_id = parse_salloc(err)
    if alloc_id:
        logger.info(f"Allocation successful with Job ID: {str(alloc_id)}")
    else:
        logger.debug(err)
        error = parse_salloc_error(err)
        if not error:
            # could not even extract an error message from salloc's output
            logger.error(err)
            raise AllocationError("Error retrieving Slurm allocation")
        raise AllocationError(error)
    return str(alloc_id)
def release_allocation(alloc_id: str) -> None:
    """Free an allocation's resources

    :param alloc_id: allocation id
    :raises LauncherError: if scancel is not available at the call site
    :raises AllocationError: if allocation could not be freed
    """
    if not which("scancel"):
        # message previously said "slurm(salloc)"; scancel is the command checked here
        raise LauncherError(
            "Attempted slurm function without access to slurm(scancel) at the call site"
        )

    logger.info(f"Releasing allocation: {alloc_id}")
    returncode, _, _ = scancel([str(alloc_id)])

    if returncode != 0:
        logger.error(f"Unable to revoke your allocation for jobid {str(alloc_id)}")
        logger.error(
            "The job may have already timed out, or you may "
            "need to cancel the job manually"
        )
        raise AllocationError(
            f"Unable to revoke your allocation for jobid {str(alloc_id)}"
        )

    logger.info(f"Successfully freed allocation {alloc_id}")
def validate(nodes: int = 1, ppn: int = 1, partition: t.Optional[str] = None) -> bool:
    """Check that there are sufficient resources in the provided Slurm partitions.

    If no partition is provided, the default partition is found and used.

    :param nodes: Override the default node count to validate
    :param ppn: Override the default processes per node to validate
    :param partition: partition to validate
    :raises LauncherError: if the partition cannot be found on this system
    :returns: True if resources are available, False otherwise
    """
    sys_partitions = _get_system_partition_info()

    p_name = partition
    if p_name is None or p_name == "default":
        try:
            p_name = get_default_partition()
        except LauncherError as e:
            raise LauncherError(
                "No partition provided and default partition could not be found"
            ) from e

    if p_name not in sys_partitions:
        raise LauncherError(f"Partition {p_name} is not found on this system")

    # count only nodes that can satisfy the processes-per-node request
    avail_nodes = {
        node
        for node in sys_partitions[p_name].nodes
        if node.ppn is not None and node.ppn >= ppn
    }
    n_avail_nodes = len(avail_nodes)
    logger.debug(f"Found {n_avail_nodes} nodes that match the constraints provided")

    if n_avail_nodes < nodes:
        logger.warning(
            f"{nodes} nodes are not available on the specified partitions. Only "
            f"{n_avail_nodes} nodes available."
        )
        return False

    logger.info("Successfully validated Slurm with sufficient resources")
    return True
def get_default_partition() -> str:
    """Returns the default partition from Slurm

    This default partition is assumed to be the partition with
    a star following its partition name in sinfo output

    :raises LauncherError: if no default partition is found
    :returns: the name of the default partition
    """
    sinfo_output, _ = sinfo(["--noheader", "--format", "%P"])

    default = None
    for line in sinfo_output.split("\n"):
        if line.endswith("*"):
            default = line.strip("*")

    if not default:
        raise LauncherError("Could not find default partition!")
    return default
def _get_system_partition_info() -> t.Dict[str, Partition]:
    """Build a dictionary of slurm partitions

    Queries ``sinfo`` for partition name, node hostname, and cpus per
    node ("%R %n %c") and groups the nodes under their partitions.

    :returns: dict of Partition objects keyed by partition name
    """
    sinfo_output, _ = sinfo(["--noheader", "--format", "%R %n %c"])

    partitions: t.Dict[str, Partition] = {}
    for line in sinfo_output.split("\n"):
        line = line.strip()
        if not line:
            continue

        # fields are space separated: partition, node hostname, cpus per node
        p_name, p_node, p_ppn = line.split(" ")[:3]

        if p_name not in partitions:
            partitions[p_name] = Partition()

        partitions[p_name].name = p_name
        partitions[p_name].nodes.add(
            ComputeNode(node_name=p_node, node_ppn=int(p_ppn))
        )
    return partitions
def _get_alloc_cmd(
nodes: int,
time: t.Optional[str] = None,
account: t.Optional[str] = None,
options: t.Optional[t.Dict[str, str]] = None,
) -> t.List[str]:
"""Return the command to request an allocation from Slurm with
the class variables as the slurm options.
"""
salloc_args = [
"--no-shell",
"-N",
str(nodes),
"-J",
"SmartSim",
]
if time:
salloc_args.extend(["-t", _validate_time_format(time)])
if account:
salloc_args.extend(["-A", str(account)])
arguments = set(options.keys() if options is not None else {})
invalid = {"t", "time", "N", "nodes", "A", "account"}
if valid := arguments.intersection(invalid):
raise SSReservedKeywordError(
f"Expecting time, nodes, account as an argument. Also received: {valid}"
)
for opt, val in (options or {}).items():
short_arg = bool(len(str(opt)) == 1)
prefix = "-" if short_arg else "--"
if not val:
salloc_args += [prefix + opt]
else:
if short_arg:
salloc_args += [prefix + opt, str(val)]
else:
salloc_args += ["=".join((prefix + opt, str(val)))]
return salloc_args
def _validate_time_format(time: str) -> str:
"""Convert time into valid walltime format
By defualt the formatted wall time is the total number of seconds.
:param time: number of hours to run job
:returns: Formatted walltime
"""
try:
hours, minutes, seconds = map(int, time.split(":"))
except ValueError as e:
raise ValueError(
"Input time must be formatted as `HH:MM:SS` with valid Integers."
) from e
return fmt_walltime(hours, minutes, seconds)
def get_hosts() -> t.List[str]:
    """Get the name of the nodes used in a slurm allocation.

    .. note::

        This method requires access to ``scontrol`` from the node
        on which it is run

    :returns: Names of the host nodes, sorted
    :raises LauncherError: Could not access ``scontrol``
    :raises SmartSimError: ``SLURM_JOB_NODELIST`` is not set
    """
    if "SLURM_JOB_NODELIST" not in os.environ:
        raise SmartSimError("Could not parse allocation nodes from SLURM_JOB_NODELIST")

    if not which("scontrol"):
        raise LauncherError(
            "Attempted slurm function without access to "
            "slurm(scontrol) at the call site"
        )

    # expand the compressed nodelist (e.g. "nid00[001-004]") into hostnames
    nodelist, _ = scontrol(
        ["show", "hostnames", os.environ["SLURM_JOB_NODELIST"]]
    )
    return sorted(nodelist.split())
def get_queue() -> str:
    """Get the name of queue in a slurm allocation.

    :returns: The name of the queue
    :raises SmartSimError: ``SLURM_JOB_PARTITION`` is not set
    """
    if job_partition := os.environ.get("SLURM_JOB_PARTITION", None):
        return job_partition
    raise SmartSimError("Could not parse queue from SLURM_JOB_PARTITION")
def get_tasks() -> int:
    """Get the number of tasks in a slurm allocation.

    :returns: The number of tasks in the allocation
    :raises SmartSimError: ``SLURM_NTASKS`` is not set
    """
    # a missing or empty variable is falsy, triggering the error below
    if ntasks_str := os.environ.get("SLURM_NTASKS"):
        return int(ntasks_str)
    raise SmartSimError("Could not parse number of requested tasks from SLURM_NTASKS")
def get_tasks_per_node() -> t.Dict[str, int]:
    """Get the number of tasks per each node in a slurm allocation.

    .. note::

        This method requires access to ``scontrol`` from the node
        on which it is run

    :returns: Map of nodes to number of tasks on that node
    :raises SmartSimError: ``SLURM_TASKS_PER_NODE`` is not set
    """
    if "SLURM_TASKS_PER_NODE" not in os.environ:
        raise SmartSimError("Could not parse tasks per node from SLURM_TASKS_PER_NODE")

    # SLURM_TASKS_PER_NODE uses a compressed form, e.g. "2(x3),1" means
    # two tasks on each of three nodes, then one task on a fourth node
    tasks_per_node_list: t.List[int] = []
    for entry in os.environ["SLURM_TASKS_PER_NODE"].split(","):
        if "(" in entry:
            tasks, quantity = entry.split("(")
            quantity = quantity.rstrip(")").lstrip("x")
            tasks_per_node_list.extend([int(tasks)] * int(quantity))
        else:
            tasks_per_node_list.append(int(entry))

    return dict(zip(get_hosts(), tasks_per_node_list))