# BSD 2-Clause License ## Copyright (c) 2021-2024, Hewlett Packard Enterprise# All rights reserved.## Redistribution and use in source and binary forms, with or without# modification, are permitted provided that the following conditions are met:## 1. Redistributions of source code must retain the above copyright notice, this# list of conditions and the following disclaimer.## 2. Redistributions in binary form must reproduce the above copyright notice,# this list of conditions and the following disclaimer in the documentation# and/or other materials provided with the distribution.## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.from__future__importannotationsimportcopyimporttypingastfromsmartsim.settings.containersimportContainerfrom.._core.utils.helpersimportexpand_exe_path,fmt_dict,is_valid_cmdfrom..entity.dbobjectimportDBModel,DBScriptfrom..logimportget_loggerlogger=get_logger(__name__)# fmt: offclassSettingsBase:...# fmt: on# pylint: disable=too-many-public-methods
class RunSettings(SettingsBase):
    # pylint: disable=unused-argument

    def __init__(
        self,
        exe: str,
        exe_args: t.Optional[t.Union[str, t.List[str]]] = None,
        run_command: str = "",
        run_args: t.Optional[t.Dict[str, t.Union[int, str, float, None]]] = None,
        env_vars: t.Optional[t.Dict[str, t.Optional[str]]] = None,
        container: t.Optional[Container] = None,
        **_kwargs: t.Any,
    ) -> None:
        """Run parameters for a ``Model``

        The base ``RunSettings`` class should only be used with the `local`
        launcher on single node, workstations, or laptops.

        If no ``run_command`` is specified, the executable will be launched
        locally.

        ``run_args`` passed as a dict will be interpreted literally for
        local ``RunSettings`` and added directly to the ``run_command``
        e.g. run_args = {"-np": 2} will be "-np 2"

        Example initialization

        .. highlight:: python
        .. code-block:: python

            rs = RunSettings("echo", "hello", "mpirun", run_args={"-np": "2"})

        :param exe: executable to run
        :param exe_args: executable arguments
        :param run_command: launch binary (e.g. "srun")
        :param run_args: arguments for run command (e.g. `-np` for `mpiexec`)
        :param env_vars: environment vars to launch job with
        :param container: container type for workload (e.g. "singularity")
        """
        # Do not expand executable if running within a container
        self.exe = [exe] if container else [expand_exe_path(exe)]
        # The property setters below normalize/copy their inputs
        self.exe_args = exe_args or []
        self.run_args = run_args or {}
        self.env_vars = env_vars or {}
        self.container = container
        self._run_command = run_command
        self.in_batch = False
        self.colocated_db_settings: t.Optional[
            t.Dict[
                str,
                t.Union[
                    bool,
                    int,
                    str,
                    None,
                    t.List[str],
                    t.Iterable[t.Union[int, t.Iterable[int]]],
                    t.List[DBModel],
                    t.List[DBScript],
                    t.Dict[str, t.Union[int, None]],
                    t.Dict[str, str],
                ],
            ]
        ] = None

    @property
    def exe_args(self) -> t.Union[str, t.List[str]]:
        """Return the attached executable arguments.

        The stored list itself is returned (not a copy), so mutating it
        modifies these settings.

        :returns: attached executable arguments
        """
        return self._exe_args

    @exe_args.setter
    def exe_args(self, value: t.Union[str, t.List[str], None]) -> None:
        """Set the executable arguments.

        :param value: executable arguments, as a whitespace-separated string
            or a list of strings
        :raises TypeError: if ``value`` is neither a str nor a list of str
        """
        self._exe_args = self._build_exe_args(value)

    @property
    def run_args(self) -> t.Dict[str, t.Union[int, str, float, None]]:
        """Return the attached run arguments.

        The stored dictionary itself is returned (not a copy), so mutating it
        modifies these settings.

        :returns: attached run arguments
        """
        return self._run_args

    @run_args.setter
    def run_args(self, value: t.Dict[str, t.Union[int, str, float, None]]) -> None:
        """Set the run arguments.

        A deep copy of ``value`` is stored.

        :param value: run arguments
        """
        self._run_args = copy.deepcopy(value)

    @property
    def env_vars(self) -> t.Dict[str, t.Optional[str]]:
        """Return the attached environment variables.

        The stored dictionary itself is returned (not a copy), so mutating it
        modifies these settings.

        :returns: attached environment variables
        """
        return self._env_vars

    @env_vars.setter
    def env_vars(self, value: t.Dict[str, t.Optional[str]]) -> None:
        """Set the environment variables.

        A deep copy of ``value`` is stored.

        :param value: environment variables
        """
        self._env_vars = copy.deepcopy(value)

    # To be overwritten by subclasses. Set of reserved args a user cannot change
    reserved_run_args = set()  # type: set[str]

    def _warn_unimplemented(self, feature: str) -> None:
        """Log that a specification is not supported by this settings type.

        Shared by the setter stubs of this class, which only warn and leave
        the settings unchanged.

        :param feature: human-readable name of the unsupported specification
        """
        logger.warning(
            f"{feature} specification not implemented for this "
            f"RunSettings type: {type(self)}"
        )

    def set_nodes(self, nodes: int) -> None:
        """Set the number of nodes

        :param nodes: number of nodes to run with
        """
        self._warn_unimplemented("Node")

    def set_tasks(self, tasks: int) -> None:
        """Set the number of tasks to launch

        :param tasks: number of tasks to launch
        """
        self._warn_unimplemented("Task")

    def set_tasks_per_node(self, tasks_per_node: int) -> None:
        """Set the number of tasks per node

        :param tasks_per_node: number of tasks to launch per node
        """
        self._warn_unimplemented("Task per node")

    def set_task_map(self, task_mapping: str) -> None:
        """Set a task mapping

        :param task_mapping: task mapping
        """
        self._warn_unimplemented("Task mapping")

    def set_cpus_per_task(self, cpus_per_task: int) -> None:
        """Set the number of cpus per task

        :param cpus_per_task: number of cpus per task
        """
        # Message previously said "CPU per node", which misnamed the setting
        self._warn_unimplemented("CPU per task")

    def set_hostlist(self, host_list: t.Union[str, t.List[str]]) -> None:
        """Specify the hostlist for this job

        :param host_list: hosts to launch on
        """
        self._warn_unimplemented("Hostlist")

    def set_hostlist_from_file(self, file_path: str) -> None:
        """Use the contents of a file to specify the hostlist for this job

        :param file_path: Path to the hostlist file
        """
        self._warn_unimplemented("Hostlist from file")

    def set_excluded_hosts(self, host_list: t.Union[str, t.List[str]]) -> None:
        """Specify a list of hosts to exclude for launching this job

        :param host_list: hosts to exclude
        """
        self._warn_unimplemented("Excluded host list")

    def set_cpu_bindings(self, bindings: t.Union[int, t.List[int]]) -> None:
        """Set the cores to which MPI processes are bound

        :param bindings: List specifying the cores to which MPI processes are bound
        """
        self._warn_unimplemented("CPU binding")

    def set_memory_per_node(self, memory_per_node: int) -> None:
        """Set the amount of memory required per node in megabytes

        :param memory_per_node: Number of megabytes per node
        """
        self._warn_unimplemented("Memory per node")

    def set_verbose_launch(self, verbose: bool) -> None:
        """Set the job to run in verbose mode

        :param verbose: Whether the job should be run verbosely
        """
        self._warn_unimplemented("Verbose")

    def set_quiet_launch(self, quiet: bool) -> None:
        """Set the job to run in quiet mode

        :param quiet: Whether the job should be run quietly
        """
        self._warn_unimplemented("Quiet")

    def set_broadcast(self, dest_path: t.Optional[str] = None) -> None:
        """Copy executable file to allocated compute nodes

        :param dest_path: Path to copy an executable file
        """
        self._warn_unimplemented("Broadcast")

    def set_time(self, hours: int = 0, minutes: int = 0, seconds: int = 0) -> None:
        """Automatically format and set wall time

        The components are formatted by ``_fmt_walltime`` and passed on to
        ``set_walltime``.

        :param hours: number of hours to run job
        :param minutes: number of minutes to run job
        :param seconds: number of seconds to run job
        """
        return self.set_walltime(
            self._fmt_walltime(int(hours), int(minutes), int(seconds))
        )

    def set_node_feature(self, feature_list: t.Union[str, t.List[str]]) -> None:
        """Specify the node feature for this job

        :param feature_list: node feature to launch on
        """
        self._warn_unimplemented("Feature")

    @staticmethod
    def _fmt_walltime(hours: int, minutes: int, seconds: int) -> str:
        """Convert hours, minutes, and seconds into valid walltime format

        By default the formatted wall time is the total number of seconds.

        :param hours: number of hours to run job
        :param minutes: number of minutes to run job
        :param seconds: number of seconds to run job
        :returns: Formatted walltime
        """
        time_ = hours * 3600
        time_ += minutes * 60
        time_ += seconds
        return str(time_)

    def set_walltime(self, walltime: str) -> None:
        """Set the formatted walltime

        :param walltime: Time in format required by launcher
        """
        self._warn_unimplemented("Walltime")

    def set_binding(self, binding: str) -> None:
        """Set binding

        :param binding: Binding
        """
        self._warn_unimplemented("binding")

    def set_mpmd_preamble(self, preamble_lines: t.List[str]) -> None:
        """Set preamble to a file to make a job MPMD

        :param preamble_lines: lines to put at the beginning of a file.
        """
        self._warn_unimplemented("MPMD preamble")

    def make_mpmd(self, settings: RunSettings) -> None:
        """Make job an MPMD job

        :param settings: ``RunSettings`` instance
        """
        self._warn_unimplemented("Make MPMD")

    @property
    def run_command(self) -> t.Optional[str]:
        """Return the launch binary used to launch the executable

        Attempt to expand the path to the executable if possible

        :returns: launch binary e.g. mpiexec, or None if no run command is set
        """
        cmd = self._run_command

        if cmd:
            if is_valid_cmd(cmd):
                # command is valid and will be expanded
                return expand_exe_path(cmd)
            # command is not valid, so return it as is
            # it may be on the compute nodes but not local machine
            return cmd
        # run without run command
        return None

    def update_env(self, env_vars: t.Dict[str, t.Union[str, int, float, bool]]) -> None:
        """Update the job environment variables

        To fully inherit the current user environment, add the
        workload-manager-specific flag to the launch command through the
        :meth:`add_exe_args` method. For example, ``--export=ALL`` for
        slurm, or ``-V`` for PBS/aprun.

        Values are converted with ``str`` before being stored. All values are
        validated before any is stored, so a ``TypeError`` leaves the existing
        environment variables unchanged.

        :param env_vars: environment variables to update or add
        :raises TypeError: if env_vars values cannot be coerced to strings
        """
        val_types = (str, int, float, bool)
        # Validate everything first so a bad value cannot cause a partial update
        for env, val in env_vars.items():
            if not isinstance(val, val_types):
                raise TypeError(
                    f"env_vars[{env}] was of type {type(val)}, not {val_types}"
                )
        # Coerce env_vars values to str as a convenience to user
        self.env_vars.update({env: str(val) for env, val in env_vars.items()})

    def add_exe_args(self, args: t.Union[str, t.List[str]]) -> None:
        """Add executable arguments to executable

        :param args: executable arguments, as a whitespace-separated string
            or a list of strings
        :raises TypeError: if ``args`` is neither a str nor a list of str
        """
        args = self._build_exe_args(args)
        self._exe_args.extend(args)

    def set(
        self, arg: str, value: t.Optional[str] = None, condition: bool = True
    ) -> None:
        """Allows users to set individual run arguments.

        A method that allows users to set run arguments after object
        instantiation. Does basic formatting such as stripping leading dashes.
        If the argument has been set previously, this method will log warning
        but ultimately comply.

        Conditional expressions may be passed to the ``condition`` parameter.
        If the expression evaluates to True, the argument will be set. If not,
        an info message is logged and no further operation is performed.

        Basic Usage

        .. highlight:: python
        .. code-block:: python

            rs = RunSettings("python")
            rs.set("an-arg", "a-val")
            rs.set("a-flag")
            rs.format_run_args()  # returns ["an-arg", "a-val", "a-flag", "None"]

        Slurm Example with Conditional Setting

        .. highlight:: python
        .. code-block:: python

            import socket

            rs = SrunSettings("echo", "hello")
            rs.set_tasks(1)
            rs.set("exclusive")

            # Only set this argument if condition param evals True
            # Otherwise log and NOP
            rs.set("partition", "debug",
                   condition=socket.gethostname()=="testing-system")

            rs.format_run_args()
            # returns ["exclusive", "None", "partition", "debug"] iff
            # socket.gethostname()=="testing-system"
            # otherwise returns ["exclusive", "None"]

        :param arg: name of the argument
        :param value: value of the argument
        :param condition: set the argument if condition evaluates to True
        :raises TypeError: if ``arg`` is not a str, or ``value`` is neither a
            str nor None
        """
        if not isinstance(arg, str):
            raise TypeError("Argument name should be of type str")
        if value is not None and not isinstance(value, str):
            raise TypeError("Argument value should be of type str or None")
        arg = arg.strip().lstrip("-")

        if not condition:
            logger.info(f"Could not set argument '{arg}': condition not met")
            return
        if arg in self.reserved_run_args:
            logger.warning(
                (
                    f"Could not set argument '{arg}': "
                    f"it is a reserved argument of '{type(self).__name__}'"
                )
            )
            return

        if arg in self.run_args and value != self.run_args[arg]:
            logger.warning(f"Overwriting argument '{arg}' with value '{value}'")
        self.run_args[arg] = value

    @staticmethod
    def _build_exe_args(
        exe_args: t.Optional[t.Union[str, t.List[str]]]
    ) -> t.List[str]:
        """Check and convert exe_args input to a desired collection format

        :param exe_args: a whitespace-separated string, a list of strings,
            or an empty/None value
        :returns: a new list of argument strings (``[]`` for empty input)
        :raises TypeError: if ``exe_args`` is neither a str nor a list of str
        """
        if not exe_args:
            return []

        if isinstance(exe_args, list):
            # Copy so the stored list is independent of the caller's list
            exe_args = copy.deepcopy(exe_args)

        if not (
            isinstance(exe_args, str)
            or (
                isinstance(exe_args, list)
                and all(isinstance(arg, str) for arg in exe_args)
            )
        ):
            raise TypeError("Executable arguments were not a list of str or a str.")

        if isinstance(exe_args, str):
            return exe_args.split()

        return exe_args

    def format_run_args(self) -> t.List[str]:
        """Return formatted run arguments

        For ``RunSettings``, the run arguments are passed
        literally with no formatting: each name is followed by ``str`` of its
        value (so a flag set with no value yields ``"None"``).

        :returns: list run arguments for these settings
        """
        formatted: t.List[str] = []
        for arg, value in self.run_args.items():
            formatted.extend((arg, str(value)))
        return formatted

    def format_env_vars(self) -> t.List[str]:
        """Build environment variable string

        A variable whose value is None is rendered as ``KEY=``.

        :returns: formatted list of strings to export variables
        """
        return [
            f"{key}=" if val is None else f"{key}={val}"
            for key, val in self.env_vars.items()
        ]
class BatchSettings(SettingsBase):
    def __init__(
        self,
        batch_cmd: str,
        batch_args: t.Optional[t.Dict[str, t.Optional[str]]] = None,
        **kwargs: t.Any,
    ) -> None:
        """Settings shared by batch submissions

        The setter methods of this class raise ``NotImplementedError``, so it
        is meant to be subclassed rather than instantiated directly.

        :param batch_cmd: command used to launch the batch, e.g. ``sbatch``
        :param batch_args: batch arguments; a deep copy is stored
        :param kwargs: recognized keys are ``nodes``, ``queue``, ``time`` and
            ``account``. ``nodes`` and ``queue`` are only applied when truthy;
            ``set_walltime`` and ``set_account`` are always called and receive
            ``None`` when the key is absent, so overrides must accept ``None``.
        """
        self._batch_cmd = batch_cmd
        self.batch_args = batch_args or {}
        self._preamble: t.List[str] = []
        nodes = kwargs.get("nodes", None)
        if nodes:
            self.set_nodes(nodes)
        queue = kwargs.get("queue", None)
        if queue:
            self.set_queue(queue)
        # NOTE: called unconditionally (possibly with None), unlike nodes/queue
        self.set_walltime(kwargs.get("time", None))
        self.set_account(kwargs.get("account", None))

    @property
    def batch_cmd(self) -> str:
        """Return the batch command

        Tests to see if we can expand the batch
        command path. If we can, then returns
        the expanded batch command. If we cannot,
        returns the batch command as is.

        :returns: batch command
        """
        if is_valid_cmd(self._batch_cmd):
            return expand_exe_path(self._batch_cmd)

        return self._batch_cmd

    @property
    def batch_args(self) -> t.Dict[str, t.Optional[str]]:
        """Retrieve attached batch arguments

        :returns: attached batch arguments
        """
        return self._batch_args

    @batch_args.setter
    def batch_args(self, value: t.Dict[str, t.Optional[str]]) -> None:
        """Attach batch arguments

        A deep copy of ``value`` is stored; a falsy value stores ``{}``.

        :param value: dictionary of batch arguments
        """
        self._batch_args = copy.deepcopy(value) if value else {}

    def set_nodes(self, num_nodes: int) -> None:
        """Set the number of nodes for the batch

        :param num_nodes: number of nodes
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def set_hostlist(self, host_list: t.Union[str, t.List[str]]) -> None:
        """Specify the hostlist for the batch

        :param host_list: hosts to launch on
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def set_queue(self, queue: str) -> None:
        """Set the queue for the batch

        :param queue: queue name
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def set_walltime(self, walltime: str) -> None:
        """Set the walltime for the batch

        :param walltime: walltime; ``__init__`` may pass ``None``
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def set_account(self, account: str) -> None:
        """Set the account for the batch

        :param account: account name; ``__init__`` may pass ``None``
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def format_batch_args(self) -> t.List[str]:
        """Format the batch arguments for the batch command

        :returns: formatted batch arguments
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def set_batch_command(self, command: str) -> None:
        """Set the command used to launch the batch e.g. ``sbatch``

        :param command: batch command
        """
        self._batch_cmd = command

    def add_preamble(self, lines: t.Union[str, t.List[str]]) -> None:
        """Add lines to the batch file preamble. The lines are just
        written (unmodified) at the beginning of the batch file
        (after the WLM directives) and can be used to e.g.
        start virtual environments before running the executables.

        :param lines: a single line, or a list of lines, to add to preamble.
        :raises TypeError: if ``lines`` is neither a str nor a list
        """
        if isinstance(lines, str):
            self._preamble += [lines]
        elif isinstance(lines, list):
            self._preamble += lines
        else:
            raise TypeError("Expected str or List[str] for lines argument")

    @property
    def preamble(self) -> t.Iterable[str]:
        """Return an iterable of preamble clauses to be prepended to the batch file

        :returns: attached preamble clauses
        """
        return (clause for clause in self._preamble)

    def __str__(self) -> str:  # pragma: no-cover
        string = f"Batch Command: {self._batch_cmd}"
        if self.batch_args:
            string += f"\nBatch arguments:\n{fmt_dict(self.batch_args)}"
        return string