Unverified Commit ea766783 authored by garylvov's avatar garylvov Committed by GitHub

Clarifies Ray Documentation and Fixes Minor Issues (#1717)

# Description
This PR cleans up the Ray documentation to be more clear, and fixes some
small issues in the code.

- Moved local set up to be an easier Docker-based thing
- Added Wget to docker container to fix issue where Ray head would fail
health check on GKE where the workers wouldn't start
- Removed redundant information from Documentation
- Added a local quickstart
- (investigated whether https mlflow was possible, added flag to jinja
env)@kellyguo11
- Added better compatibility with other workflows to address #1703 
- Avoided early exit due to buffer overflow to address #1703 (thank you
@giulioturrisi for helping find this)
<!-- As a practice, it is recommended to open an issue to have
discussions on the proposed pull request.
This makes it easier for the community to keep track of what is being
developed or added, and if a given feature
is demanded by more than one party. -->

## Type of change

<!-- As you go through the list, delete the ones that are not
applicable. -->

- Bug fix (non-breaking change which fixes an issue)
- This change requires a documentation update


![image](https://github.com/user-attachments/assets/eb38b3c8-8e9c-438d-9218-8b0662146f96)


## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

<!--
As you go through the checklist above, you can mark something as done by
putting an x character in it

For example,
- [x] I have done this task
- [ ] I have not done this task
-->

---------
Signed-off-by: 's avatargarylvov <67614381+garylvov@users.noreply.github.com>
Co-authored-by: 's avatarGary Lvov <glvov@theaiinstitute.com>
parent 21173c3e
This diff is collapsed.
FROM isaac-lab-base:latest
# WGet is needed so that GCS or other cloud providers can mark the container as ready.
# Otherwise the Ray liveliness checks fail.
RUN apt-get update && apt-get install wget
# Set NVIDIA paths
ENV PATH="/usr/local/nvidia/bin:$PATH"
ENV LD_LIBRARY_PATH="/usr/local/nvidia/lib64"
......
......@@ -19,7 +19,6 @@ spec:
block: "true"
dashboard-host: 0.0.0.0
dashboard-port: "8265"
node-ip-address: "0.0.0.0"
port: "6379"
include-dashboard: "true"
ray-debugger-external: "true"
......@@ -30,7 +29,7 @@ spec:
apiVersion: v1
kind: Service
metadata:
name: head
name: {{ name }}-head
spec:
type: LoadBalancer
template:
......@@ -130,7 +129,7 @@ spec:
volumeMounts:
- mountPath: /tmp/ray
name: ray-logs
command: ["/bin/bash", "-c", "ray start --address=head.{{ namespace }}.svc.cluster.local:6379 && tail -f /dev/null"]
command: ["/bin/bash", "-c", "ray start --address={{name}}-head.{{ namespace }}.svc.cluster.local:6379 && tail -f /dev/null"]
- image: fluent/fluent-bit:1.9.6
name: fluentbit
resources:
......
......@@ -21,7 +21,7 @@ Usage:
.. code-block:: bash
./isaaclab.sh -p source/standalone/workflows/ray/grok_cluster_with_kubectl.py
python3 source/standalone/workflows/ray/grok_cluster_with_kubectl.py
# For options, supply -h arg
"""
......@@ -67,9 +67,10 @@ def get_clusters(pods: list, cluster_name_prefix: str) -> set:
match = re.match(r"(" + re.escape(cluster_name_prefix) + r"[-\w]+)", pod_name)
if match:
# Get base name without head/worker suffix
base_name = match.group(1).split("-head")[0].split("-worker")[0]
clusters.add(base_name)
# Get base name without head/worker suffix (skip workers)
if "head" in pod_name:
base_name = match.group(1).split("-head")[0]
clusters.add(base_name)
return sorted(clusters)
......@@ -90,9 +91,7 @@ def get_mlflow_info(namespace: str = None, cluster_prefix: str = "isaacray") ->
clusters = get_clusters(pods=pods, cluster_name_prefix=cluster_prefix)
if len(clusters) > 1:
raise ValueError("More than one cluster matches prefix, could not automatically determine mlflow info.")
base_name = cluster_prefix.split("-head")[0].split("-worker")[0]
mlflow_name = f"{base_name}-mlflow"
mlflow_name = f"{cluster_prefix}-mlflow"
cmd = ["kubectl", "get", "svc", mlflow_name, "-n", namespace, "--no-headers"]
try:
......@@ -102,7 +101,8 @@ def get_mlflow_info(namespace: str = None, cluster_prefix: str = "isaacray") ->
# Get cluster IP
cluster_ip = fields[2]
port = "5000" # Default MLflow port
# This needs to be http to be resolved. HTTPS can't be resolved
# This should be fine as it is on a subnet on the cluster regardless
return f"http://{cluster_ip}:{port}"
except subprocess.CalledProcessError as e:
raise ValueError(f"Could not grok MLflow: {e}") # Fixed f-string
......
......@@ -8,29 +8,28 @@ import pathlib
import subprocess
import yaml
import util
from jinja2 import Environment, FileSystemLoader
from kubernetes import config
import source.standalone.workflows.ray.util as util
"""This script helps create one or more KubeRay clusters.
Usage:
.. code-block:: bash
# If the head node is stuck on container creating, make sure to create a secret
./isaaclab.sh -p source/standalone/workflows/ray/launch.py -h
python3 source/standalone/workflows/ray/launch.py -h
# Examples
# The following creates 8 GPUx1 nvidia l4 workers
./isaaclab.sh -p source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
python3 source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
--namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
--num_workers 8 --num_clusters 1 --worker_accelerator nvidia-l4 --gpu_per_worker 1
# The following creates 1 GPUx1 nvidia l4 worker, 2 GPUx2 nvidia-tesla-t4 workers,
# and 2 GPUx4 nvidia-tesla-t4 GPU workers
./isaaclab.sh -p source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
python3 source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
--namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
--num_workers 1 2 --num_clusters 1 \
--worker_accelerator nvidia-l4 nvidia-tesla-t4 --gpu_per_worker 1 2 4
......@@ -53,7 +52,7 @@ def apply_manifest(args: argparse.Namespace) -> None:
# Set up Jinja2 environment for loading templates
templates_dir = RAY_DIR / "cluster_configs" / args.cluster_host
file_loader = FileSystemLoader(str(templates_dir))
jinja_env = Environment(loader=file_loader, keep_trailing_newline=True)
jinja_env = Environment(loader=file_loader, keep_trailing_newline=True, autoescape=True)
# Define template filename
template_file = "kuberay.yaml.jinja"
......@@ -79,6 +78,7 @@ def apply_manifest(args: argparse.Namespace) -> None:
# Apply the Kubernetes manifest using kubectl
try:
print(cleaned_yaml_string)
subprocess.run(["kubectl", "apply", "-f", "-"], input=cleaned_yaml_string, text=True, check=True)
except subprocess.CalledProcessError as e:
exit(f"An error occurred while running `kubectl`: {e}")
......
......@@ -40,16 +40,16 @@ Usage:
.. code-block:: bash
# Example; submitting a tuning job
./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py \
python3 source/standalone/workflows/ray/submit_job.py \
--aggregate_jobs /workspace/isaaclab/source/standalone/workflows/ray/tuner.py \
--cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
--cfg_class CartpoleRGBNoTuneJobCfg --mlflow_uri <ML_FLOW_URI>
--cfg_class CartpoleTheiaJobCfg --mlflow_uri <ML_FLOW_URI>
# Example: Submitting resource wrapped job
./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py --aggregate_jobs wrap_resources.py --sub_jobs ./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task Isaac-Cartpole-v0 --headless+./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task Isaac-Cartpole-RGB-Camera-Direct-v0 --headless --enable_cameras agent.params.config.max_epochs=150
python3 source/standalone/workflows/ray/submit_job.py --aggregate_jobs wrap_resources.py --test
# For all command line arguments
./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py -h
python3 source/standalone/workflows/ray/submit_job.py -h
"""
script_directory = os.path.dirname(os.path.abspath(__file__))
CONFIG = {"working_dir": script_directory, "executable": "/workspace/isaaclab/isaaclab.sh -p"}
......
......@@ -17,8 +17,9 @@ from ray.tune.search.repeater import Repeater
"""
This script breaks down an aggregate tuning job, as defined by a hyperparameter sweep configuration,
into individual jobs (shell commands) to run on the GPU-enabled nodes of the cluster.
By default, (unless combined as a sub-job in a resource-wrapped aggregate job), one worker is created
for each GPU-enabled node in the cluster for each individual job.
By default, one worker is created for each GPU-enabled node in the cluster for each individual job.
To use more than one worker per node (likely the case for multi-GPU machines), supply the
num_workers_per_node argument.
Each hyperparameter sweep configuration should include the workflow,
runner arguments, and hydra arguments to vary.
......@@ -39,16 +40,15 @@ Usage:
./isaaclab.sh -p source/standalone/workflows/ray/tuner.py -h
# Examples
# Local (not within a docker container, when within a local docker container, do not supply run_mode argument)
# Local
./isaaclab.sh -p source/standalone/workflows/ray/tuner.py --run_mode local \
--cfg_file source/standalone/workflows/ray/hyperparameter_tuning/vision_cartpole_cfg.py \
--cfg_class CartpoleRGBNoTuneJobCfg
# Local docker: start the ray server and run above command in the same running container without run_mode arg
--cfg_class CartpoleTheiaJobCfg
# Remote (run grok cluster or create config file mentioned in :file:`submit_job.py`)
./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py \
--aggregate_jobs tuner.py \
--cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
--cfg_class CartpoleRGBNoTuneJobCfg --mlflow_uri <MLFLOW_URI_FROM_GROK_OR_MANUAL>
--cfg_class CartpoleTheiaJobCfg --mlflow_uri <MLFLOW_URI_FROM_GROK_OR_MANUAL>
"""
......@@ -74,7 +74,7 @@ class IsaacLabTuneTrainable(tune.Trainable):
print(f"[INFO]: Recovered invocation with {self.invoke_cmd}")
self.experiment = None
def reset_config(self, new_config):
def reset_config(self, new_config: dict):
"""Allow environments to be re-used by fetching a new invocation command"""
self.setup(new_config)
return True
......@@ -95,15 +95,15 @@ class IsaacLabTuneTrainable(tune.Trainable):
self.proc = experiment["proc"]
self.experiment_name = experiment["experiment_name"]
self.isaac_logdir = experiment["logdir"]
self.tensorboard_logdir = self.isaac_logdir + f"/{self.experiment_name}/summaries"
self.tensorboard_logdir = self.isaac_logdir + "/" + self.experiment_name
self.done = False
if self.proc is None:
raise ValueError("Could not start trial.")
if self.proc.poll() is not None: # process finished, signal finish
proc_status = self.proc.poll()
if proc_status is not None: # process finished, signal finish
self.data["done"] = True
print("[INFO]: Process finished, returning...")
print(f"[INFO]: Process finished with {proc_status}, returning...")
else: # wait until the logs are ready or fresh
data = util.load_tensorboard_logs(self.tensorboard_logdir)
......@@ -220,10 +220,24 @@ class JobCfg:
"""To be compatible with :meth: invoke_tuning_run and :class:IsaacLabTuneTrainable,
at a minimum, the tune job should inherit from this class."""
def __init__(self, cfg):
def __init__(self, cfg: dict):
"""
Runner args include command line arguments passed to the task.
For example:
cfg["runner_args"]["headless_singleton"] = "--headless"
cfg["runner_args"]["enable_cameras_singleton"] = "--enable_cameras"
"""
assert "runner_args" in cfg, "No runner arguments specified."
"""
Task is the desired task to train on. For example:
cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"])
"""
assert "--task" in cfg["runner_args"], "No task specified."
assert "hydra_args" in cfg, "No hypeparameters specified."
"""
Hydra args define the hyperparameters varied within the sweep. For example:
cfg["hydra_args"]["agent.params.network.cnn.activation"] = tune.choice(["relu", "elu"])
"""
assert "hydra_args" in cfg, "No hyperparameters specified."
self.cfg = cfg
......
......@@ -6,15 +6,18 @@ import argparse
import os
import re
import subprocess
import threading
from datetime import datetime
from math import isclose
import ray
from tensorboard.backend.event_processing.directory_watcher import DirectoryDeletedError
from tensorboard.backend.event_processing.event_accumulator import EventAccumulator
def load_tensorboard_logs(directory: str) -> dict:
"""From a tensorboard directory, get the latest scalar values.
"""From a tensorboard directory, get the latest scalar values. If the logs can't be
found, check the summaries sublevel.
Args:
directory: The directory of the tensorboard logging.
......@@ -22,19 +25,23 @@ def load_tensorboard_logs(directory: str) -> dict:
Returns:
The latest available scalar values.
"""
# Initialize the event accumulator with a size guidance for only the latest entry
size_guidance = {"scalars": 1} # Load only the latest entry for scalars
event_acc = EventAccumulator(directory, size_guidance=size_guidance)
event_acc.Reload() # Load all data from the directory
def get_latest_scalars(path: str) -> dict:
event_acc = EventAccumulator(path, size_guidance={"scalars": 1})
try:
event_acc.Reload()
if event_acc.Tags()["scalars"]:
return {
tag: event_acc.Scalars(tag)[-1].value
for tag in event_acc.Tags()["scalars"]
if event_acc.Scalars(tag)
}
except (KeyError, OSError, RuntimeError, DirectoryDeletedError):
return {}
# Extract the latest scalars logged
latest_scalars = {}
for tag in event_acc.Tags()["scalars"]:
events = event_acc.Scalars(tag)
if events: # Check if there is at least one entry
latest_event = events[-1] # Get the latest entry
latest_scalars[tag] = latest_event.value
return latest_scalars
scalars = get_latest_scalars(directory)
return scalars or get_latest_scalars(os.path.join(directory, "summaries"))
def get_invocation_command_from_cfg(
......@@ -190,47 +197,62 @@ def execute_job(
experiment_info_pattern = re.compile("Exact experiment name requested from command line: (.+)")
logdir_pattern = re.compile(r"\[INFO\] Logging experiment in directory: (.+)$")
err_pattern = re.compile("There was an error (.+)$")
with process.stdout as stdout:
for line in iter(stdout.readline, ""):
def stream_reader(stream, identifier_string, result_details):
for line in iter(stream.readline, ""):
line = line.strip()
result_details.append(f"{identifier_string}: {line} \n")
result_details.append(f"{identifier_string}: {line}\n")
if log_all_output:
print(f"{identifier_string}: {line}")
if extract_experiment:
exp_match = experiment_info_pattern.search(line)
log_match = logdir_pattern.search(line)
err_match = err_pattern.search(line)
if err_match:
raise ValueError(f"Encountered an error during trial run. {' '.join(result_details)}")
if exp_match:
experiment_name = exp_match.group(1)
if log_match:
logdir = log_match.group(1)
if experiment_name and logdir:
result = {
"experiment_name": experiment_name,
"logdir": logdir,
"proc": process,
"result": " ".join(result_details),
}
return result
with process.stderr as stderr:
for line in iter(stderr.readline, ""):
line = line.strip()
result_details.append(f"{identifier_string}: {line}")
# Read stdout until we find experiment info
# Do some careful handling prevent overflowing the pipe reading buffer with error 141
for line in iter(process.stdout.readline, ""):
line = line.strip()
result_details.append(f"{identifier_string}: {line} \n")
if log_all_output:
print(f"{identifier_string}: {line}")
process.wait() # Wait for the subprocess to finish naturally if not exited early
now = datetime.now().strftime("%H:%M:%S.%f")
completion_info = f"\n[INFO]: {identifier_string}: Job Started at {start_time}, completed at {now}\n"
print(completion_info)
result_details.append(completion_info)
return " ".join(result_details)
if extract_experiment:
exp_match = experiment_info_pattern.search(line)
log_match = logdir_pattern.search(line)
err_match = err_pattern.search(line)
if err_match:
raise ValueError(f"Encountered an error during trial run. {' '.join(result_details)}")
if exp_match:
experiment_name = exp_match.group(1)
if log_match:
logdir = log_match.group(1)
if experiment_name and logdir:
# Start stderr reader after finding experiment info
stderr_thread = threading.Thread(
target=stream_reader, args=(process.stderr, identifier_string, result_details)
)
stderr_thread.daemon = True
stderr_thread.start()
# Start stdout reader to continue reading to flush buffer
stdout_thread = threading.Thread(
target=stream_reader, args=(process.stdout, identifier_string, result_details)
)
stdout_thread.daemon = True
stdout_thread.start()
return {
"experiment_name": experiment_name,
"logdir": logdir,
"proc": process,
"result": " ".join(result_details),
}
process.wait()
now = datetime.now().strftime("%H:%M:%S.%f")
completion_info = f"\n[INFO]: {identifier_string}: Job Started at {start_time}, completed at {now}\n"
print(completion_info)
result_details.append(completion_info)
return " ".join(result_details)
def get_gpu_node_resources(
......
......@@ -6,12 +6,11 @@
import argparse
import ray
import util
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
import source.standalone.workflows.ray.util as util
"""
This script dispatches sub-job(s) (either individual jobs or tuning aggregate jobs)
This script dispatches sub-job(s) (individual jobs, use :file:`tuner.py` for tuning jobs)
to worker(s) on GPU-enabled node(s) of a specific cluster as part of an resource-wrapped aggregate
job. If no desired compute resources for each sub-job are specified,
this script creates one worker per available node for each node with GPU(s) in the cluster.
......
......@@ -93,6 +93,8 @@ def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agen
print(f"[INFO] Logging experiment in directory: {log_root_path}")
# specify directory for logging runs: {time-stamp}_{run_name}
log_dir = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# This way, the Ray Tune workflow can extract experiment name.
print(f"Exact experiment name requested from command line: {log_dir}")
if agent_cfg.run_name:
log_dir += f"_{agent_cfg.run_name}"
log_dir = os.path.join(log_root_path, log_dir)
......
......@@ -89,7 +89,11 @@ def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agen
env_cfg.sim.device = args_cli.device if args_cli.device is not None else env_cfg.sim.device
# directory for logging into
log_dir = os.path.join("logs", "sb3", args_cli.task, datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))
run_info = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_root_path = os.path.abspath(os.path.join("logs", "sb3", args_cli.task))
print(f"[INFO] Logging experiment in directory: {log_root_path}")
print(f"Exact experiment name requested from command line: {run_info}")
log_dir = os.path.join(log_root_path, run_info)
# dump the configuration into log-directory
dump_yaml(os.path.join(log_dir, "params", "env.yaml"), env_cfg)
dump_yaml(os.path.join(log_dir, "params", "agent.yaml"), agent_cfg)
......
......@@ -135,6 +135,7 @@ def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agen
print(f"[INFO] Logging experiment in directory: {log_root_path}")
# specify directory for logging runs: {time-stamp}_{run_name}
log_dir = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + f"_{algorithm}_{args_cli.ml_framework}"
print(f"Exact experiment name requested from command line {log_dir}")
if agent_cfg["agent"]["experiment"]["experiment_name"]:
log_dir += f'_{agent_cfg["agent"]["experiment"]["experiment_name"]}'
# set directory into agent config
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment