Unverified Commit 286e1eea authored by glvov-bdai's avatar glvov-bdai Committed by GitHub

Adds Ray Workflow: Multiple Run Support, Distributed Hyperparameter Tuning,...

Adds Ray Workflow: Multiple Run Support, Distributed Hyperparameter Tuning, and Consistent Setup Across Local/Cloud (#1301)

This PR adds Ray support, which enables a lot of really cool stuff by
leveraging the existing Hydra support, including but not limited to:

- Several training runs at once in parallel or consecutively with
minimal interaction
- Using the same training setup everywhere (on cloud and local) with
minimal overhead
- Tuning hyperparameters
- Tuning hyperparameters in parallel on multiple GPUs and/or multiple
GPU Nodes
- Simultaneously tuning model hyperparameters for different
environments/agents
- Resource Isolation
parent d8bc7256
......@@ -100,6 +100,7 @@ Table of Contents
source/features/hydra
source/features/multi_gpu
Tiled Rendering</source/overview/sensors/camera>
source/features/ray
source/features/reproducibility
.. toctree::
......
This diff is collapsed.
FROM isaac-lab-base:latest
# Set NVIDIA paths
ENV PATH="/usr/local/nvidia/bin:$PATH"
ENV LD_LIBRARY_PATH="/usr/local/nvidia/lib64"
# Link NVIDIA binaries
RUN ln -sf /usr/local/nvidia/bin/nvidia* /usr/bin
# Install Ray and configure it
RUN /workspace/isaaclab/_isaac_sim/python.sh -m pip install "ray[default, tune]"==2.31.0 && \
sed -i "1i $(echo "#!/workspace/isaaclab/_isaac_sim/python.sh")" \
/isaac-sim/kit/python/bin/ray && ln -s /isaac-sim/kit/python/bin/ray /usr/local/bin/ray
# Install tuning dependencies
RUN /workspace/isaaclab/_isaac_sim/python.sh -m pip install optuna bayesian-optimization
# Install MLflow for logging
RUN /workspace/isaaclab/_isaac_sim/python.sh -m pip install mlflow
# Jinja is used for templating here as full helm setup is excessive for application
apiVersion: ray.io/v1alpha1
kind: RayCluster
metadata:
name: {{ name }}
namespace: {{ namespace }}
spec:
rayVersion: "2.8.0"
enableInTreeAutoscaling: true
autoscalerOptions:
upscalingMode: Default
idleTimeoutSeconds: 120
imagePullPolicy: Always
securityContext: {}
envFrom: []
headGroupSpec:
rayStartParams:
block: "true"
dashboard-host: 0.0.0.0
dashboard-port: "8265"
node-ip-address: "0.0.0.0"
port: "6379"
include-dashboard: "true"
ray-debugger-external: "true"
object-manager-port: "8076"
num-gpus: "0"
num-cpus: "0" # prevent scheduling jobs to the head node - workers only
headService:
apiVersion: v1
kind: Service
metadata:
name: head
spec:
type: LoadBalancer
template:
metadata:
labels:
app.kubernetes.io/instance: tuner
app.kubernetes.io/name: kuberay
cloud.google.com/gke-ray-node-type: head
spec:
serviceAccountName: {{ service_account_name }}
affinity: {}
securityContext:
fsGroup: 100
containers:
- env:
image: {{ image }}
imagePullPolicy: Always
name: head
resources:
limits:
cpu: "{{ num_head_cpu }}"
memory: {{ head_ram_gb }}G
nvidia.com/gpu: "0"
requests:
cpu: "{{ num_head_cpu }}"
memory: {{ head_ram_gb }}G
nvidia.com/gpu: "0"
securityContext: {}
volumeMounts:
- mountPath: /tmp/ray
name: ray-logs
command: ["/bin/bash", "-c", "ray start --head --port=6379 --object-manager-port=8076 --dashboard-host=0.0.0.0 --dashboard-port=8265 --include-dashboard=true && tail -f /dev/null"]
- image: fluent/fluent-bit:1.9.6
name: fluentbit
resources:
limits:
cpu: 100m
memory: 128Mi
requests:
cpu: 100m
memory: 128Mi
volumeMounts:
- mountPath: /tmp/ray
name: ray-logs
imagePullSecrets: []
nodeSelector:
iam.gke.io/gke-metadata-server-enabled: "true"
volumes:
- configMap:
name: fluentbit-config
name: fluentbit-config
- name: ray-logs
emptyDir: {}
workerGroupSpecs:
{% for it in range(gpu_per_worker|length) %}
- groupName: "{{ worker_accelerator[it] }}x{{ gpu_per_worker[it] }}-cpu-{{ cpu_per_worker[it] }}-ram-gb-{{ ram_gb_per_worker[it] }}"
replicas: {{ num_workers[it] }}
maxReplicas: {{ num_workers[it] }}
minReplicas: {{ num_workers[it] }}
rayStartParams:
block: "true"
ray-debugger-external: "true"
replicas: "{{num_workers[it]}}"
template:
metadata:
annotations: {}
labels:
app.kubernetes.io/instance: tuner
app.kubernetes.io/name: kuberay
cloud.google.com/gke-ray-node-type: worker
spec:
serviceAccountName: {{ service_account_name }}
affinity: {}
securityContext:
fsGroup: 100
containers:
- env:
- name: NVIDIA_VISIBLE_DEVICES
value: "all"
- name: NVIDIA_DRIVER_CAPABILITIES
value: "compute,utility"
image: {{ image }}
imagePullPolicy: Always
name: ray-worker
resources:
limits:
cpu: "{{ cpu_per_worker[it] }}"
memory: {{ ram_gb_per_worker[it] }}G
nvidia.com/gpu: "{{ gpu_per_worker[it] }}"
requests:
cpu: "{{ cpu_per_worker[it] }}"
memory: {{ ram_gb_per_worker[it] }}G
nvidia.com/gpu: "{{ gpu_per_worker[it] }}"
securityContext: {}
volumeMounts:
- mountPath: /tmp/ray
name: ray-logs
command: ["/bin/bash", "-c", "ray start --address=head.{{ namespace }}.svc.cluster.local:6379 && tail -f /dev/null"]
- image: fluent/fluent-bit:1.9.6
name: fluentbit
resources:
limits:
cpu: 100m
memory: 128Mi
requests:
cpu: 100m
memory: 128Mi
volumeMounts:
- mountPath: /tmp/ray
name: ray-logs
imagePullSecrets: []
nodeSelector:
cloud.google.com/gke-accelerator: {{ worker_accelerator[it] }}
iam.gke.io/gke-metadata-server-enabled: "true"
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
volumes:
- configMap:
name: fluentbit-config
name: fluentbit-config
- name: ray-logs
emptyDir: {}
{% endfor %}
---
# ML Flow Server - for fetching logs
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{name}}-mlflow
namespace: {{ namespace }}
spec:
replicas: 1
selector:
matchLabels:
app: mlflow
template:
metadata:
labels:
app: mlflow
spec:
containers:
- name: mlflow
image: ghcr.io/mlflow/mlflow:v2.9.2
ports:
- containerPort: 5000
command: ["mlflow"]
args:
- server
- --host=0.0.0.0
- --port=5000
- --backend-store-uri=sqlite:///mlflow.db
---
# ML Flow Service (for port forwarding, kubectl port-forward service/{name}-mlflow 5000:5000)
apiVersion: v1
kind: Service
metadata:
name: {{name}}-mlflow
namespace: {{ namespace }}
spec:
selector:
app: mlflow
ports:
- port: 5000
targetPort: 5000
type: ClusterIP
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import argparse
import os
import re
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
"""
This script requires that kubectl is installed and KubeRay was used to create the cluster.
Creates a config file containing ``name: <NAME> address: http://<IP>:<PORT>`` on
a new line for each cluster, and also fetches the MLFlow URI.
Usage:
.. code-block:: bash
./isaaclab.sh -p source/standalone/workflows/ray/grok_cluster_with_kubectl.py
# For options, supply -h arg
"""
def get_namespace() -> str:
"""Get the current Kubernetes namespace from the context, fallback to default if not set"""
try:
namespace = (
subprocess.check_output(["kubectl", "config", "view", "--minify", "--output", "jsonpath={..namespace}"])
.decode()
.strip()
)
if not namespace:
namespace = "default"
except subprocess.CalledProcessError:
namespace = "default"
return namespace
def get_pods(namespace: str = "default") -> list[tuple]:
"""Get a list of all of the pods in the namespace"""
cmd = ["kubectl", "get", "pods", "-n", namespace, "--no-headers"]
output = subprocess.check_output(cmd).decode()
pods = []
for line in output.strip().split("\n"):
fields = line.split()
pod_name = fields[0]
status = fields[2]
pods.append((pod_name, status))
return pods
def get_clusters(pods: list, cluster_name_prefix: str) -> set:
"""
Get unique cluster name(s). Works for one or more clusters, based off of the number of head nodes.
Excludes MLflow deployments.
"""
clusters = set()
for pod_name, _ in pods:
# Skip MLflow pods
if "-mlflow" in pod_name:
continue
match = re.match(r"(" + re.escape(cluster_name_prefix) + r"[-\w]+)", pod_name)
if match:
# Get base name without head/worker suffix
base_name = match.group(1).split("-head")[0].split("-worker")[0]
clusters.add(base_name)
return sorted(clusters)
def get_mlflow_info(namespace: str = None, cluster_prefix: str = "isaacray") -> str:
"""
Get MLflow service information if it exists in the namespace with the given prefix.
Only works for a single cluster instance.
Args:
namespace: Kubernetes namespace
cluster_prefix: Base cluster name (without -head/-worker suffixes)
Returns:
MLflow service URL
"""
# Strip any -head or -worker suffixes to get base name
if namespace is None:
namespace = get_namespace()
pods = get_pods(namespace=namespace)
clusters = get_clusters(pods=pods, cluster_name_prefix=cluster_prefix)
if len(clusters) > 1:
raise ValueError("More than one cluster matches prefix, could not automatically determine mlflow info.")
base_name = cluster_prefix.split("-head")[0].split("-worker")[0]
mlflow_name = f"{base_name}-mlflow"
cmd = ["kubectl", "get", "svc", mlflow_name, "-n", namespace, "--no-headers"]
try:
output = subprocess.check_output(cmd).decode()
fields = output.strip().split()
# Get cluster IP
cluster_ip = fields[2]
port = "5000" # Default MLflow port
return f"http://{cluster_ip}:{port}"
except subprocess.CalledProcessError as e:
raise ValueError(f"Could not grok MLflow: {e}") # Fixed f-string
def check_clusters_running(pods: list, clusters: set) -> bool:
"""
Check that all of the pods in all provided clusters are running.
Args:
pods (list): A list of tuples where each tuple contains the pod name and its status.
clusters (set): A set of cluster names to check.
Returns:
bool: True if all pods in any of the clusters are running, False otherwise.
"""
clusters_running = False
for cluster in clusters:
cluster_pods = [p for p in pods if p[0].startswith(cluster)]
total_pods = len(cluster_pods)
running_pods = len([p for p in cluster_pods if p[1] == "Running"])
if running_pods == total_pods and running_pods > 0:
clusters_running = True
break
return clusters_running
def get_ray_address(head_pod: str, namespace: str = "default", ray_head_name: str = "head") -> str:
"""
Given a cluster head pod, check its logs, which should include the ray address which can accept job requests.
Args:
head_pod (str): The name of the head pod.
namespace (str, optional): The Kubernetes namespace. Defaults to "default".
ray_head_name (str, optional): The name of the ray head container. Defaults to "head".
Returns:
str: The ray address if found, None otherwise.
Raises:
ValueError: If the logs cannot be retrieved or the ray address is not found.
"""
cmd = ["kubectl", "logs", head_pod, "-c", ray_head_name, "-n", namespace]
try:
output = subprocess.check_output(cmd).decode()
except subprocess.CalledProcessError as e:
raise ValueError(
f"Could not enter head container with cmd {cmd}: {e}Perhaps try a different namespace or ray head name."
)
match = re.search(r"RAY_ADDRESS='([^']+)'", output)
if match:
return match.group(1)
else:
return None
def process_cluster(cluster_info: dict, ray_head_name: str = "head") -> str:
"""
For each cluster, check that it is running, and get the Ray head address that will accept jobs.
Args:
cluster_info (dict): A dictionary containing cluster information with keys 'cluster', 'pods', and 'namespace'.
ray_head_name (str, optional): The name of the ray head container. Defaults to "head".
Returns:
str: A string containing the cluster name and its Ray head address, or an error message if the head pod or Ray address is not found.
"""
cluster, pods, namespace = cluster_info
head_pod = None
for pod_name, status in pods:
if pod_name.startswith(cluster + "-head"):
head_pod = pod_name
break
if not head_pod:
return f"Error: Could not find head pod for cluster {cluster}\n"
# Get RAY_ADDRESS and status
ray_address = get_ray_address(head_pod, namespace=namespace, ray_head_name=ray_head_name)
if not ray_address:
return f"Error: Could not find RAY_ADDRESS for cluster {cluster}\n"
# Return only cluster and ray address
return f"name: {cluster} address: {ray_address}\n"
def main():
# Parse command-line arguments
parser = argparse.ArgumentParser(description="Process Ray clusters and save their specifications.")
parser.add_argument("--prefix", default="isaacray", help="The prefix for the cluster names.")
parser.add_argument("--output", default="~/.cluster_config", help="The file to save cluster specifications.")
parser.add_argument("--ray_head_name", default="head", help="The metadata name for the ray head container")
parser.add_argument(
"--namespace", help="Kubernetes namespace to use. If not provided, will detect from current context."
)
args = parser.parse_args()
# Get namespace from args or detect it
current_namespace = args.namespace if args.namespace else get_namespace()
print(f"Using namespace: {current_namespace}")
cluster_name_prefix = args.prefix
cluster_spec_file = os.path.expanduser(args.output)
# Get all pods
pods = get_pods(namespace=current_namespace)
# Get clusters
clusters = get_clusters(pods, cluster_name_prefix)
if not clusters:
print(f"No clusters found with prefix {cluster_name_prefix}")
return
# Wait for clusters to be running
while True:
pods = get_pods(namespace=current_namespace)
if check_clusters_running(pods, clusters):
break
print("Waiting for all clusters to spin up...")
time.sleep(5)
print("Checking for MLflow:")
# Check MLflow status for each cluster
for cluster in clusters:
try:
mlflow_address = get_mlflow_info(current_namespace, cluster)
print(f"MLflow address for {cluster}: {mlflow_address}")
except ValueError as e:
print(f"ML Flow not located: {e}")
print()
# Prepare cluster info for parallel processing
cluster_infos = []
for cluster in clusters:
cluster_pods = [p for p in pods if p[0].startswith(cluster)]
cluster_infos.append((cluster, cluster_pods, current_namespace))
# Use ThreadPoolExecutor to process clusters in parallel
results = []
results_lock = threading.Lock()
with ThreadPoolExecutor() as executor:
future_to_cluster = {
executor.submit(process_cluster, info, args.ray_head_name): info[0] for info in cluster_infos
}
for future in as_completed(future_to_cluster):
cluster_name = future_to_cluster[future]
try:
result = future.result()
with results_lock:
results.append(result)
except Exception as exc:
print(f"{cluster_name} generated an exception: {exc}")
# Sort results alphabetically by cluster name
results.sort()
# Write sorted results to the output file (Ray info only)
with open(cluster_spec_file, "w") as f:
for result in results:
f.write(result)
print(f"Cluster spec information saved to {cluster_spec_file}")
# Display the contents of the config file
with open(cluster_spec_file) as f:
print(f.read())
if __name__ == "__main__":
main()
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import pathlib
import sys
# Allow for import of items from the ray workflow.
CUR_DIR = pathlib.Path(__file__).parent
UTIL_DIR = CUR_DIR.parent
sys.path.extend([str(UTIL_DIR), str(CUR_DIR)])
import util
import vision_cfg
from ray import tune
class CartpoleRGBNoTuneJobCfg(vision_cfg.CameraJobCfg):
def __init__(self, cfg: dict = {}):
cfg = util.populate_isaac_ray_cfg_args(cfg)
cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
super().__init__(cfg, vary_env_count=False, vary_cnn=False, vary_mlp=False)
class CartpoleRGBCNNOnlyJobCfg(vision_cfg.CameraJobCfg):
def __init__(self, cfg: dict = {}):
cfg = util.populate_isaac_ray_cfg_args(cfg)
cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
super().__init__(cfg, vary_env_count=False, vary_cnn=True, vary_mlp=False)
class CartpoleRGBJobCfg(vision_cfg.CameraJobCfg):
def __init__(self, cfg: dict = {}):
cfg = util.populate_isaac_ray_cfg_args(cfg)
cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
super().__init__(cfg, vary_env_count=True, vary_cnn=True, vary_mlp=True)
class CartpoleResNetJobCfg(vision_cfg.ResNetCameraJob):
def __init__(self, cfg: dict = {}):
cfg = util.populate_isaac_ray_cfg_args(cfg)
cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-ResNet18-v0"])
super().__init__(cfg)
class CartpoleTheiaJobCfg(vision_cfg.TheiaCameraJob):
def __init__(self, cfg: dict = {}):
cfg = util.populate_isaac_ray_cfg_args(cfg)
cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"])
super().__init__(cfg)
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import pathlib
import sys
# Allow for import of items from the ray workflow.
UTIL_DIR = pathlib.Path(__file__).parent.parent.parent
sys.path.append(str(UTIL_DIR))
import tuner
import util
from ray import tune
class CameraJobCfg(tuner.JobCfg):
"""In order to be compatible with :meth: invoke_tuning_run, and
:class:IsaacLabTuneTrainable , configurations should
be in a similar format to this class. This class can vary env count/horizon length,
CNN structure, and MLP structure. Broad possible ranges are set, the specific values
that work can be found via tuning. Tuning results can inform better ranges for a second tuning run.
These ranges were selected for demonstration purposes. Best ranges are run/task specific."""
@staticmethod
def _get_batch_size_divisors(batch_size: int, min_size: int = 128) -> list[int]:
"""Get valid batch divisors to combine with num_envs and horizon length"""
divisors = [i for i in range(min_size, batch_size + 1) if batch_size % i == 0]
return divisors if divisors else [min_size]
def __init__(self, cfg={}, vary_env_count: bool = False, vary_cnn: bool = False, vary_mlp: bool = False):
cfg = util.populate_isaac_ray_cfg_args(cfg)
# Basic configuration
cfg["runner_args"]["headless_singleton"] = "--headless"
cfg["runner_args"]["enable_cameras_singleton"] = "--enable_cameras"
cfg["hydra_args"]["agent.params.config.max_epochs"] = 200
if vary_env_count: # Vary the env count, and horizon length, and select a compatible mini-batch size
# Check from 512 to 8196 envs in powers of 2
# check horizon lengths of 8 to 256
# More envs should be better, but different batch sizes can improve gradient estimation
env_counts = [2**x for x in range(9, 13)]
horizon_lengths = [2**x for x in range(3, 8)]
selected_env_count = tune.choice(env_counts)
selected_horizon = tune.choice(horizon_lengths)
cfg["runner_args"]["--num_envs"] = selected_env_count
cfg["hydra_args"]["agent.params.config.horizon_length"] = selected_horizon
def get_valid_batch_size(config):
num_envs = config["runner_args"]["--num_envs"]
horizon_length = config["hydra_args"]["agent.params.config.horizon_length"]
total_batch = horizon_length * num_envs
divisors = self._get_batch_size_divisors(total_batch)
return divisors[0]
cfg["hydra_args"]["agent.params.config.minibatch_size"] = tune.sample_from(get_valid_batch_size)
if vary_cnn: # Vary the depth, and size of the layers in the CNN part of the agent
# Also varies kernel size, and stride.
num_layers = tune.randint(2, 3)
cfg["hydra_args"]["agent.params.network.cnn.type"] = "conv2d"
cfg["hydra_args"]["agent.params.network.cnn.activation"] = tune.choice(["relu", "elu"])
cfg["hydra_args"]["agent.params.network.cnn.initializer"] = "{name:default}"
cfg["hydra_args"]["agent.params.network.cnn.regularizer"] = "{name:None}"
def get_cnn_layers(_):
layers = []
size = 64 # Initial input size
for _ in range(num_layers.sample()):
# Get valid kernel sizes for current size
valid_kernels = [k for k in [3, 4, 6, 8, 10, 12] if k <= size]
if not valid_kernels:
break
kernel = int(tune.choice([str(k) for k in valid_kernels]).sample())
stride = int(tune.choice(["1", "2", "3", "4"]).sample())
padding = int(tune.choice(["0", "1"]).sample())
# Calculate next size
next_size = ((size + 2 * padding - kernel) // stride) + 1
if next_size <= 0:
break
layers.append({
"filters": tune.randint(16, 32).sample(),
"kernel_size": str(kernel),
"strides": str(stride),
"padding": str(padding),
})
size = next_size
return layers
cfg["hydra_args"]["agent.params.network.cnn.convs"] = tune.sample_from(get_cnn_layers)
if vary_mlp: # Vary the MLP structure; neurons (units) per layer, number of layers,
max_num_layers = 6
max_neurons_per_layer = 128
if "env.observations.policy.image.params.model_name" in cfg["hydra_args"]:
# By decreasing MLP size when using pretrained helps prevent out of memory on L4
max_num_layers = 3
max_neurons_per_layer = 32
if "agent.params.network.cnn.convs" in cfg["hydra_args"]:
# decrease MLP size to prevent running out of memory on L4
max_num_layers = 2
max_neurons_per_layer = 32
num_layers = tune.randint(1, max_num_layers)
def get_mlp_layers(_):
return [tune.randint(4, max_neurons_per_layer).sample() for _ in range(num_layers.sample())]
cfg["hydra_args"]["agent.params.network.mlp.units"] = tune.sample_from(get_mlp_layers)
cfg["hydra_args"]["agent.params.network.mlp.initializer.name"] = tune.choice(["default"]).sample()
cfg["hydra_args"]["agent.params.network.mlp.activation"] = tune.choice(
["relu", "tanh", "sigmoid", "elu"]
).sample()
super().__init__(cfg)
class ResNetCameraJob(CameraJobCfg):
"""Try different ResNet sizes."""
def __init__(self, cfg: dict = {}):
cfg = util.populate_isaac_ray_cfg_args(cfg)
cfg["hydra_args"]["env.observations.policy.image.params.model_name"] = tune.choice(
["resnet18", "resnet34", "resnet50", "resnet101"]
)
super().__init__(cfg, vary_env_count=True, vary_cnn=False, vary_mlp=True)
class TheiaCameraJob(CameraJobCfg):
"""Try different Theia sizes."""
def __init__(self, cfg: dict = {}):
cfg = util.populate_isaac_ray_cfg_args(cfg)
cfg["hydra_args"]["env.observations.policy.image.params.model_name"] = tune.choice([
"theia-tiny-patch16-224-cddsv",
"theia-tiny-patch16-224-cdiv",
"theia-small-patch16-224-cdiv",
"theia-base-patch16-224-cdiv",
"theia-small-patch16-224-cddsv",
"theia-base-patch16-224-cddsv",
])
super().__init__(cfg, vary_env_count=True, vary_cnn=False, vary_mlp=True)
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import argparse
import pathlib
import subprocess
import yaml
from jinja2 import Environment, FileSystemLoader
from kubernetes import config
import source.standalone.workflows.ray.util as util
"""This script helps create one or more KubeRay clusters.
Usage:
.. code-block:: bash
# If the head node is stuck on container creating, make sure to create a secret
./isaaclab.sh -p source/standalone/workflows/ray/launch.py -h
# Examples
# The following creates 8 GPUx1 nvidia l4 workers
./isaaclab.sh -p source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
--namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
--num_workers 8 --num_clusters 1 --worker_accelerator nvidia-l4 --gpu_per_worker 1
# The following creates 1 GPUx1 nvidia l4 worker, 2 GPUx2 nvidia-tesla-t4 workers,
# and 2 GPUx4 nvidia-tesla-t4 GPU workers
./isaaclab.sh -p source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
--namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
--num_workers 1 2 --num_clusters 1 \
--worker_accelerator nvidia-l4 nvidia-tesla-t4 --gpu_per_worker 1 2 4
"""
RAY_DIR = pathlib.Path(__file__).parent
def apply_manifest(args: argparse.Namespace) -> None:
"""Provided a Jinja templated ray.io/v1alpha1 file,
populate the arguments and create the cluster. Additionally, create
kubernetes containers for resources separated by '---' from the rest
of the file.
Args:
args: Possible arguments concerning cluster parameters.
"""
# Load Kubernetes configuration
config.load_kube_config()
# Set up Jinja2 environment for loading templates
templates_dir = RAY_DIR / "cluster_configs" / args.cluster_host
file_loader = FileSystemLoader(str(templates_dir))
jinja_env = Environment(loader=file_loader, keep_trailing_newline=True)
# Define template filename
template_file = "kuberay.yaml.jinja"
# Convert args namespace to a dictionary
template_params = vars(args)
# Load and render the template
template = jinja_env.get_template(template_file)
file_contents = template.render(template_params)
# Parse all YAML documents in the rendered template
all_yamls = []
for doc in yaml.safe_load_all(file_contents):
all_yamls.append(doc)
# Convert back to YAML string, preserving multiple documents
cleaned_yaml_string = ""
for i, doc in enumerate(all_yamls):
if i > 0:
cleaned_yaml_string += "\n---\n"
cleaned_yaml_string += yaml.dump(doc)
# Apply the Kubernetes manifest using kubectl
try:
subprocess.run(["kubectl", "apply", "-f", "-"], input=cleaned_yaml_string, text=True, check=True)
except subprocess.CalledProcessError as e:
exit(f"An error occurred while running `kubectl`: {e}")
def parse_args() -> argparse.Namespace:
"""
Parse command-line arguments for Kubernetes deployment script.
Returns:
argparse.Namespace: Parsed command-line arguments.
"""
arg_parser = argparse.ArgumentParser(
description="Script to apply manifests to create Kubernetes objects for Ray clusters.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
arg_parser.add_argument(
"--cluster_host",
type=str,
default="google_cloud",
choices=["google_cloud"],
help=(
"In the cluster_configs directory, the name of the folder where a tune.yaml.jinja"
"file exists defining the KubeRay config. Currently only google_cloud is supported."
),
)
arg_parser.add_argument(
"--name",
type=str,
required=False,
default="isaacray",
help="Name of the Kubernetes deployment.",
)
arg_parser.add_argument(
"--namespace",
type=str,
required=False,
default="default",
help="Kubernetes namespace to deploy the Ray cluster.",
)
arg_parser.add_argument(
"--service_acount_name", type=str, required=False, default="default", help="The service account name to use."
)
arg_parser.add_argument(
"--image",
type=str,
required=True,
help="Docker image for the Ray cluster pods.",
)
arg_parser.add_argument(
"--worker_accelerator",
nargs="+",
type=str,
default=["nvidia-l4"],
help="GPU accelerator name. Supply more than one for heterogeneous resources.",
)
arg_parser = util.add_resource_arguments(arg_parser, cluster_create_defaults=True)
arg_parser.add_argument(
"--num_clusters",
type=int,
default=1,
help="How many Ray Clusters to create.",
)
arg_parser.add_argument(
"--num_head_cpu",
type=float, # to be able to schedule partial CPU heads
default=8,
help="The number of CPUs to give the Ray head.",
)
arg_parser.add_argument("--head_ram_gb", type=int, default=8, help="How many gigs of ram to give the Ray head")
args = arg_parser.parse_args()
return util.fill_in_missing_resources(args, cluster_creation_flag=True)
def main():
args = parse_args()
if "head" in args.name:
raise ValueError("For compatibility with other scripts, do not include head in the name")
if args.num_clusters == 1:
apply_manifest(args)
else:
default_name = args.name
for i in range(args.num_clusters):
args.name = default_name + "-" + str(i)
apply_manifest(args)
if __name__ == "__main__":
main()
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import argparse
import logging
import multiprocessing as mp
import os
import sys
from concurrent.futures import ProcessPoolExecutor, as_completed
from torch.utils.tensorboard import SummaryWriter
import mlflow
from mlflow.tracking import MlflowClient
def setup_logging(level=logging.INFO):
logging.basicConfig(level=level, format="%(asctime)s - %(levelname)s - %(message)s")
def get_existing_runs(download_dir: str) -> set[str]:
"""Get set of run IDs that have already been downloaded."""
existing_runs = set()
tensorboard_dir = os.path.join(download_dir, "tensorboard")
if os.path.exists(tensorboard_dir):
for entry in os.listdir(tensorboard_dir):
if entry.startswith("run_"):
existing_runs.add(entry[4:])
return existing_runs
def process_run(args):
"""Convert MLflow run to TensorBoard format."""
run_id, download_dir, tracking_uri = args
try:
# Set up MLflow client
mlflow.set_tracking_uri(tracking_uri)
client = MlflowClient()
run = client.get_run(run_id)
# Create TensorBoard writer
tensorboard_log_dir = os.path.join(download_dir, "tensorboard", f"run_{run_id}")
writer = SummaryWriter(log_dir=tensorboard_log_dir)
# Log parameters
for key, value in run.data.params.items():
writer.add_text(f"params/{key}", str(value))
# Log metrics with history
for key in run.data.metrics.keys():
history = client.get_metric_history(run_id, key)
for m in history:
writer.add_scalar(f"metrics/{key}", m.value, m.step)
# Log tags
for key, value in run.data.tags.items():
writer.add_text(f"tags/{key}", str(value))
writer.close()
return run_id, True
except Exception:
return run_id, False
def download_experiment_tensorboard_logs(uri: str, experiment_name: str, download_dir: str) -> None:
"""Download MLflow experiment logs and convert to TensorBoard format."""
logger = logging.getLogger(__name__)
try:
# Set up MLflow
mlflow.set_tracking_uri(uri)
logger.info(f"Connected to MLflow tracking server at {uri}")
# Get experiment
experiment = mlflow.get_experiment_by_name(experiment_name)
if experiment is None:
raise ValueError(f"Experiment '{experiment_name}' not found at URI '{uri}'.")
# Get all runs
runs = mlflow.search_runs([experiment.experiment_id])
logger.info(f"Found {len(runs)} total runs in experiment '{experiment_name}'")
# Check existing runs
existing_runs = get_existing_runs(download_dir)
logger.info(f"Found {len(existing_runs)} existing runs in {download_dir}")
# Create directory structure
os.makedirs(os.path.join(download_dir, "tensorboard"), exist_ok=True)
# Process new runs
new_run_ids = [run.run_id for _, run in runs.iterrows() if run.run_id not in existing_runs]
if not new_run_ids:
logger.info("No new runs to process")
return
logger.info(f"Processing {len(new_run_ids)} new runs...")
# Process runs in parallel
num_processes = min(mp.cpu_count(), len(new_run_ids))
processed = 0
with ProcessPoolExecutor(max_workers=num_processes) as executor:
future_to_run = {
executor.submit(process_run, (run_id, download_dir, uri)): run_id for run_id in new_run_ids
}
for future in as_completed(future_to_run):
run_id = future_to_run[future]
try:
run_id, success = future.result()
processed += 1
if success:
logger.info(f"[{processed}/{len(new_run_ids)}] Successfully processed run {run_id}")
else:
logger.error(f"[{processed}/{len(new_run_ids)}] Failed to process run {run_id}")
except Exception as e:
logger.error(f"Error processing run {run_id}: {e}")
logger.info(f"\nAll data saved to {download_dir}/tensorboard")
except Exception as e:
logger.error(f"Error during download: {e}")
raise
def main():
parser = argparse.ArgumentParser(description="Download MLflow experiment logs for TensorBoard visualization.")
parser.add_argument("--uri", required=True, help="The MLflow tracking URI (e.g., http://localhost:5000)")
parser.add_argument("--experiment-name", required=True, help="Name of the experiment to download")
parser.add_argument("--download-dir", required=True, help="Directory to save TensorBoard logs")
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
args = parser.parse_args()
setup_logging(level=logging.DEBUG if args.debug else logging.INFO)
try:
download_experiment_tensorboard_logs(args.uri, args.experiment_name, args.download_dir)
print("\nSuccess! To view the logs, run:")
print(f"tensorboard --logdir {os.path.join(args.download_dir, 'tensorboard')}")
except Exception as e:
logging.error(f"Failed to download experiment logs: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import argparse
import os
import time
from concurrent.futures import ThreadPoolExecutor
from ray import job_submission
"""
This script submits aggregate job(s) to cluster(s) described in a
config file containing ``name: <NAME> address: http://<IP>:<PORT>`` on
a new line for each cluster. For KubeRay clusters, this file
can be automatically created with :file:`grok_cluster_with_kubectl.py`
Aggregate job(s) are matched with cluster(s) via the following relation:
cluster_line_index_submitted_to = job_index % total_cluster_count
Aggregate jobs are separated by the * delimiter. The ``--aggregate_jobs`` argument must be
the last argument supplied to the script.
An aggregate job could be a :file:`../tuner.py` tuning job, which automatically
creates several individual jobs when started on a cluster. Alternatively, an aggregate job
could be a :file:'../wrap_resources.py` resource-wrapped job,
which may contain several individual sub-jobs separated by
the + delimiter.
If there are more aggregate jobs than cluster(s), aggregate jobs will be submitted
as clusters become available via the defined relation above. If there are less aggregate job(s)
than clusters, some clusters will not receive aggregate job(s). The maximum number of
aggregate jobs that can be run simultaneously is equal to the number of workers created by
default by a ThreadPoolExecutor on the machine submitting jobs due to fetching the log output after
jobs finish, which is unlikely to constrain overall-job submission.
Usage:
.. code-block:: bash
# Example; submitting a tuning job
./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py \
--aggregate_jobs /workspace/isaaclab/source/standalone/workflows/ray/tuner.py \
--cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
--cfg_class CartpoleRGBNoTuneJobCfg --mlflow_uri <ML_FLOW_URI>
# Example: Submitting resource wrapped job
./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py --aggregate_jobs wrap_resources.py --sub_jobs ./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task Isaac-Cartpole-v0 --headless+./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task Isaac-Cartpole-RGB-Camera-Direct-v0 --headless --enable_cameras agent.params.config.max_epochs=150
# For all command line arguments
./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py -h
"""
script_directory = os.path.dirname(os.path.abspath(__file__))
CONFIG = {"working_dir": script_directory, "executable": "/workspace/isaaclab/isaaclab.sh -p"}
def read_cluster_spec(fn: str | None = None) -> list[dict]:
if fn is None:
cluster_spec_path = os.path.expanduser("~/.cluster_config")
else:
cluster_spec_path = os.path.expanduser(fn)
if not os.path.exists(cluster_spec_path):
raise FileNotFoundError(f"Cluster spec file not found at {cluster_spec_path}")
clusters = []
with open(cluster_spec_path) as f:
for line in f:
parts = line.strip().split(" ")
http_address = parts[3]
cluster_info = {"name": parts[1], "address": http_address}
print(f"[INFO] Setting {cluster_info['name']}") # with {cluster_info['num_gpu']} GPUs.")
clusters.append(cluster_info)
return clusters
def submit_job(cluster: dict, job_command: str) -> None:
"""
Submits a job to a single cluster, prints the final result and Ray dashboard URL at the end.
"""
address = cluster["address"]
cluster_name = cluster["name"]
print(f"[INFO]: Submitting job to cluster '{cluster_name}' at {address}") # with {num_gpus} GPUs.")
client = job_submission.JobSubmissionClient(address)
runtime_env = {"working_dir": CONFIG["working_dir"], "executable": CONFIG["executable"]}
print(f"[INFO]: Checking contents of the directory: {CONFIG['working_dir']}")
try:
dir_contents = os.listdir(CONFIG["working_dir"])
print(f"[INFO]: Directory contents: {dir_contents}")
except Exception as e:
print(f"[INFO]: Failed to list directory contents: {str(e)}")
entrypoint = f"{CONFIG['executable']} {job_command}"
print(f"[INFO]: Attempting entrypoint {entrypoint=} in cluster {cluster}")
job_id = client.submit_job(entrypoint=entrypoint, runtime_env=runtime_env)
status = client.get_job_status(job_id)
while status in [job_submission.JobStatus.PENDING, job_submission.JobStatus.RUNNING]:
time.sleep(5)
status = client.get_job_status(job_id)
final_logs = client.get_job_logs(job_id)
print("----------------------------------------------------")
print(f"[INFO]: Cluster {cluster_name} Logs: \n")
print(final_logs)
print("----------------------------------------------------")
def submit_jobs_to_clusters(jobs: list[str], clusters: list[dict]) -> None:
"""
Submit all jobs to their respective clusters, cycling through clusters if there are more jobs than clusters.
"""
if not clusters:
raise ValueError("No clusters available for job submission.")
if len(jobs) < len(clusters):
print("[INFO]: Less jobs than clusters, some clusters will not receive jobs")
elif len(jobs) == len(clusters):
print("[INFO]: Exactly one job per cluster")
else:
print("[INFO]: More jobs than clusters, jobs submitted as clusters become available.")
with ThreadPoolExecutor() as executor:
for idx, job_command in enumerate(jobs):
# Cycle through clusters using modulus to wrap around if there are more jobs than clusters
cluster = clusters[idx % len(clusters)]
executor.submit(submit_job, cluster, job_command)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Submit multiple GPU jobs to multiple Ray clusters.")
parser.add_argument("--config_file", default="~/.cluster_config", help="The cluster config path.")
parser.add_argument(
"--aggregate_jobs",
type=str,
nargs=argparse.REMAINDER,
help="This should be last argument. The aggregate jobs to submit separated by the * delimiter.",
)
args = parser.parse_args()
if args.aggregate_jobs is not None:
jobs = " ".join(args.aggregate_jobs)
formatted_jobs = jobs.split("*")
if len(formatted_jobs) > 1:
print("Warning; Split jobs by cluster with the * delimiter")
else:
formatted_jobs = []
print(f"[INFO]: Isaac Ray Wrapper received jobs {formatted_jobs=}")
clusters = read_cluster_spec(args.config_file)
submit_jobs_to_clusters(formatted_jobs, clusters)
This diff is collapsed.
This diff is collapsed.
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import argparse
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
import source.standalone.workflows.ray.util as util
"""
This script dispatches sub-job(s) (either individual jobs or tuning aggregate jobs)
to worker(s) on GPU-enabled node(s) of a specific cluster as part of an resource-wrapped aggregate
job. If no desired compute resources for each sub-job are specified,
this script creates one worker per available node for each node with GPU(s) in the cluster.
If the desired resources for each sub-job is specified,
the maximum number of workers possible with the desired resources are created for each node
with GPU(s) in the cluster. It is also possible to split available node resources for each node
into the desired number of workers with the ``--num_workers`` flag, to be able to easily
parallelize sub-jobs on multi-GPU nodes. Due to Isaac Lab requiring a GPU,
this ignores all CPU only nodes such as loggers.
Sub-jobs are matched with node(s) in a cluster via the following relation:
sorted_nodes = Node sorted by descending GPUs, then descending CPUs, then descending RAM, then node ID
node_submitted_to = sorted_nodes[job_index % total_node_count]
To check the ordering of sorted nodes, supply the ``--test`` argument and run the script.
Sub-jobs are separated by the + delimiter. The ``--sub_jobs`` argument must be the last
argument supplied to the script.
If there is more than one available worker, and more than one sub-job,
sub-jobs will be executed in parallel. If there are more sub-jobs than workers, sub-jobs will
be dispatched to workers as they become available. There is no limit on the number
of sub-jobs that can be near-simultaneously submitted.
This script is meant to be executed on a Ray cluster head node as an aggregate cluster job.
To submit aggregate cluster jobs such as this script to one or more remote clusters,
see :file:`../submit_isaac_ray_job.py`.
KubeRay clusters on Google GKE can be created with :file:`../launch.py`
Usage:
.. code-block:: bash
# **Ensure that sub-jobs are separated by the ``+`` delimiter.**
# Generic Templates-----------------------------------
./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py -h
# No resource isolation; no parallelization:
./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py
--sub_jobs <JOB0>+<JOB1>+<JOB2>
# Automatic Resource Isolation; Example A: needed for parallelization
./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py \
--num_workers <NUM_TO_DIVIDE_TOTAL_RESOURCES_BY> \
--sub_jobs <JOB0>+<JOB1>
# Manual Resource Isolation; Example B: needed for parallelization
./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py --num_cpu_per_worker <CPU> \
--gpu_per_worker <GPU> --ram_gb_per_worker <RAM> --sub_jobs <JOB0>+<JOB1>
# Manual Resource Isolation; Example C: Needed for parallelization, for heterogeneous workloads
./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py --num_cpu_per_worker <CPU> \
--gpu_per_worker <GPU1> <GPU2> --ram_gb_per_worker <RAM> --sub_jobs <JOB0>+<JOB1>
# to see all arguments
./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py -h
"""
def wrap_resources_to_jobs(jobs: list[str], args: argparse.Namespace) -> None:
"""
Provided a list of jobs, dispatch jobs to one worker per available node,
unless otherwise specified by resource constraints.
Args:
jobs: bash commands to execute on a Ray cluster
args: The arguments for resource allocation
"""
if not ray.is_initialized():
ray.init(address=args.ray_address, log_to_driver=True)
job_results = []
gpu_node_resources = util.get_gpu_node_resources(include_id=True, include_gb_ram=True)
if any([args.gpu_per_worker, args.cpu_per_worker, args.ram_gb_per_worker]) and args.num_workers:
raise ValueError("Either specify only num_workers or only granular resources(GPU,CPU,RAM_GB).")
num_nodes = len(gpu_node_resources)
# Populate arguments
formatted_node_resources = {
"gpu_per_worker": [gpu_node_resources[i]["GPU"] for i in range(num_nodes)],
"cpu_per_worker": [gpu_node_resources[i]["CPU"] for i in range(num_nodes)],
"ram_gb_per_worker": [gpu_node_resources[i]["ram_gb"] for i in range(num_nodes)],
"num_workers": args.num_workers, # By default, 1 worker por node
}
args = util.fill_in_missing_resources(args, resources=formatted_node_resources, policy=min)
print(f"[INFO]: Number of GPU nodes found: {num_nodes}")
if args.test:
jobs = ["nvidia-smi"] * num_nodes
for i, job in enumerate(jobs):
gpu_node = gpu_node_resources[i % num_nodes]
print(f"[INFO]: Submitting job {i + 1} of {len(jobs)} with job '{job}' to node {gpu_node}")
print(
f"[INFO]: Resource parameters: GPU: {args.gpu_per_worker[i]}"
f" CPU: {args.cpu_per_worker[i]} RAM {args.ram_gb_per_worker[i]}"
)
print(f"[INFO] For the node parameters, creating {args.num_workers[i]} workers")
num_gpus = args.gpu_per_worker[i] / args.num_workers[i]
num_cpus = args.cpu_per_worker[i] / args.num_workers[i]
memory = (args.ram_gb_per_worker[i] * 1024**3) / args.num_workers[i]
print(f"[INFO]: Requesting {num_gpus=} {num_cpus=} {memory=} id={gpu_node['id']}")
job = util.remote_execute_job.options(
num_gpus=num_gpus,
num_cpus=num_cpus,
memory=memory,
scheduling_strategy=NodeAffinitySchedulingStrategy(gpu_node["id"], soft=False),
).remote(job, f"Job {i}", args.test)
job_results.append(job)
results = ray.get(job_results)
for i, result in enumerate(results):
print(f"[INFO]: Job {i} result: {result}")
print("[INFO]: All jobs completed.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Submit multiple jobs with optional GPU testing.")
parser = util.add_resource_arguments(arg_parser=parser)
parser.add_argument("--ray_address", type=str, default="auto", help="the Ray address.")
parser.add_argument(
"--test",
action="store_true",
help=(
"Run nvidia-smi test instead of the arbitrary job,"
"can use as a sanity check prior to any jobs to check "
"that GPU resources are correctly isolated."
),
)
parser.add_argument(
"--sub_jobs",
type=str,
nargs=argparse.REMAINDER,
help="This should be last wrapper argument. Jobs separated by the + delimiter to run on a cluster.",
)
args = parser.parse_args()
if args.sub_jobs is not None:
jobs = " ".join(args.sub_jobs)
formatted_jobs = jobs.split("+")
else:
formatted_jobs = []
print(f"[INFO]: Isaac Ray Wrapper received jobs {formatted_jobs=}")
wrap_resources_to_jobs(jobs=formatted_jobs, args=args)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment