Unverified Commit 314e5158 authored by Mayank Mittal's avatar Mayank Mittal Committed by GitHub

Moves location of serve file check to the correct module (#3368)

# Description

Earlier, the function `check_usd_path_with_timeout` was in
`sim/utils.py` while all file related operations live in
`utils/asset.py`. This MR moves the function to the right location.

Fixes # (issue)

## Type of change

- Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- This change requires a documentation update

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

---------
Co-authored-by: 's avatarKelly Guo <kellyg@nvidia.com>
parent e0a8df23
...@@ -24,11 +24,11 @@ from isaaclab.sim import converters, schemas ...@@ -24,11 +24,11 @@ from isaaclab.sim import converters, schemas
from isaaclab.sim.utils import ( from isaaclab.sim.utils import (
bind_physics_material, bind_physics_material,
bind_visual_material, bind_visual_material,
check_usd_path_with_timeout,
clone, clone,
is_current_stage_in_memory, is_current_stage_in_memory,
select_usd_variants, select_usd_variants,
) )
from isaaclab.utils.assets import check_usd_path_with_timeout
if TYPE_CHECKING: if TYPE_CHECKING:
from . import from_files_cfg from . import from_files_cfg
......
...@@ -7,13 +7,11 @@ ...@@ -7,13 +7,11 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import contextlib import contextlib
import functools import functools
import inspect import inspect
import re import re
import time from collections.abc import Callable, Generator
from collections.abc import Callable
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
import carb import carb
...@@ -829,97 +827,6 @@ def find_global_fixed_joint_prim( ...@@ -829,97 +827,6 @@ def find_global_fixed_joint_prim(
return fixed_joint_prim return fixed_joint_prim
"""
Stage management.
"""
def attach_stage_to_usd_context(attaching_early: bool = False):
"""Attaches stage in memory to usd context.
This function should be called during or after scene is created and before stage is simulated or rendered.
Note:
If the stage is not in memory or rendering is not enabled, this function will return without attaching.
Args:
attaching_early: Whether to attach the stage to the usd context before stage is created. Defaults to False.
"""
from isaacsim.core.simulation_manager import SimulationManager
from isaaclab.sim.simulation_context import SimulationContext
# if Isaac Sim version is less than 5.0, stage in memory is not supported
isaac_sim_version = float(".".join(get_version()[2]))
if isaac_sim_version < 5:
return
# if stage is not in memory, we can return early
if not is_current_stage_in_memory():
return
# attach stage to physx
stage_id = get_current_stage_id()
physx_sim_interface = omni.physx.get_physx_simulation_interface()
physx_sim_interface.attach_stage(stage_id)
# this carb flag is equivalent to if rendering is enabled
carb_setting = carb.settings.get_settings()
is_rendering_enabled = get_carb_setting(carb_setting, "/physics/fabricUpdateTransformations")
# if rendering is not enabled, we don't need to attach it
if not is_rendering_enabled:
return
# early attach warning msg
if attaching_early:
omni.log.warn(
"Attaching stage in memory to USD context early to support an operation which doesn't support stage in"
" memory."
)
# skip this callback to avoid wiping the stage after attachment
SimulationContext.instance().skip_next_stage_open_callback()
# disable stage open callback to avoid clearing callbacks
SimulationManager.enable_stage_open_callback(False)
# enable physics fabric
SimulationContext.instance()._physics_context.enable_fabric(True)
# attach stage to usd context
omni.usd.get_context().attach_stage_with_callback(stage_id)
# attach stage to physx
physx_sim_interface = omni.physx.get_physx_simulation_interface()
physx_sim_interface.attach_stage(stage_id)
# re-enable stage open callback
SimulationManager.enable_stage_open_callback(True)
def is_current_stage_in_memory() -> bool:
"""This function checks if the current stage is in memory.
Compares the stage id of the current stage with the stage id of the context stage.
Returns:
If the current stage is in memory.
"""
# grab current stage id
stage_id = get_current_stage_id()
# grab context stage id
context_stage = omni.usd.get_context().get_stage()
with use_stage(context_stage):
context_stage_id = get_current_stage_id()
# check if stage ids are the same
return stage_id != context_stage_id
""" """
USD Variants. USD Variants.
""" """
...@@ -1001,84 +908,107 @@ def select_usd_variants(prim_path: str, variants: object | dict[str, str], stage ...@@ -1001,84 +908,107 @@ def select_usd_variants(prim_path: str, variants: object | dict[str, str], stage
""" """
Nucleus Connection Stage management.
""" """
async def _is_usd_path_available(usd_path: str, timeout: float) -> bool: def attach_stage_to_usd_context(attaching_early: bool = False):
""" """Attaches the current USD stage in memory to the USD context.
Asynchronously checks whether the given USD path is available on the server.
Args: This function should be called during or after scene is created and before stage is simulated or rendered.
usd_path: The remote or local USD file path to check.
timeout: Timeout in seconds for the async stat call.
Returns: Note:
True if the server responds with OK, False otherwise. If the stage is not in memory or rendering is not enabled, this function will return without attaching.
Args:
attaching_early: Whether to attach the stage to the usd context before stage is created. Defaults to False.
""" """
try:
result, _ = await asyncio.wait_for(omni.client.stat_async(usd_path), timeout=timeout)
return result == omni.client.Result.OK
except asyncio.TimeoutError:
omni.log.warn(f"Timed out after {timeout}s while checking for USD: {usd_path}")
return False
except Exception as ex:
omni.log.warn(f"Exception during USD file check: {type(ex).__name__}: {ex}")
return False
from isaacsim.core.simulation_manager import SimulationManager
def check_usd_path_with_timeout(usd_path: str, timeout: float = 300, log_interval: float = 30) -> bool: from isaaclab.sim.simulation_context import SimulationContext
"""
Synchronously runs an asynchronous USD path availability check,
logging progress periodically until it completes.
This is useful for checking server responsiveness before attempting to load a remote asset. # if Isaac Sim version is less than 5.0, stage in memory is not supported
It will block execution until the check completes or times out. isaac_sim_version = float(".".join(get_version()[2]))
if isaac_sim_version < 5:
return
Args: # if stage is not in memory, we can return early
usd_path: The remote USD file path to check. if not is_current_stage_in_memory():
timeout: Maximum time (in seconds) to wait for the server check. return
log_interval: Interval (in seconds) at which progress is logged.
Returns: # attach stage to physx
True if the file is available (HTTP 200 / OK), False otherwise. stage_id = get_current_stage_id()
""" physx_sim_interface = omni.physx.get_physx_simulation_interface()
start_time = time.time() physx_sim_interface.attach_stage(stage_id)
loop = asyncio.get_event_loop()
coroutine = _is_usd_path_available(usd_path, timeout) # this carb flag is equivalent to if rendering is enabled
task = asyncio.ensure_future(coroutine) carb_setting = carb.settings.get_settings()
is_rendering_enabled = get_carb_setting(carb_setting, "/physics/fabricUpdateTransformations")
next_log_time = start_time + log_interval # if rendering is not enabled, we don't need to attach it
if not is_rendering_enabled:
return
first_log = True # early attach warning msg
while not task.done(): if attaching_early:
now = time.time() omni.log.warn(
if now >= next_log_time: "Attaching stage in memory to USD context early to support an operation which doesn't support stage in"
elapsed = int(now - start_time) " memory."
if first_log: )
omni.log.warn(f"Checking server availability for USD path: {usd_path} (timeout: {timeout}s)")
first_log = False
omni.log.warn(f"Waiting for server response... ({elapsed}s elapsed)")
next_log_time += log_interval
loop.run_until_complete(asyncio.sleep(0.1)) # Yield to allow async work
return task.result() # skip this callback to avoid wiping the stage after attachment
SimulationContext.instance().skip_next_stage_open_callback()
# disable stage open callback to avoid clearing callbacks
SimulationManager.enable_stage_open_callback(False)
""" # enable physics fabric
Isaac Sim stage utils wrappers to enable backwards compatibility to Isaac Sim 4.5 SimulationContext.instance()._physics_context.enable_fabric(True)
"""
# attach stage to usd context
omni.usd.get_context().attach_stage_with_callback(stage_id)
# attach stage to physx
physx_sim_interface = omni.physx.get_physx_simulation_interface()
physx_sim_interface.attach_stage(stage_id)
# re-enable stage open callback
SimulationManager.enable_stage_open_callback(True)
def is_current_stage_in_memory() -> bool:
"""Checks if the current stage is in memory.
This function compares the stage id of the current USD stage with the stage id of the USD context stage.
Returns:
Whether the current stage is in memory.
"""
# grab current stage id
stage_id = get_current_stage_id()
# grab context stage id
context_stage = omni.usd.get_context().get_stage()
with use_stage(context_stage):
context_stage_id = get_current_stage_id()
# check if stage ids are the same
return stage_id != context_stage_id
@contextlib.contextmanager @contextlib.contextmanager
def use_stage(stage: Usd.Stage) -> None: def use_stage(stage: Usd.Stage) -> Generator[None, None, None]:
"""Context manager that sets a thread-local stage, if supported. """Context manager that sets a thread-local stage, if supported.
In Isaac Sim < 5.0, this is a no-op to maintain compatibility. In Isaac Sim < 5.0, this is a no-op to maintain compatibility.
Args: Args:
stage (Usd.Stage): The stage to set temporarily. stage: The stage to set temporarily.
Yields:
None
""" """
isaac_sim_version = float(".".join(get_version()[2])) isaac_sim_version = float(".".join(get_version()[2]))
if isaac_sim_version < 5: if isaac_sim_version < 5:
...@@ -1090,10 +1020,10 @@ def use_stage(stage: Usd.Stage) -> None: ...@@ -1090,10 +1020,10 @@ def use_stage(stage: Usd.Stage) -> None:
def create_new_stage_in_memory() -> Usd.Stage: def create_new_stage_in_memory() -> Usd.Stage:
"""Create a new stage in memory, if supported. """Creates a new stage in memory, if supported.
Returns: Returns:
The new stage. The new stage in memory.
""" """
isaac_sim_version = float(".".join(get_version()[2])) isaac_sim_version = float(".".join(get_version()[2]))
if isaac_sim_version < 5: if isaac_sim_version < 5:
...@@ -1107,12 +1037,13 @@ def create_new_stage_in_memory() -> Usd.Stage: ...@@ -1107,12 +1037,13 @@ def create_new_stage_in_memory() -> Usd.Stage:
def get_current_stage_id() -> int: def get_current_stage_id() -> int:
"""Get the current open stage id. """Gets the current open stage id.
Reimplementation of stage_utils.get_current_stage_id() for Isaac Sim < 5.0. This function is a reimplementation of :meth:`isaacsim.core.utils.stage.get_current_stage_id` for
backwards compatibility to Isaac Sim < 5.0.
Returns: Returns:
int: The stage id. The current open stage id.
""" """
stage = get_current_stage() stage = get_current_stage()
stage_cache = UsdUtils.StageCache.Get() stage_cache = UsdUtils.StageCache.Get()
......
...@@ -13,9 +13,11 @@ For more information, please check information on `Omniverse Nucleus`_. ...@@ -13,9 +13,11 @@ For more information, please check information on `Omniverse Nucleus`_.
.. _Omniverse Nucleus: https://docs.omniverse.nvidia.com/nucleus/latest/overview/overview.html .. _Omniverse Nucleus: https://docs.omniverse.nvidia.com/nucleus/latest/overview/overview.html
""" """
import asyncio
import io import io
import os import os
import tempfile import tempfile
import time
from typing import Literal from typing import Literal
import carb import carb
...@@ -127,3 +129,78 @@ def read_file(path: str) -> io.BytesIO: ...@@ -127,3 +129,78 @@ def read_file(path: str) -> io.BytesIO:
return io.BytesIO(memoryview(file_content).tobytes()) return io.BytesIO(memoryview(file_content).tobytes())
else: else:
raise FileNotFoundError(f"Unable to find the file: {path}") raise FileNotFoundError(f"Unable to find the file: {path}")
"""
Nucleus Connection.
"""
def check_usd_path_with_timeout(usd_path: str, timeout: float = 300, log_interval: float = 30) -> bool:
"""Checks whether the given USD file path is available on the NVIDIA Nucleus server.
This function synchronously runs an asynchronous USD path availability check,
logging progress periodically until it completes. The file is available on the server
if the HTTP status code is 200. Otherwise, the file is not available on the server.
This is useful for checking server responsiveness before attempting to load a remote
asset. It will block execution until the check completes or times out.
Args:
usd_path: The remote USD file path to check.
timeout: Maximum time (in seconds) to wait for the server check.
log_interval: Interval (in seconds) at which progress is logged.
Returns:
Whether the given USD path is available on the server.
"""
start_time = time.time()
loop = asyncio.get_event_loop()
coroutine = _is_usd_path_available(usd_path, timeout)
task = asyncio.ensure_future(coroutine)
next_log_time = start_time + log_interval
first_log = True
while not task.done():
now = time.time()
if now >= next_log_time:
elapsed = int(now - start_time)
if first_log:
omni.log.warn(f"Checking server availability for USD path: {usd_path} (timeout: {timeout}s)")
first_log = False
omni.log.warn(f"Waiting for server response... ({elapsed}s elapsed)")
next_log_time += log_interval
loop.run_until_complete(asyncio.sleep(0.1)) # Yield to allow async work
return task.result()
"""
Helper functions.
"""
async def _is_usd_path_available(usd_path: str, timeout: float) -> bool:
"""Checks whether the given USD path is available on the Omniverse Nucleus server.
This function is a asynchronous routine to check the availability of the given USD path on the Omniverse Nucleus server.
It will return True if the USD path is available on the server, False otherwise.
Args:
usd_path: The remote or local USD file path to check.
timeout: Timeout in seconds for the async stat call.
Returns:
Whether the given USD path is available on the server.
"""
try:
result, _ = await asyncio.wait_for(omni.client.stat_async(usd_path), timeout=timeout)
return result == omni.client.Result.OK
except asyncio.TimeoutError:
omni.log.warn(f"Timed out after {timeout}s while checking for USD: {usd_path}")
return False
except Exception as ex:
omni.log.warn(f"Exception during USD file check: {type(ex).__name__}: {ex}")
return False
...@@ -37,3 +37,16 @@ def test_check_file_path_invalid(): ...@@ -37,3 +37,16 @@ def test_check_file_path_invalid():
usd_path = f"{assets_utils.ISAACLAB_NUCLEUS_DIR}/Robots/FrankaEmika/panda_xyz.usd" usd_path = f"{assets_utils.ISAACLAB_NUCLEUS_DIR}/Robots/FrankaEmika/panda_xyz.usd"
# check file path # check file path
assert assets_utils.check_file_path(usd_path) == 0 assert assets_utils.check_file_path(usd_path) == 0
def test_check_usd_path_with_timeout():
"""Test checking a USD path with timeout."""
# robot file path
usd_path = f"{assets_utils.ISAACLAB_NUCLEUS_DIR}/Robots/FrankaEmika/panda_instanceable.usd"
# check file path
assert assets_utils.check_usd_path_with_timeout(usd_path) is True
# invalid file path
usd_path = f"{assets_utils.ISAACLAB_NUCLEUS_DIR}/Robots/FrankaEmika/panda_xyz.usd"
# check file path
assert assets_utils.check_usd_path_with_timeout(usd_path) is False
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment