Spaces:
Running
on
Zero
Running
on
Zero
| from __future__ import annotations | |
| import logging | |
| import os | |
| import xml.etree.ElementTree as ET | |
| from abc import ABC, abstractmethod | |
| from dataclasses import dataclass | |
| from glob import glob | |
| from shutil import copy | |
| import trimesh | |
| from scipy.spatial.transform import Rotation | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| __all__ = [ | |
| "AssetConverterFactory", | |
| "AssetType", | |
| "MeshtoMJCFConverter", | |
| "MeshtoUSDConverter", | |
| "URDFtoUSDConverter", | |
| ] | |
| class AssetType(str): | |
| """Asset type enumeration.""" | |
| MJCF = "mjcf" | |
| USD = "usd" | |
| URDF = "urdf" | |
| MESH = "mesh" | |
| class AssetConverterBase(ABC): | |
| """Converter abstract base class.""" | |
| def convert(self, urdf_path: str, output_path: str, **kwargs) -> str: | |
| pass | |
| def transform_mesh( | |
| self, input_mesh: str, output_mesh: str, mesh_origin: ET.Element | |
| ) -> None: | |
| """Apply transform to the mesh based on the origin element in URDF.""" | |
| mesh = trimesh.load(input_mesh) | |
| rpy = list(map(float, mesh_origin.get("rpy").split(" "))) | |
| rotation = Rotation.from_euler("xyz", rpy, degrees=False) | |
| offset = list(map(float, mesh_origin.get("xyz").split(" "))) | |
| mesh.vertices = (mesh.vertices @ rotation.as_matrix().T) + offset | |
| os.makedirs(os.path.dirname(output_mesh), exist_ok=True) | |
| _ = mesh.export(output_mesh) | |
| return | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| return False | |
| class MeshtoMJCFConverter(AssetConverterBase): | |
| """Convert URDF files into MJCF format.""" | |
| def __init__( | |
| self, | |
| **kwargs, | |
| ) -> None: | |
| self.kwargs = kwargs | |
| def _copy_asset_file(self, src: str, dst: str) -> None: | |
| if os.path.exists(dst): | |
| return | |
| os.makedirs(os.path.dirname(dst), exist_ok=True) | |
| copy(src, dst) | |
| def add_geometry( | |
| self, | |
| mujoco_element: ET.Element, | |
| link: ET.Element, | |
| body: ET.Element, | |
| tag: str, | |
| input_dir: str, | |
| output_dir: str, | |
| mesh_name: str, | |
| material: ET.Element | None = None, | |
| is_collision: bool = False, | |
| ) -> None: | |
| """Add geometry to the MJCF body from the URDF link.""" | |
| element = link.find(tag) | |
| geometry = element.find("geometry") | |
| mesh = geometry.find("mesh") | |
| filename = mesh.get("filename") | |
| scale = mesh.get("scale", "1.0 1.0 1.0") | |
| mesh_asset = ET.SubElement( | |
| mujoco_element, "mesh", name=mesh_name, file=filename, scale=scale | |
| ) | |
| geom = ET.SubElement(body, "geom", type="mesh", mesh=mesh_name) | |
| self._copy_asset_file( | |
| f"{input_dir}/{filename}", | |
| f"{output_dir}/{filename}", | |
| ) | |
| # Preprocess the mesh by applying rotation. | |
| input_mesh = f"{input_dir}/{filename}" | |
| output_mesh = f"{output_dir}/{filename}" | |
| mesh_origin = element.find("origin") | |
| if mesh_origin is not None: | |
| self.transform_mesh(input_mesh, output_mesh, mesh_origin) | |
| if material is not None: | |
| geom.set("material", material.get("name")) | |
| if is_collision: | |
| geom.set("contype", "1") | |
| geom.set("conaffinity", "1") | |
| geom.set("rgba", "1 1 1 0") | |
| def add_materials( | |
| self, | |
| mujoco_element: ET.Element, | |
| link: ET.Element, | |
| tag: str, | |
| input_dir: str, | |
| output_dir: str, | |
| name: str, | |
| reflectance: float = 0.2, | |
| ) -> ET.Element: | |
| """Add materials to the MJCF asset from the URDF link.""" | |
| element = link.find(tag) | |
| geometry = element.find("geometry") | |
| mesh = geometry.find("mesh") | |
| filename = mesh.get("filename") | |
| dirname = os.path.dirname(filename) | |
| material = ET.SubElement( | |
| mujoco_element, | |
| "material", | |
| name=f"material_{name}", | |
| texture=f"texture_{name}", | |
| reflectance=str(reflectance), | |
| ) | |
| for path in glob(f"{input_dir}/{dirname}/*.png"): | |
| file_name = os.path.basename(path) | |
| self._copy_asset_file( | |
| path, | |
| f"{output_dir}/{dirname}/{file_name}", | |
| ) | |
| ET.SubElement( | |
| mujoco_element, | |
| "texture", | |
| name=f"texture_{name}_{os.path.splitext(file_name)[0]}", | |
| type="2d", | |
| file=f"{dirname}/{file_name}", | |
| ) | |
| return material | |
| def convert(self, urdf_path: str, mjcf_path: str): | |
| """Convert a URDF file to MJCF format.""" | |
| tree = ET.parse(urdf_path) | |
| root = tree.getroot() | |
| mujoco_struct = ET.Element("mujoco") | |
| mujoco_struct.set("model", root.get("name")) | |
| mujoco_asset = ET.SubElement(mujoco_struct, "asset") | |
| mujoco_worldbody = ET.SubElement(mujoco_struct, "worldbody") | |
| input_dir = os.path.dirname(urdf_path) | |
| output_dir = os.path.dirname(mjcf_path) | |
| os.makedirs(output_dir, exist_ok=True) | |
| for idx, link in enumerate(root.findall("link")): | |
| link_name = link.get("name", "unnamed_link") | |
| body = ET.SubElement(mujoco_worldbody, "body", name=link_name) | |
| material = self.add_materials( | |
| mujoco_asset, | |
| link, | |
| "visual", | |
| input_dir, | |
| output_dir, | |
| name=str(idx), | |
| ) | |
| self.add_geometry( | |
| mujoco_asset, | |
| link, | |
| body, | |
| "visual", | |
| input_dir, | |
| output_dir, | |
| f"visual_mesh_{idx}", | |
| material, | |
| ) | |
| self.add_geometry( | |
| mujoco_asset, | |
| link, | |
| body, | |
| "collision", | |
| input_dir, | |
| output_dir, | |
| f"collision_mesh_{idx}", | |
| is_collision=True, | |
| ) | |
| tree = ET.ElementTree(mujoco_struct) | |
| ET.indent(tree, space=" ", level=0) | |
| tree.write(mjcf_path, encoding="utf-8", xml_declaration=True) | |
| logger.info(f"Successfully converted {urdf_path} → {mjcf_path}") | |
| class URDFtoMJCFConverter(MeshtoMJCFConverter): | |
| """Convert URDF files with joints to MJCF format, handling transformations from joints.""" | |
| def add_materials( | |
| self, | |
| mujoco_element: ET.Element, | |
| link: ET.Element, | |
| tag: str, | |
| input_dir: str, | |
| output_dir: str, | |
| name: str, | |
| reflectance: float = 0.2, | |
| ) -> ET.Element: | |
| """Add materials to the MJCF asset from the URDF link.""" | |
| element = link.find(tag) | |
| geometry = element.find("geometry") | |
| mesh = geometry.find("mesh") | |
| filename = mesh.get("filename") | |
| dirname = os.path.dirname(filename) | |
| diffuse_texture = None | |
| for path in glob(f"{input_dir}/{dirname}/*.png"): | |
| file_name = os.path.basename(path) | |
| self._copy_asset_file( | |
| path, | |
| f"{output_dir}/{dirname}/{file_name}", | |
| ) | |
| texture_name = f"texture_{name}_{os.path.splitext(file_name)[0]}" | |
| ET.SubElement( | |
| mujoco_element, | |
| "texture", | |
| name=texture_name, | |
| type="2d", | |
| file=f"{dirname}/{file_name}", | |
| ) | |
| if "diffuse" in file_name.lower(): | |
| diffuse_texture = texture_name | |
| if diffuse_texture is None: | |
| return None | |
| material = ET.SubElement( | |
| mujoco_element, | |
| "material", | |
| name=f"material_{name}", | |
| texture=diffuse_texture, | |
| reflectance=str(reflectance), | |
| ) | |
| return material | |
| def convert(self, urdf_path: str, mjcf_path: str, **kwargs) -> str: | |
| """Convert a URDF file with joints to MJCF format.""" | |
| tree = ET.parse(urdf_path) | |
| root = tree.getroot() | |
| mujoco_struct = ET.Element("mujoco") | |
| mujoco_struct.set("model", root.get("name")) | |
| mujoco_asset = ET.SubElement(mujoco_struct, "asset") | |
| mujoco_worldbody = ET.SubElement(mujoco_struct, "worldbody") | |
| input_dir = os.path.dirname(urdf_path) | |
| output_dir = os.path.dirname(mjcf_path) | |
| os.makedirs(output_dir, exist_ok=True) | |
| # Create a dictionary to store body elements for each link | |
| body_dict = {} | |
| # Process all links first | |
| for idx, link in enumerate(root.findall("link")): | |
| link_name = link.get("name", f"unnamed_link_{idx}") | |
| body = ET.SubElement(mujoco_worldbody, "body", name=link_name) | |
| body_dict[link_name] = body | |
| # Add materials and geometry | |
| visual_element = link.find("visual") | |
| if visual_element is not None: | |
| material = self.add_materials( | |
| mujoco_asset, | |
| link, | |
| "visual", | |
| input_dir, | |
| output_dir, | |
| name=str(idx), | |
| ) | |
| self.add_geometry( | |
| mujoco_asset, | |
| link, | |
| body, | |
| "visual", | |
| input_dir, | |
| output_dir, | |
| f"visual_mesh_{idx}", | |
| material, | |
| ) | |
| collision_element = link.find("collision") | |
| if collision_element is not None: | |
| self.add_geometry( | |
| mujoco_asset, | |
| link, | |
| body, | |
| "collision", | |
| input_dir, | |
| output_dir, | |
| f"collision_mesh_{idx}", | |
| is_collision=True, | |
| ) | |
| # Process joints to set transformations and hierarchy | |
| for joint in root.findall("joint"): | |
| joint_type = joint.get("type") | |
| if joint_type != "fixed": | |
| logger.warning( | |
| f"Skipping non-fixed joint: {joint.get('name')}" | |
| ) | |
| continue | |
| parent_link = joint.find("parent").get("link") | |
| child_link = joint.find("child").get("link") | |
| origin = joint.find("origin") | |
| if parent_link not in body_dict or child_link not in body_dict: | |
| logger.warning( | |
| f"Parent or child link not found for joint: {joint.get('name')}" | |
| ) | |
| continue | |
| # Move child body under parent body in MJCF hierarchy | |
| child_body = body_dict[child_link] | |
| mujoco_worldbody.remove(child_body) | |
| parent_body = body_dict[parent_link] | |
| parent_body.append(child_body) | |
| # Apply joint origin transformation to child body | |
| if origin is not None: | |
| xyz = origin.get("xyz", "0 0 0") | |
| rpy = origin.get("rpy", "0 0 0") | |
| child_body.set("pos", xyz) | |
| # Convert rpy to MJCF euler format (degrees) | |
| rpy_floats = list(map(float, rpy.split())) | |
| rotation = Rotation.from_euler( | |
| "xyz", rpy_floats, degrees=False | |
| ) | |
| euler_deg = rotation.as_euler("xyz", degrees=True) | |
| child_body.set( | |
| "euler", f"{euler_deg[0]} {euler_deg[1]} {euler_deg[2]}" | |
| ) | |
| tree = ET.ElementTree(mujoco_struct) | |
| ET.indent(tree, space=" ", level=0) | |
| tree.write(mjcf_path, encoding="utf-8", xml_declaration=True) | |
| logger.info(f"Successfully converted {urdf_path} → {mjcf_path}") | |
| return mjcf_path | |
| class MeshtoUSDConverter(AssetConverterBase): | |
| """Convert Mesh file from URDF into USD format.""" | |
| DEFAULT_BIND_APIS = [ | |
| "MaterialBindingAPI", | |
| "PhysicsMeshCollisionAPI", | |
| "PhysicsCollisionAPI", | |
| "PhysxCollisionAPI", | |
| "PhysicsMassAPI", | |
| "PhysicsRigidBodyAPI", | |
| "PhysxRigidBodyAPI", | |
| ] | |
| def __init__( | |
| self, | |
| force_usd_conversion: bool = True, | |
| make_instanceable: bool = False, | |
| simulation_app=None, | |
| **kwargs, | |
| ): | |
| self.usd_parms = dict( | |
| force_usd_conversion=force_usd_conversion, | |
| make_instanceable=make_instanceable, | |
| **kwargs, | |
| ) | |
| if simulation_app is not None: | |
| self.simulation_app = simulation_app | |
| def __enter__(self): | |
| from isaaclab.app import AppLauncher | |
| if not hasattr(self, "simulation_app"): | |
| launch_args = dict( | |
| headless=True, | |
| no_splash=True, | |
| fast_shutdown=True, | |
| disable_gpu=True, | |
| ) | |
| self.app_launcher = AppLauncher(launch_args) | |
| self.simulation_app = self.app_launcher.app | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| # Close the simulation app if it was created here | |
| if hasattr(self, "app_launcher"): | |
| self.simulation_app.close() | |
| if exc_val is not None: | |
| logger.error(f"Exception occurred: {exc_val}.") | |
| return False | |
| def convert(self, urdf_path: str, output_file: str): | |
| """Convert a URDF file to USD and post-process collision meshes.""" | |
| from isaaclab.sim.converters import MeshConverter, MeshConverterCfg | |
| from pxr import PhysxSchema, Sdf, Usd, UsdShade | |
| tree = ET.parse(urdf_path) | |
| root = tree.getroot() | |
| mesh_file = root.find("link/visual/geometry/mesh").get("filename") | |
| input_mesh = os.path.join(os.path.dirname(urdf_path), mesh_file) | |
| output_dir = os.path.abspath(os.path.dirname(output_file)) | |
| output_mesh = f"{output_dir}/mesh/{os.path.basename(mesh_file)}" | |
| mesh_origin = root.find("link/visual/origin") | |
| if mesh_origin is not None: | |
| self.transform_mesh(input_mesh, output_mesh, mesh_origin) | |
| cfg = MeshConverterCfg( | |
| asset_path=output_mesh, | |
| usd_dir=output_dir, | |
| usd_file_name=os.path.basename(output_file), | |
| **self.usd_parms, | |
| ) | |
| urdf_converter = MeshConverter(cfg) | |
| usd_path = urdf_converter.usd_path | |
| stage = Usd.Stage.Open(usd_path) | |
| layer = stage.GetRootLayer() | |
| with Usd.EditContext(stage, layer): | |
| for prim in stage.Traverse(): | |
| # Change texture path to relative path. | |
| if prim.GetName() == "material_0": | |
| shader = UsdShade.Shader(prim).GetInput("diffuse_texture") | |
| if shader.Get() is not None: | |
| relative_path = shader.Get().path.replace( | |
| f"{output_dir}/", "" | |
| ) | |
| shader.Set(Sdf.AssetPath(relative_path)) | |
| # Add convex decomposition collision and set ShrinkWrap. | |
| elif prim.GetName() == "mesh": | |
| approx_attr = prim.GetAttribute("physics:approximation") | |
| if not approx_attr: | |
| approx_attr = prim.CreateAttribute( | |
| "physics:approximation", Sdf.ValueTypeNames.Token | |
| ) | |
| approx_attr.Set("convexDecomposition") | |
| physx_conv_api = ( | |
| PhysxSchema.PhysxConvexDecompositionCollisionAPI.Apply( | |
| prim | |
| ) | |
| ) | |
| physx_conv_api.GetShrinkWrapAttr().Set(True) | |
| api_schemas = prim.GetMetadata("apiSchemas") | |
| if api_schemas is None: | |
| api_schemas = Sdf.TokenListOp() | |
| api_list = list(api_schemas.GetAddedOrExplicitItems()) | |
| for api in self.DEFAULT_BIND_APIS: | |
| if api not in api_list: | |
| api_list.append(api) | |
| api_schemas.appendedItems = api_list | |
| prim.SetMetadata("apiSchemas", api_schemas) | |
| layer.Save() | |
| logger.info(f"Successfully converted {urdf_path} → {usd_path}") | |
| class URDFtoUSDConverter(MeshtoUSDConverter): | |
| """Convert URDF files into USD format. | |
| Args: | |
| fix_base (bool): Whether to fix the base link. | |
| merge_fixed_joints (bool): Whether to merge fixed joints. | |
| make_instanceable (bool): Whether to make prims instanceable. | |
| force_usd_conversion (bool): Force conversion to USD. | |
| collision_from_visuals (bool): Generate collisions from visuals if not provided. | |
| """ | |
| def __init__( | |
| self, | |
| fix_base: bool = False, | |
| merge_fixed_joints: bool = False, | |
| make_instanceable: bool = True, | |
| force_usd_conversion: bool = True, | |
| collision_from_visuals: bool = True, | |
| joint_drive=None, | |
| rotate_wxyz: tuple[float] | None = None, | |
| simulation_app=None, | |
| **kwargs, | |
| ): | |
| self.usd_parms = dict( | |
| fix_base=fix_base, | |
| merge_fixed_joints=merge_fixed_joints, | |
| make_instanceable=make_instanceable, | |
| force_usd_conversion=force_usd_conversion, | |
| collision_from_visuals=collision_from_visuals, | |
| joint_drive=joint_drive, | |
| **kwargs, | |
| ) | |
| self.rotate_wxyz = rotate_wxyz | |
| if simulation_app is not None: | |
| self.simulation_app = simulation_app | |
| def convert(self, urdf_path: str, output_file: str): | |
| """Convert a URDF file to USD and post-process collision meshes.""" | |
| from isaaclab.sim.converters import UrdfConverter, UrdfConverterCfg | |
| from pxr import Gf, PhysxSchema, Sdf, Usd, UsdGeom | |
| cfg = UrdfConverterCfg( | |
| asset_path=urdf_path, | |
| usd_dir=os.path.abspath(os.path.dirname(output_file)), | |
| usd_file_name=os.path.basename(output_file), | |
| **self.usd_parms, | |
| ) | |
| urdf_converter = UrdfConverter(cfg) | |
| usd_path = urdf_converter.usd_path | |
| stage = Usd.Stage.Open(usd_path) | |
| layer = stage.GetRootLayer() | |
| with Usd.EditContext(stage, layer): | |
| for prim in stage.Traverse(): | |
| if prim.GetName() == "collisions": | |
| approx_attr = prim.GetAttribute("physics:approximation") | |
| if not approx_attr: | |
| approx_attr = prim.CreateAttribute( | |
| "physics:approximation", Sdf.ValueTypeNames.Token | |
| ) | |
| approx_attr.Set("convexDecomposition") | |
| physx_conv_api = ( | |
| PhysxSchema.PhysxConvexDecompositionCollisionAPI.Apply( | |
| prim | |
| ) | |
| ) | |
| physx_conv_api.GetShrinkWrapAttr().Set(True) | |
| api_schemas = prim.GetMetadata("apiSchemas") | |
| if api_schemas is None: | |
| api_schemas = Sdf.TokenListOp() | |
| api_list = list(api_schemas.GetAddedOrExplicitItems()) | |
| for api in self.DEFAULT_BIND_APIS: | |
| if api not in api_list: | |
| api_list.append(api) | |
| api_schemas.appendedItems = api_list | |
| prim.SetMetadata("apiSchemas", api_schemas) | |
| if self.rotate_wxyz is not None: | |
| inner_prim = next( | |
| p | |
| for p in stage.GetDefaultPrim().GetChildren() | |
| if p.IsA(UsdGeom.Xform) | |
| ) | |
| xformable = UsdGeom.Xformable(inner_prim) | |
| xformable.ClearXformOpOrder() | |
| orient_op = xformable.AddOrientOp(UsdGeom.XformOp.PrecisionDouble) | |
| orient_op.Set(Gf.Quatd(*self.rotate_wxyz)) | |
| layer.Save() | |
| logger.info(f"Successfully converted {urdf_path} → {usd_path}") | |
| class AssetConverterFactory: | |
| """Factory class for creating asset converters based on target and source types.""" | |
| def create( | |
| target_type: AssetType, source_type: AssetType = "urdf", **kwargs | |
| ) -> AssetConverterBase: | |
| """Create an asset converter instance based on target and source types.""" | |
| if target_type == AssetType.MJCF and source_type == AssetType.URDF: | |
| converter = MeshtoMJCFConverter(**kwargs) | |
| elif target_type == AssetType.USD and source_type == AssetType.URDF: | |
| converter = URDFtoUSDConverter(**kwargs) | |
| elif target_type == AssetType.USD and source_type == AssetType.MESH: | |
| converter = MeshtoUSDConverter(**kwargs) | |
| else: | |
| raise ValueError( | |
| f"Unsupported converter type: {source_type} -> {target_type}." | |
| ) | |
| return converter | |
| if __name__ == "__main__": | |
| # # target_asset_type = AssetType.MJCF | |
| # target_asset_type = AssetType.USD | |
| # urdf_paths = [ | |
| # "outputs/embodiedgen_assets/demo_assets/remote_control/result/remote_control.urdf", | |
| # ] | |
| # if target_asset_type == AssetType.MJCF: | |
| # output_files = [ | |
| # "outputs/embodiedgen_assets/demo_assets/remote_control/mjcf/remote_control.mjcf", | |
| # ] | |
| # asset_converter = AssetConverterFactory.create( | |
| # target_type=AssetType.MJCF, | |
| # source_type=AssetType.URDF, | |
| # ) | |
| # elif target_asset_type == AssetType.USD: | |
| # output_files = [ | |
| # "outputs/embodiedgen_assets/demo_assets/remote_control/usd/remote_control.usd", | |
| # ] | |
| # asset_converter = AssetConverterFactory.create( | |
| # target_type=AssetType.USD, | |
| # source_type=AssetType.MESH, | |
| # ) | |
| # with asset_converter: | |
| # for urdf_path, output_file in zip(urdf_paths, output_files): | |
| # asset_converter.convert(urdf_path, output_file) | |
| # urdf_path = "outputs/embodiedgen_assets/demo_assets/remote_control/result/remote_control.urdf" | |
| # output_file = "outputs/embodiedgen_assets/demo_assets/remote_control/usd/remote_control.usd" | |
| # asset_converter = AssetConverterFactory.create( | |
| # target_type=AssetType.USD, | |
| # source_type=AssetType.URDF, | |
| # rotate_wxyz=(0.7071, 0.7071, 0, 0), # rotate 90 deg around the X-axis | |
| # ) | |
| # with asset_converter: | |
| # asset_converter.convert(urdf_path, output_file) | |
| urdf_path = "/home/users/xinjie.wang/xinjie/infinigen/outputs/exports/kitchen_simple_solve_nos_i_urdf/export_scene/scene.urdf" | |
| output_file = "/home/users/xinjie.wang/xinjie/infinigen/outputs/exports/kitchen_simple_solve_nos_i_urdf/mjcf/scene.urdf" | |
| asset_converter = URDFtoMJCFConverter() | |
| with asset_converter: | |
| asset_converter.convert(urdf_path, output_file) | |