Commit dddd51db (unverified), authored by Willbon, committed by GitHub

Adds YAML Resource Specification To Ray Integration (#2847)

# Description

This PR:
- Adds `task_runner.py`, which supports specifying resources, `py_modules`, and `pip` packages for tasks submitted to a Ray cluster.

Fixes [#2632](https://github.com/isaac-sim/IsaacLab/issues/2632)


## Type of change


- New feature (non-breaking change which adds functionality)

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there


---------
Signed-off-by: garylvov <67614381+garylvov@users.noreply.github.com>
Co-authored-by: 松翊 <songyi.wb@alibaba-inc.com>
Co-authored-by: garylvov <67614381+garylvov@users.noreply.github.com>
parent 82b24dd4
......@@ -140,6 +140,7 @@ Guidelines for modifications:
* Ziqi Fan
* Zoe McCarthy
* David Leon
* Song Yi
## Acknowledgements
......
......@@ -44,22 +44,28 @@ specifying the ``--num_workers`` argument for resource-wrapped jobs, or ``--num_
for tuning jobs, which is especially critical for parallel aggregate
job processing on local/virtual multi-GPU machines. Tuning jobs assume homogeneous node resource composition for nodes with GPUs.
The two following files contain the core functionality of the Ray integration.
The three following files contain the core functionality of the Ray integration.
.. dropdown:: scripts/reinforcement_learning/ray/wrap_resources.py
:icon: code
.. literalinclude:: ../../../scripts/reinforcement_learning/ray/wrap_resources.py
:language: python
:emphasize-lines: 14-66
:emphasize-lines: 10-63
.. dropdown:: scripts/reinforcement_learning/ray/tuner.py
:icon: code
.. literalinclude:: ../../../scripts/reinforcement_learning/ray/tuner.py
:language: python
:emphasize-lines: 18-53
:emphasize-lines: 18-54
.. dropdown:: scripts/reinforcement_learning/ray/task_runner.py
:icon: code
.. literalinclude:: ../../../scripts/reinforcement_learning/ray/task_runner.py
:language: python
:emphasize-lines: 13-105
The following script can be used to submit aggregate
jobs to one or more Ray cluster(s), which can be used for
......@@ -71,7 +77,7 @@ resource requirements.
.. literalinclude:: ../../../scripts/reinforcement_learning/ray/submit_job.py
:language: python
:emphasize-lines: 12-53
:emphasize-lines: 13-61
The following script can be used to extract KubeRay cluster information for aggregate job submission.
......@@ -89,7 +95,7 @@ The following script can be used to easily create clusters on Google GKE.
.. literalinclude:: ../../../scripts/reinforcement_learning/ray/launch.py
:language: python
:emphasize-lines: 16-37
:emphasize-lines: 15-36
Docker-based Local Quickstart
-----------------------------
......@@ -147,7 +153,26 @@ Submitting resource-wrapped individual jobs instead of automatic tuning runs is
.. literalinclude:: ../../../scripts/reinforcement_learning/ray/wrap_resources.py
:language: python
:emphasize-lines: 14-66
:emphasize-lines: 10-63
The ``task_runner.py`` script dispatches Python tasks to a Ray cluster from a single declarative YAML file. Each run can specify additional pip packages and Python modules, and each task can request an explicit number of CPUs, GPUs, and amount of memory. Tasks can also be pinned to specific nodes by hostname or node ID. Two launch modes are supported: tasks can start independently as resources become available, or they can be grouped into a simultaneous batch that launches only once the cluster has sufficient resources for every task, which suits multi-node training jobs.
.. dropdown:: scripts/reinforcement_learning/ray/task_runner.py
:icon: code
.. literalinclude:: ../../../scripts/reinforcement_learning/ray/task_runner.py
:language: python
:emphasize-lines: 13-105
To use this script, run a command similar to the following (replace ``tasks.yaml`` with your actual configuration file):
.. code-block:: bash
python3 scripts/reinforcement_learning/ray/submit_job.py --aggregate_jobs task_runner.py --task_cfg tasks.yaml
For detailed instructions on how to write your ``tasks.yaml`` file, please refer to the comments in ``task_runner.py``.
**Tip:** Place the ``tasks.yaml`` file in the ``scripts/reinforcement_learning/ray`` directory so that it is included when the ``working_dir`` is uploaded. You can then reference it using a relative path in the command.
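Because ``task_runner.py`` reads the ``name`` and ``py_args`` keys of every task directly, a malformed ``tasks.yaml`` only fails once the job is already on the cluster. The following is a minimal sketch of a local pre-flight check, not part of the PR: ``validate_task_config`` and ``example_config`` are hypothetical names, and the dictionary stands in for what ``yaml.safe_load`` would return for a small configuration file.

```python
ALLOWED_NODE_MODES = {"any", "hostname", "node_id"}


def validate_task_config(config: dict) -> list[str]:
    """Hypothetical helper: return a list of problems; an empty list means none were found."""
    tasks = config.get("tasks")
    if not isinstance(tasks, list) or not tasks:
        return ["'tasks' must be a non-empty list"]

    problems = []
    for i, task in enumerate(tasks):
        if not isinstance(task, dict):
            problems.append(f"tasks[{i}]: expected a mapping")
            continue
        label = task.get("name") or f"tasks[{i}]"
        # task_runner.py indexes these two keys directly, so treat them as required.
        for key in ("name", "py_args"):
            if not task.get(key):
                problems.append(f"{label}: missing required field '{key}'")
        # The node block is optional; a missing block means "any node".
        node = task.get("node") or {}
        mode = node.get("specific") or "any"
        if mode not in ALLOWED_NODE_MODES:
            problems.append(f"{label}: node.specific must be one of {sorted(ALLOWED_NODE_MODES)}")
        elif mode != "any" and not node.get(mode):
            problems.append(f"{label}: node.specific is '{mode}' but node.{mode} is not set")
    return problems


# Stand-in for the parsed contents of a small tasks.yaml (assumed values).
example_config = {
    "pip": [],
    "py_modules": [],
    "concurrent": False,
    "tasks": [
        {"name": "cpu-task", "py_args": "script.py --option arg", "num_cpus": 1, "memory": "1*1024*1024*1024"},
        {"name": "pinned-task", "py_args": "script.py", "node": {"specific": "hostname"}},
    ],
}

for problem in validate_task_config(example_config):
    print(problem)
# prints: pinned-task: node.specific is 'hostname' but node.hostname is not set
```

Running a check like this before calling ``submit_job.py`` catches missing fields and inconsistent node constraints without needing a cluster.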
Transferring files from the running container can be done as follows.
......@@ -288,7 +313,7 @@ where instructions are included in the following creation file.
.. literalinclude:: ../../../scripts/reinforcement_learning/ray/launch.py
:language: python
:emphasize-lines: 15-37
:emphasize-lines: 15-36
For other cloud services, the ``kuberay.yaml.jinja`` will be similar to that of
Google's.
......@@ -345,7 +370,7 @@ Dispatching Steps Shared Between KubeRay and Pure Ray Part II
.. literalinclude:: ../../../scripts/reinforcement_learning/ray/submit_job.py
:language: python
:emphasize-lines: 12-53
:emphasize-lines: 13-61
3.) For tuning jobs, specify the tuning job / hyperparameter sweep as a :class:`JobCfg` .
The included :class:`JobCfg` only supports the ``rl_games`` workflow due to differences in
......
......@@ -3,13 +3,6 @@
#
# SPDX-License-Identifier: BSD-3-Clause
import argparse
import os
import time
from concurrent.futures import ThreadPoolExecutor
from ray import job_submission
"""
This script submits aggregate job(s) to cluster(s) described in a
config file containing ``name: <NAME> address: http://<IP>:<PORT>`` on
......@@ -26,7 +19,11 @@ An aggregate job could be a :file:`../tuner.py` tuning job, which automatically
creates several individual jobs when started on a cluster. Alternatively, an aggregate job
could be a :file:`../wrap_resources.py` resource-wrapped job,
which may contain several individual sub-jobs separated by
the + delimiter.
the + delimiter. An aggregate job could also be a :file:`../task_runner.py` multi-task submission job,
where each sub-job and its resource requirements are defined in a YAML configuration file.
In this mode, :file:`../task_runner.py` will read the YAML file (via --task_cfg), and
submit all defined sub-tasks to the Ray cluster, supporting per-job resource specification and
real-time streaming of sub-job outputs.
If there are more aggregate jobs than cluster(s), aggregate jobs will be submitted
as clusters become available via the defined relation above. If there are fewer aggregate job(s)
......@@ -48,9 +45,21 @@ Usage:
# Example: Submitting resource wrapped job
python3 scripts/reinforcement_learning/ray/submit_job.py --aggregate_jobs wrap_resources.py --test
# Example: submitting tasks with specific resources, and supporting pip packages and py_modules
# You may use relative paths for task_cfg and py_modules, placing them in the scripts/reinforcement_learning/ray directory, which will be uploaded to the cluster.
python3 scripts/reinforcement_learning/ray/submit_job.py --aggregate_jobs task_runner.py --task_cfg tasks.yaml
# For all command line arguments
python3 scripts/reinforcement_learning/ray/submit_job.py -h
"""
import argparse
import os
import time
from concurrent.futures import ThreadPoolExecutor
from ray import job_submission
script_directory = os.path.dirname(os.path.abspath(__file__))
CONFIG = {"working_dir": script_directory, "executable": "/workspace/isaaclab/isaaclab.sh -p"}
......
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This script dispatches one or more user-defined Python tasks to workers in a Ray cluster.
Each task, along with its resource requirements and execution parameters, is specified in a YAML configuration file.
Users may define the number of CPUs, GPUs, and the amount of memory to allocate per task via the config file.
Key features:
-------------
- Fine-grained, per-task resource management via config fields (`num_gpus`, `num_cpus`, `memory`).
- Parallel execution of multiple tasks using available resources across the Ray cluster.
- Option to specify node affinity for tasks, e.g., by hostname, node ID, or any node.
- Optional batch (simultaneous) or independent scheduling of tasks.
Task scheduling and distribution are handled via Ray’s built-in resource manager.
YAML configuration fields:
--------------------------
- `pip`: List of extra pip packages to install before running any tasks.
- `py_modules`: List of additional Python module paths (directories or files) to include in the runtime environment.
- `concurrent`: (bool) Determines task dispatch semantics:
- If `concurrent: true`, **all tasks are scheduled as a batch**. The script waits until sufficient resources are available for every task in the batch, then launches all tasks together. If resources are insufficient, all tasks remain blocked until the cluster can support the full batch.
- If `concurrent: false`, tasks are launched as soon as resources are available for each individual task, and Ray independently schedules them. This may result in non-simultaneous task start times.
- `tasks`: List of task specifications, each with:
- `name`: String identifier for the task.
- `py_args`: Arguments to the Python interpreter (e.g., script/module, flags, user arguments).
- `num_gpus`: Number of GPUs to allocate (float or string arithmetic, e.g., "2*2").
- `num_cpus`: Number of CPUs to allocate (float or string).
- `memory`: Amount of RAM in bytes (int or string).
- `node` (optional): Node placement constraints.
- `specific` (str): Type of node placement; supports `hostname`, `node_id`, or `any`.
- `any`: Place the task on any available node.
- `hostname`: Place the task on a specific hostname. `hostname` must be specified in the node field.
- `node_id`: Place the task on a specific node ID. `node_id` must be specified in the node field.
- `hostname` (str): Specific hostname to place the task on.
- `node_id` (str): Specific node ID to place the task on.
Typical usage:
---------------
.. code-block:: bash
# Print help and argument details:
python task_runner.py -h
# Submit tasks defined in a YAML file to the Ray cluster (auto-detects Ray head address):
python task_runner.py --task_cfg /path/to/tasks.yaml
YAML configuration example-1:
-----------------------------
.. code-block:: yaml
pip: ["xxx"]
py_modules: ["my_package/my_package"]
concurrent: false
tasks:
- name: "Isaac-Cartpole-v0"
py_args: "-m torch.distributed.run --nnodes=1 --nproc_per_node=2 --rdzv_endpoint=localhost:29501 /workspace/isaaclab/scripts/reinforcement_learning/rsl_rl/train.py --task=Isaac-Cartpole-v0 --max_iterations 200 --headless --distributed"
num_gpus: 2
num_cpus: 10
memory: 10737418240
- name: "script need some dependencies"
py_args: "script.py --option arg"
num_gpus: 0
num_cpus: 1
memory: 10*1024*1024*1024
YAML configuration example-2:
-----------------------------
.. code-block:: yaml
pip: ["xxx"]
py_modules: ["my_package/my_package"]
concurrent: true
tasks:
- name: "Isaac-Cartpole-v0-multi-node-train-1"
py_args: "-m torch.distributed.run --nproc_per_node=1 --nnodes=2 --node_rank=0 --rdzv_id=123 --rdzv_backend=c10d --rdzv_endpoint=localhost:5555 /workspace/isaaclab/scripts/reinforcement_learning/rsl_rl/train.py --task=Isaac-Cartpole-v0 --headless --distributed --max_iterations 1000"
num_gpus: 1
num_cpus: 10
memory: 10*1024*1024*1024
node:
specific: "hostname"
hostname: "xxx"
- name: "Isaac-Cartpole-v0-multi-node-train-2"
py_args: "-m torch.distributed.run --nproc_per_node=1 --nnodes=2 --node_rank=1 --rdzv_id=123 --rdzv_backend=c10d --rdzv_endpoint=x.x.x.x:5555 /workspace/isaaclab/scripts/reinforcement_learning/rsl_rl/train.py --task=Isaac-Cartpole-v0 --headless --distributed --max_iterations 1000"
num_gpus: 1
num_cpus: 10
memory: 10*1024*1024*1024
node:
specific: "hostname"
hostname: "xxx"
To stop all tasks early, press Ctrl+C; the script will cancel all running Ray tasks.
"""
import argparse
import yaml
from datetime import datetime
import util
def parse_args() -> argparse.Namespace:
"""
Parse command-line arguments for the Ray task runner.
Returns:
argparse.Namespace: The namespace containing parsed CLI arguments:
- task_cfg (str): Path to the YAML task file.
- ray_address (str): Ray cluster address.
- test (bool): Whether to run a GPU resource isolation sanity check.
"""
parser = argparse.ArgumentParser(description="Run tasks from a YAML config file.")
parser.add_argument("--task_cfg", type=str, required=True, help="Path to the YAML task file.")
parser.add_argument("--ray_address", type=str, default="auto", help="the Ray address.")
parser.add_argument(
"--test",
action="store_true",
help=(
"Run nvidia-smi test instead of the arbitrary job, "
"can use as a sanity check prior to any jobs to check "
"that GPU resources are correctly isolated."
),
)
return parser.parse_args()
def parse_task_resource(task: dict) -> util.JobResource:
"""
Parse task resource requirements from the YAML configuration.
Args:
task (dict): Dictionary representing a single task's configuration.
Keys may include `num_gpus`, `num_cpus`, and `memory`, each either
as a number or evaluatable string expression.
Returns:
util.JobResource: Resource object with the parsed values.
"""
resource = util.JobResource()
if "num_gpus" in task:
resource.num_gpus = eval(task["num_gpus"]) if isinstance(task["num_gpus"], str) else task["num_gpus"]
if "num_cpus" in task:
resource.num_cpus = eval(task["num_cpus"]) if isinstance(task["num_cpus"], str) else task["num_cpus"]
if "memory" in task:
resource.memory = eval(task["memory"]) if isinstance(task["memory"], str) else task["memory"]
return resource
def run_tasks(
tasks: list[dict], args: argparse.Namespace, runtime_env: dict | None = None, concurrent: bool = False
) -> None:
"""
Submit tasks to the Ray cluster for execution.
Args:
tasks (list[dict]): A list of task configuration dictionaries.
args (argparse.Namespace): Parsed command-line arguments.
runtime_env (dict | None): Ray runtime environment configuration containing:
- pip (list[str] | None): Additional pip packages to install.
- py_modules (list[str] | None): Python modules to include in the environment.
concurrent (bool): Whether to launch tasks simultaneously as a batch,
or independently as resources become available.
Returns:
None
"""
job_objs = []
util.ray_init(ray_address=args.ray_address, runtime_env=runtime_env, log_to_driver=False)
for task in tasks:
resource = parse_task_resource(task)
print(f"[INFO] Creating job {task['name']} with resource={resource}")
job = util.Job(
name=task["name"],
py_args=task["py_args"],
resources=resource,
node=util.JobNode(
specific=task.get("node", {}).get("specific"),
hostname=task.get("node", {}).get("hostname"),
node_id=task.get("node", {}).get("node_id"),
),
)
job_objs.append(job)
start = datetime.now()
print(f"[INFO] Creating {len(job_objs)} jobs at {start.strftime('%H:%M:%S.%f')} with runtime env={runtime_env}")
# submit jobs
util.submit_wrapped_jobs(
jobs=job_objs,
test_mode=args.test,
concurrent=concurrent,
)
end = datetime.now()
print(
f"[INFO] All jobs completed at {end.strftime('%H:%M:%S.%f')}, took {(end - start).total_seconds():.2f} seconds."
)
def main() -> None:
"""
Main entry point for the Ray task runner script.
Reads the YAML task configuration file, parses CLI arguments,
and dispatches tasks to the Ray cluster.
Returns:
None
"""
args = parse_args()
with open(args.task_cfg) as f:
config = yaml.safe_load(f)
tasks = config["tasks"]
runtime_env = {
"pip": config.get("pip") or None,
"py_modules": config.get("py_modules") or None,
}
concurrent = config.get("concurrent", False)
run_tasks(
tasks=tasks,
args=args,
runtime_env=runtime_env,
concurrent=concurrent,
)
if __name__ == "__main__":
main()
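`parse_task_resource` above passes string values of `num_gpus`, `num_cpus`, and `memory` to `eval`, which will execute any Python expression found in the YAML file. If the configuration may come from an untrusted source, one possible alternative is to accept only plain arithmetic. The sketch below is an assumption-laden illustration rather than the PR's implementation: `safe_arith` is a hypothetical name, and it deliberately supports only `+`, `-`, `*`, `/`, and `//` on numeric literals.

```python
import ast
import operator

# Whitelisted operators; any other syntax (calls, names, attributes, **) is rejected.
_BIN_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_arith(value):
    """Hypothetical helper: return a number, evaluating strings such as "10*1024*1024*1024"."""
    if isinstance(value, bool) or not isinstance(value, (int, float, str)):
        raise ValueError(f"Unsupported resource value: {value!r}")
    if isinstance(value, (int, float)):
        return value

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        # Only int and float literals are allowed as leaves.
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            return _BIN_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        raise ValueError(f"Unsupported expression: {value!r}")

    return _eval(ast.parse(value, mode="eval"))


print(safe_arith("10*1024*1024*1024"))  # prints 10737418240
print(safe_arith("2*2"))                # prints 4
```

Exponentiation is left out on purpose so that a short string cannot request an enormous computation; the YAML examples in the docstring only use multiplication.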
......@@ -7,12 +7,17 @@ import os
import re
import select
import subprocess
import sys
import threading
from collections.abc import Sequence
from dataclasses import dataclass
from datetime import datetime
from math import isclose
from time import time
from typing import Any
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from tensorboard.backend.event_processing.directory_watcher import DirectoryDeletedError
from tensorboard.backend.event_processing.event_accumulator import EventAccumulator
......@@ -307,12 +312,21 @@ def execute_job(
return " ".join(result_details)
def ray_init(ray_address: str = "auto", runtime_env: dict[str, Any] | None = None, log_to_driver: bool = False):
"""Initialize Ray with the given address and runtime environment."""
if not ray.is_initialized():
print(
f"[INFO] Initializing Ray with address {ray_address}, log_to_driver={log_to_driver},"
f" runtime_env={runtime_env}"
)
ray.init(address=ray_address, runtime_env=runtime_env, log_to_driver=log_to_driver)
def get_gpu_node_resources(
total_resources: bool = False,
one_node_only: bool = False,
include_gb_ram: bool = False,
include_id: bool = False,
ray_address: str = "auto",
) -> list[dict] | dict:
"""Get information about available GPU node resources.
......@@ -329,8 +343,7 @@ def get_gpu_node_resources(
or simply the resource for a single node if requested.
"""
if not ray.is_initialized():
ray.init(address=ray_address)
raise Exception("Ray is not initialized. Please initialize Ray before getting node resources.")
nodes = ray.nodes()
node_resources = []
total_cpus = 0
......@@ -481,3 +494,225 @@ def _dicts_equal(d1: dict, d2: dict, tol=1e-9) -> bool:
elif d1[key] != d2[key]:
return False
return True
@dataclass
class JobResource:
"""A dataclass to represent a resource request for a job."""
num_gpus: float | None = None
num_cpus: float | None = None
memory: int | None = None # in bytes
def to_opt(self) -> dict[str, Any]:
"""Convert the resource request to a dictionary."""
opt = {}
if self.num_gpus is not None:
opt["num_gpus"] = self.num_gpus
if self.num_cpus is not None:
opt["num_cpus"] = self.num_cpus
if self.memory is not None:
opt["memory"] = self.memory
return opt
def to_pg_resources(self) -> dict[str, Any]:
"""Convert the resource request to a dictionary suitable for placement groups."""
res = {}
if self.num_gpus is not None:
res["GPU"] = self.num_gpus
if self.num_cpus is not None:
res["CPU"] = self.num_cpus
if self.memory is not None:
res["memory"] = self.memory
return res
@dataclass
class JobNode:
"""A dataclass to represent a node for job affinity."""
specific: str | None = None
hostname: str | None = None
node_id: str | None = None
def to_opt(self, nodes: list[dict[str, Any]]) -> dict[str, Any]:
"""
Convert node affinity settings into a dictionary of Ray actor scheduling options.
Args:
nodes (list[dict[str, Any]]): List of node metadata from `ray.nodes()` which looks like this:
[{
'NodeID': 'xxx',
'Alive': True,
'NodeManagerAddress': 'x.x.x.x',
'NodeManagerHostname': 'ray-head-mjzzf',
'NodeManagerPort': 44039,
'ObjectManagerPort': 35689,
'ObjectStoreSocketName': '/tmp/ray/session_xxx/sockets/plasma_store',
'RayletSocketName': '/tmp/ray/session_xxx/sockets/raylet',
'MetricsExportPort': 8080,
'NodeName': 'x.x.x.x',
'RuntimeEnvAgentPort': 63725,
'DeathReason': 0,
'DeathReasonMessage': '',
'alive': True,
'Resources': {
'node:__internal_head__': 1.0,
'object_store_memory': 422449279795.0,
'memory': 1099511627776.0,
'GPU': 8.0,
'node:x.x.x.x': 1.0,
'CPU': 192.0,
'accelerator_type:H20': 1.0
},
'Labels': {
'ray.io/node_id': 'xxx'
}
},...]
Returns:
dict[str, Any]: A dictionary with possible scheduling options:
- Empty if no specific placement requirement.
- "scheduling_strategy" key set to `NodeAffinitySchedulingStrategy`
if hostname or node_id placement is specified.
Raises:
ValueError: If hostname/node_id is specified but not found in the cluster
or the node is not alive.
"""
opt = {}
if self.specific is None or self.specific == "any":
return opt
elif self.specific == "hostname":
if self.hostname is None:
raise ValueError("Hostname must be specified when specific is 'hostname'")
for node in nodes:
if node["NodeManagerHostname"] == self.hostname:
if node["alive"] is False:
raise ValueError(f"Node {node['NodeID']} is not alive")
opt["scheduling_strategy"] = NodeAffinitySchedulingStrategy(node_id=node["NodeID"], soft=False)
return opt
raise ValueError(f"Hostname {self.hostname} not found in nodes: {nodes}")
elif self.specific == "node_id":
if self.node_id is None:
raise ValueError("Node ID must be specified when specific is 'node_id'")
for node in nodes:
if node["NodeID"] == self.node_id:
if node["alive"] is False:
raise ValueError(f"Node {node['NodeID']} is not alive")
opt["scheduling_strategy"] = NodeAffinitySchedulingStrategy(node_id=node["NodeID"], soft=False)
return opt
raise ValueError(f"Node ID {self.node_id} not found in nodes: {nodes}")
else:
raise ValueError(f"Invalid specific value: {self.specific}. Must be 'any', 'hostname', or 'node_id'.")
@dataclass
class Job:
"""A dataclass to represent a job to be submitted to Ray."""
# job command
cmd: str | None = None
py_args: str | None = None
# identifier string for the job, e.g., "job 0"
name: str = ""
# job resources, e.g., JobResource(num_cpus=4, num_gpus=1)
resources: JobResource | None = None
# specify the node to run the job on, if needed to run on a specific node
node: JobNode | None = None
def to_opt(self, nodes: list[dict[str, Any]]) -> dict[str, Any]:
"""
Convert the job definition into a dictionary of Ray scheduling options.
Args:
nodes (list[dict[str, Any]]): Node information from `ray.nodes()`.
Returns:
dict[str, Any]: Combined scheduling options from:
- `JobResource.to_opt()` for resource requirements
- `JobNode.to_opt()` for node placement constraints
"""
opt = {}
if self.resources is not None:
opt.update(self.resources.to_opt())
if self.node is not None:
opt.update(self.node.to_opt(nodes))
return opt
@ray.remote
class JobActor:
"""Actor to run job in Ray cluster."""
def __init__(self, job: Job, test_mode: bool, log_all_output: bool, extract_experiment: bool = False):
self.job = job
self.test_mode = test_mode
self.log_all_output = log_all_output
self.extract_experiment = extract_experiment
self.done = True
def ready(self) -> bool:
"""Check if the job is ready to run."""
return self.done
def run(self):
"""Run the job."""
cmd = self.job.cmd if self.job.cmd else " ".join([sys.executable, *self.job.py_args.split()])
return execute_job(
job_cmd=cmd,
identifier_string=self.job.name,
test_mode=self.test_mode,
extract_experiment=self.extract_experiment,
log_all_output=self.log_all_output,
)
def submit_wrapped_jobs(
jobs: Sequence[Job],
log_realtime: bool = True,
test_mode: bool = False,
concurrent: bool = False,
) -> None:
"""
Submit a list of jobs to the Ray cluster and manage their execution.
Args:
jobs (Sequence[Job]): A sequence of Job objects to execute on Ray.
log_realtime (bool): Whether to log stdout/stderr in real-time. Defaults to True.
test_mode (bool): If True, run in GPU sanity-check mode instead of actual jobs. Defaults to False.
concurrent (bool): Whether to launch tasks simultaneously as a batch,
or independently as resources become available. Defaults to False.
Returns:
None
"""
if jobs is None or len(jobs) == 0:
print("[WARNING]: No jobs to submit")
return
if not ray.is_initialized():
raise Exception("Ray is not initialized. Please initialize Ray before submitting jobs.")
nodes = ray.nodes()
actors = []
for i, job in enumerate(jobs):
opts = job.to_opt(nodes)
name = job.name or f"job_{i + 1}"
print(f"[INFO] Create {name} with opts={opts}")
job_actor = JobActor.options(**opts).remote(job, test_mode, log_realtime)
actors.append(job_actor)
try:
if concurrent:
ray.get([actor.ready.remote() for actor in actors])
print("[INFO] All actors are ready to run.")
future = [actor.run.remote() for actor in actors]
while future:
ready, not_ready = ray.wait(future, timeout=5)
for result in ray.get(ready):
print(f"\n{result}\n")
future = not_ready
print("[INFO] all jobs completed.")
except KeyboardInterrupt:
print("[INFO] KeyboardInterrupt received, cancelling …")
for actor in actors:
ray.kill(actor)
sys.exit(0)
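`JobActor.run` above builds its command with `py_args.split()`, which splits on every whitespace character and therefore breaks arguments that contain quoted spaces. Whether this matters depends on how `execute_job` invokes the command, which is not shown in this diff. The sketch below illustrates the difference using the standard-library `shlex` module; `build_python_cmd` is a hypothetical name, not a function from `util.py`.

```python
import shlex
import sys


def build_python_cmd(py_args: str) -> str:
    """Hypothetical helper: prefix the interpreter while preserving shell-style quoting."""
    tokens = shlex.split(py_args)  # honours quotes, unlike str.split()
    return shlex.join([sys.executable, *tokens])  # re-quotes tokens that need it


py_args = 'script.py --name "my run"'

# Plain whitespace splitting tears the quoted value apart.
print(py_args.split())       # prints ['script.py', '--name', '"my', 'run"']

# shlex keeps the quoted value as a single argument.
print(shlex.split(py_args))  # prints ['script.py', '--name', 'my run']

# The rebuilt command round-trips back to the same argument list.
print(shlex.split(build_python_cmd(py_args))[1:])  # prints ['script.py', '--name', 'my run']
```

For the `py_args` values in the YAML examples, which contain no quoted spaces, both approaches produce the same tokens.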
......@@ -3,12 +3,6 @@
#
# SPDX-License-Identifier: BSD-3-Clause
import argparse
import ray
import util
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
"""
This script dispatches sub-job(s) (individual jobs, use :file:`tuner.py` for tuning jobs)
to worker(s) on GPU-enabled node(s) of a specific cluster as part of a resource-wrapped aggregate
......@@ -64,6 +58,10 @@ Usage:
./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py -h
"""
import argparse
import util
def wrap_resources_to_jobs(jobs: list[str], args: argparse.Namespace) -> None:
"""
......@@ -75,9 +73,14 @@ def wrap_resources_to_jobs(jobs: list[str], args: argparse.Namespace) -> None:
args: The arguments for resource allocation
"""
if not ray.is_initialized():
ray.init(address=args.ray_address, log_to_driver=True)
job_results = []
job_objs = []
util.ray_init(
ray_address=args.ray_address,
runtime_env={
"py_modules": None if not args.py_modules else args.py_modules,
},
log_to_driver=False,
)
gpu_node_resources = util.get_gpu_node_resources(include_id=True, include_gb_ram=True)
if any([args.gpu_per_worker, args.cpu_per_worker, args.ram_gb_per_worker]) and args.num_workers:
......@@ -97,7 +100,7 @@ def wrap_resources_to_jobs(jobs: list[str], args: argparse.Namespace) -> None:
jobs = ["nvidia-smi"] * num_nodes
for i, job in enumerate(jobs):
gpu_node = gpu_node_resources[i % num_nodes]
print(f"[INFO]: Submitting job {i + 1} of {len(jobs)} with job '{job}' to node {gpu_node}")
print(f"[INFO]: Creating job {i + 1} of {len(jobs)} with job '{job}' to node {gpu_node}")
print(
f"[INFO]: Resource parameters: GPU: {args.gpu_per_worker[i]}"
f" CPU: {args.cpu_per_worker[i]} RAM {args.ram_gb_per_worker[i]}"
......@@ -106,19 +109,19 @@ def wrap_resources_to_jobs(jobs: list[str], args: argparse.Namespace) -> None:
num_gpus = args.gpu_per_worker[i] / args.num_workers[i]
num_cpus = args.cpu_per_worker[i] / args.num_workers[i]
memory = (args.ram_gb_per_worker[i] * 1024**3) / args.num_workers[i]
print(f"[INFO]: Requesting {num_gpus=} {num_cpus=} {memory=} id={gpu_node['id']}")
job = util.remote_execute_job.options(
num_gpus=num_gpus,
num_cpus=num_cpus,
memory=memory,
scheduling_strategy=NodeAffinitySchedulingStrategy(gpu_node["id"], soft=False),
).remote(job, f"Job {i}", args.test)
job_results.append(job)
results = ray.get(job_results)
for i, result in enumerate(results):
print(f"[INFO]: Job {i} result: {result}")
print("[INFO]: All jobs completed.")
job_objs.append(
util.Job(
cmd=job,
name=f"Job-{i + 1}",
resources=util.JobResource(num_gpus=num_gpus, num_cpus=num_cpus, memory=memory),
node=util.JobNode(
specific="node_id",
node_id=gpu_node["id"],
),
)
)
# submit jobs
util.submit_wrapped_jobs(jobs=job_objs, test_mode=args.test, concurrent=False)
if __name__ == "__main__":
......@@ -134,6 +137,15 @@ if __name__ == "__main__":
"that GPU resources are correctly isolated."
),
)
parser.add_argument(
"--py_modules",
type=str,
nargs="*",
default=[],
help=(
"List of python modules or paths to add before running the job. Example: --py_modules my_package/my_package"
),
)
parser.add_argument(
"--sub_jobs",
type=str,
......