{ "cells": [ { "cell_type": "markdown", "id": "4cba3240", "metadata": {}, "source": [ "# Ray Integration" ] }, { "cell_type": "markdown", "id": "624cb31c", "metadata": {}, "source": [ "## Starting a Ray Cluster with SmartSim\n", "\n", "Before we can begin starting up a Cluster, we first import the relevant modules. We will also define some global variables for clarity and ease of use:\n", "\n", " 1. `NUM_NODES` is the number of Ray nodes we will deploy with the first one will be the head node. We will run one node on each host.\n", " 2. `CPUS_PER_WORKER` is number of cpus to be used by each worker in the cluster\n", " 3. `LAUNCHER` is the workload manager that our SmartSim experiment and ray cluster will use" ] }, { "cell_type": "code", "execution_count": 1, "id": "bf6b043d", "metadata": { "tags": [] }, "outputs": [], "source": [ "import numpy as np\n", "import os\n", "import ray\n", "from ray import tune\n", "import ray.util\n", "\n", "from smartsim import Experiment\n", "from smartsim.exp.ray import RayCluster\n", "\n", "NUM_NODES = 3\n", "CPUS_PER_WORKER = 18\n", "LAUNCHER='slurm'" ] }, { "cell_type": "markdown", "id": "713f5f27", "metadata": {}, "source": [ "Now, we instance a SmartSim experiment with the name `\"ray-cluster\"`, which we will spin up the Ray cluster. By doing so we will create a `ray-cluster` directory (relative to the path from where we are executing this notebook). The output files generated by the experment will be located in the `ray-cluster` directory. \n", "\n", "Next, we will instance a `RayCluster` to connect to the cluster. We are limiting the number each ray node can use to `CPUS_PER_WORKER`. If we wanted to let it use all the CPUs, it would suffice not to pass `ray_args`.\n", "Notice that the cluster will be password-protected (the password, generated internally, will be shared with worker nodes).\n", "\n", "If the hosts are attached to multiple interfaces (e.g. `ib`, `eth0`, ...), we can specify to which one the Ray nodes should bind by setting the `interface` argument; it is recommended to always choose the one offering the best performances. On a Cray XC, for example, this will be `ipogif0`. \n", "\n", "Note that this approach only works with `ray>=1.6`. For previous versions, you have to add `password=None` to the `RayCluster` constructor." ] }, { "cell_type": "code", "execution_count": 2, "id": "a8851bff", "metadata": { "tags": [] }, "outputs": [], "source": [ "exp = Experiment(\"ray-cluster\", launcher=LAUNCHER)\n", "cluster = RayCluster(\n", " name=\"ray-cluster\",\n", " run_args={},\n", " ray_args={\"num-cpus\": CPUS_PER_WORKER},\n", " launcher=LAUNCHER,\n", " num_nodes=NUM_NODES,\n", " batch=False,\n", " interface=\"ipogif0\",\n", ")" ] }, { "cell_type": "markdown", "id": "a28512f9", "metadata": {}, "source": [ "We now generate the needed directories. If an experiment with the same name already exists, this call will fail to avoid overwriting existing results. If we want to overwrite, we can simply pass `overwrite=True` to `exp.generate()`." ] }, { "cell_type": "code", "execution_count": 3, "id": "30c66187", "metadata": { "tags": [] }, "outputs": [], "source": [ "exp.generate(cluster, overwrite=True)" ] }, { "cell_type": "markdown", "id": "5ddd1af8", "metadata": {}, "source": [ "Now we are ready to start the cluster!" 
] }, { "cell_type": "code", "execution_count": 4, "id": "088251d3", "metadata": { "tags": [] }, "outputs": [], "source": [ "exp.start(cluster, block=False, summary=False)" ] }, { "cell_type": "markdown", "id": "847a4a74", "metadata": {}, "source": [ "## Connect to the Ray Cluster\n", "\n", "Now we can just connect to our running server." ] }, { "cell_type": "code", "execution_count": 5, "id": "2a90ff89", "metadata": { "tags": [] }, "outputs": [], "source": [ "ctx = ray.init(f\"ray://{cluster.get_head_address()}:10001\")" ] }, { "cell_type": "markdown", "id": "c6401082", "metadata": {}, "source": [ "We can check that all resources are set properly." ] }, { "cell_type": "code", "execution_count": 6, "id": "c17e5555", "metadata": { "tags": [] }, "outputs": [], "source": [ "print(\n", " (\n", " \"This cluster consists of\\n\"\n", " f\"{len(ray.nodes())} nodes in total\\n\"\n", " f\"{ray.cluster_resources()['CPU']} CPU resources in total\\n\"\n", " f\"and the head node is running at {cluster.get_head_address()}\"\n", " )\n", ")" ] }, { "cell_type": "markdown", "id": "4f6663d4", "metadata": {}, "source": [ "We can run a Ray Tune example, to see that everything is working." ] }, { "cell_type": "code", "execution_count": 7, "id": "1f08fc6a", "metadata": { "tags": [] }, "outputs": [], "source": [ "tune.run(\n", " \"PPO\",\n", " stop={\"episode_reward_max\": 200},\n", " config={\n", " \"framework\": \"torch\",\n", " \"env\": \"CartPole-v0\",\n", " \"num_gpus\": 0,\n", " \"lr\": tune.grid_search(np.linspace (0.001, 0.01, 50).tolist()),\n", " \"log_level\": \"ERROR\",\n", " },\n", " local_dir=os.path.join(exp.exp_path, \"ray_log\"),\n", " verbose=0,\n", " fail_fast=True,\n", " log_to_file=True,\n", ")" ] }, { "cell_type": "markdown", "id": "66e52249", "metadata": {}, "source": [ "When the Ray job is running, we can connect to the Ray dashboard to monitor the evolution of the experiment. To do this, if Ray is running on a compute node of a remote system, we need to setup a SSH tunnel (we will see later how), to forward the port on which the dashboard is published to our local system. For example, if the head address (printed in the cell above) is ``, and the system name is ``, we can establish a tunnel to the dashboard opening a terminal on the local system and entering:\n", "\n", "```bash\n", "ssh -L 8265::8265 \n", "```\n", "\n", "Then, from a browser on the local system, we can go to the address `http://localhost:8265` to see the dashboard.\n", "\n", "There are two things to know if something does not work:\n", "\n", "1. We are using `8265` as a port, which is the default dashboard port. If that port is not free, we can bind the dashboard to another port, e.g. `PORT_NUMBER` (by adding `\"dashboard-port\": str(PORT_NUMBER)` to `ray_args` when creating the cluster) and the command changed accordingly.\n", "\n", "2. If the port forwarding fails, it is possible that the interface is not reachable. In that case, you can add `\"dashboard-address\": \"0.0.0.0\"` to `ray_args` when creating the cluster, to bind the dashboard to all interfaces, or select a visible address if one knows it. You can then use the node name (or its public IP) to establish the tunnel, by entering (on the local terminal):\n", " ```bash \n", " ssh -L 8265::8265 \n", " ```\n", "Please refer to your system guide to find out how you can get the name and the address of a node." 
] }, { "cell_type": "markdown", "id": "6da5f0a5", "metadata": {}, "source": [ "## Stop Cluster and Release Resources\n", "\n", "When we are finished with the cluster and ready to deallocate resources, we must first shut down the Ray runtime, followed by disconnecting the context." ] }, { "cell_type": "code", "execution_count": 8, "id": "4961f1d6", "metadata": { "tags": [] }, "outputs": [], "source": [ "ray.shutdown()\n", "ctx.disconnect()" ] }, { "cell_type": "markdown", "id": "97d167bb", "metadata": {}, "source": [ "Now that all is gracefully stopped, we can stop the job on the allocation." ] }, { "cell_type": "code", "execution_count": 9, "id": "f19f7b95", "metadata": {}, "outputs": [], "source": [ "exp.stop(cluster)" ] } ], "metadata": { "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.12" } }, "nbformat": 4, "nbformat_minor": 5 }