from __future__ import annotations import logging import os import xml.etree.ElementTree as ET from abc import ABC, abstractmethod from dataclasses import dataclass from glob import glob from shutil import copy import trimesh from scipy.spatial.transform import Rotation logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) __all__ = [ "AssetConverterFactory", "AssetType", "MeshtoMJCFConverter", "MeshtoUSDConverter", "URDFtoUSDConverter", ] @dataclass class AssetType(str): """Asset type enumeration.""" MJCF = "mjcf" USD = "usd" URDF = "urdf" MESH = "mesh" class AssetConverterBase(ABC): """Converter abstract base class.""" @abstractmethod def convert(self, urdf_path: str, output_path: str, **kwargs) -> str: pass def transform_mesh( self, input_mesh: str, output_mesh: str, mesh_origin: ET.Element ) -> None: """Apply transform to the mesh based on the origin element in URDF.""" mesh = trimesh.load(input_mesh) rpy = list(map(float, mesh_origin.get("rpy").split(" "))) rotation = Rotation.from_euler("xyz", rpy, degrees=False) offset = list(map(float, mesh_origin.get("xyz").split(" "))) mesh.vertices = (mesh.vertices @ rotation.as_matrix().T) + offset os.makedirs(os.path.dirname(output_mesh), exist_ok=True) _ = mesh.export(output_mesh) return def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): return False class MeshtoMJCFConverter(AssetConverterBase): """Convert URDF files into MJCF format.""" def __init__( self, **kwargs, ) -> None: self.kwargs = kwargs def _copy_asset_file(self, src: str, dst: str) -> None: if os.path.exists(dst): return os.makedirs(os.path.dirname(dst), exist_ok=True) copy(src, dst) def add_geometry( self, mujoco_element: ET.Element, link: ET.Element, body: ET.Element, tag: str, input_dir: str, output_dir: str, mesh_name: str, material: ET.Element | None = None, is_collision: bool = False, ) -> None: """Add geometry to the MJCF body from the URDF link.""" element = link.find(tag) geometry = element.find("geometry") mesh = geometry.find("mesh") filename = mesh.get("filename") scale = mesh.get("scale", "1.0 1.0 1.0") mesh_asset = ET.SubElement( mujoco_element, "mesh", name=mesh_name, file=filename, scale=scale ) geom = ET.SubElement(body, "geom", type="mesh", mesh=mesh_name) self._copy_asset_file( f"{input_dir}/{filename}", f"{output_dir}/{filename}", ) # Preprocess the mesh by applying rotation. input_mesh = f"{input_dir}/{filename}" output_mesh = f"{output_dir}/{filename}" mesh_origin = element.find("origin") if mesh_origin is not None: self.transform_mesh(input_mesh, output_mesh, mesh_origin) if material is not None: geom.set("material", material.get("name")) if is_collision: geom.set("contype", "1") geom.set("conaffinity", "1") geom.set("rgba", "1 1 1 0") def add_materials( self, mujoco_element: ET.Element, link: ET.Element, tag: str, input_dir: str, output_dir: str, name: str, reflectance: float = 0.2, ) -> ET.Element: """Add materials to the MJCF asset from the URDF link.""" element = link.find(tag) geometry = element.find("geometry") mesh = geometry.find("mesh") filename = mesh.get("filename") dirname = os.path.dirname(filename) material = ET.SubElement( mujoco_element, "material", name=f"material_{name}", texture=f"texture_{name}", reflectance=str(reflectance), ) for path in glob(f"{input_dir}/{dirname}/*.png"): file_name = os.path.basename(path) self._copy_asset_file( path, f"{output_dir}/{dirname}/{file_name}", ) ET.SubElement( mujoco_element, "texture", name=f"texture_{name}_{os.path.splitext(file_name)[0]}", type="2d", file=f"{dirname}/{file_name}", ) return material def convert(self, urdf_path: str, mjcf_path: str): """Convert a URDF file to MJCF format.""" tree = ET.parse(urdf_path) root = tree.getroot() mujoco_struct = ET.Element("mujoco") mujoco_struct.set("model", root.get("name")) mujoco_asset = ET.SubElement(mujoco_struct, "asset") mujoco_worldbody = ET.SubElement(mujoco_struct, "worldbody") input_dir = os.path.dirname(urdf_path) output_dir = os.path.dirname(mjcf_path) os.makedirs(output_dir, exist_ok=True) for idx, link in enumerate(root.findall("link")): link_name = link.get("name", "unnamed_link") body = ET.SubElement(mujoco_worldbody, "body", name=link_name) material = self.add_materials( mujoco_asset, link, "visual", input_dir, output_dir, name=str(idx), ) self.add_geometry( mujoco_asset, link, body, "visual", input_dir, output_dir, f"visual_mesh_{idx}", material, ) self.add_geometry( mujoco_asset, link, body, "collision", input_dir, output_dir, f"collision_mesh_{idx}", is_collision=True, ) tree = ET.ElementTree(mujoco_struct) ET.indent(tree, space=" ", level=0) tree.write(mjcf_path, encoding="utf-8", xml_declaration=True) logger.info(f"Successfully converted {urdf_path} → {mjcf_path}") class URDFtoMJCFConverter(MeshtoMJCFConverter): """Convert URDF files with joints to MJCF format, handling transformations from joints.""" def add_materials( self, mujoco_element: ET.Element, link: ET.Element, tag: str, input_dir: str, output_dir: str, name: str, reflectance: float = 0.2, ) -> ET.Element: """Add materials to the MJCF asset from the URDF link.""" element = link.find(tag) geometry = element.find("geometry") mesh = geometry.find("mesh") filename = mesh.get("filename") dirname = os.path.dirname(filename) diffuse_texture = None for path in glob(f"{input_dir}/{dirname}/*.png"): file_name = os.path.basename(path) self._copy_asset_file( path, f"{output_dir}/{dirname}/{file_name}", ) texture_name = f"texture_{name}_{os.path.splitext(file_name)[0]}" ET.SubElement( mujoco_element, "texture", name=texture_name, type="2d", file=f"{dirname}/{file_name}", ) if "diffuse" in file_name.lower(): diffuse_texture = texture_name if diffuse_texture is None: return None material = ET.SubElement( mujoco_element, "material", name=f"material_{name}", texture=diffuse_texture, reflectance=str(reflectance), ) return material def convert(self, urdf_path: str, mjcf_path: str, **kwargs) -> str: """Convert a URDF file with joints to MJCF format.""" tree = ET.parse(urdf_path) root = tree.getroot() mujoco_struct = ET.Element("mujoco") mujoco_struct.set("model", root.get("name")) mujoco_asset = ET.SubElement(mujoco_struct, "asset") mujoco_worldbody = ET.SubElement(mujoco_struct, "worldbody") input_dir = os.path.dirname(urdf_path) output_dir = os.path.dirname(mjcf_path) os.makedirs(output_dir, exist_ok=True) # Create a dictionary to store body elements for each link body_dict = {} # Process all links first for idx, link in enumerate(root.findall("link")): link_name = link.get("name", f"unnamed_link_{idx}") body = ET.SubElement(mujoco_worldbody, "body", name=link_name) body_dict[link_name] = body # Add materials and geometry visual_element = link.find("visual") if visual_element is not None: material = self.add_materials( mujoco_asset, link, "visual", input_dir, output_dir, name=str(idx), ) self.add_geometry( mujoco_asset, link, body, "visual", input_dir, output_dir, f"visual_mesh_{idx}", material, ) collision_element = link.find("collision") if collision_element is not None: self.add_geometry( mujoco_asset, link, body, "collision", input_dir, output_dir, f"collision_mesh_{idx}", is_collision=True, ) # Process joints to set transformations and hierarchy for joint in root.findall("joint"): joint_type = joint.get("type") if joint_type != "fixed": logger.warning( f"Skipping non-fixed joint: {joint.get('name')}" ) continue parent_link = joint.find("parent").get("link") child_link = joint.find("child").get("link") origin = joint.find("origin") if parent_link not in body_dict or child_link not in body_dict: logger.warning( f"Parent or child link not found for joint: {joint.get('name')}" ) continue # Move child body under parent body in MJCF hierarchy child_body = body_dict[child_link] mujoco_worldbody.remove(child_body) parent_body = body_dict[parent_link] parent_body.append(child_body) # Apply joint origin transformation to child body if origin is not None: xyz = origin.get("xyz", "0 0 0") rpy = origin.get("rpy", "0 0 0") child_body.set("pos", xyz) # Convert rpy to MJCF euler format (degrees) rpy_floats = list(map(float, rpy.split())) rotation = Rotation.from_euler( "xyz", rpy_floats, degrees=False ) euler_deg = rotation.as_euler("xyz", degrees=True) child_body.set( "euler", f"{euler_deg[0]} {euler_deg[1]} {euler_deg[2]}" ) tree = ET.ElementTree(mujoco_struct) ET.indent(tree, space=" ", level=0) tree.write(mjcf_path, encoding="utf-8", xml_declaration=True) logger.info(f"Successfully converted {urdf_path} → {mjcf_path}") return mjcf_path class MeshtoUSDConverter(AssetConverterBase): """Convert Mesh file from URDF into USD format.""" DEFAULT_BIND_APIS = [ "MaterialBindingAPI", "PhysicsMeshCollisionAPI", "PhysicsCollisionAPI", "PhysxCollisionAPI", "PhysicsMassAPI", "PhysicsRigidBodyAPI", "PhysxRigidBodyAPI", ] def __init__( self, force_usd_conversion: bool = True, make_instanceable: bool = False, simulation_app=None, **kwargs, ): self.usd_parms = dict( force_usd_conversion=force_usd_conversion, make_instanceable=make_instanceable, **kwargs, ) if simulation_app is not None: self.simulation_app = simulation_app def __enter__(self): from isaaclab.app import AppLauncher if not hasattr(self, "simulation_app"): launch_args = dict( headless=True, no_splash=True, fast_shutdown=True, disable_gpu=True, ) self.app_launcher = AppLauncher(launch_args) self.simulation_app = self.app_launcher.app return self def __exit__(self, exc_type, exc_val, exc_tb): # Close the simulation app if it was created here if hasattr(self, "app_launcher"): self.simulation_app.close() if exc_val is not None: logger.error(f"Exception occurred: {exc_val}.") return False def convert(self, urdf_path: str, output_file: str): """Convert a URDF file to USD and post-process collision meshes.""" from isaaclab.sim.converters import MeshConverter, MeshConverterCfg from pxr import PhysxSchema, Sdf, Usd, UsdShade tree = ET.parse(urdf_path) root = tree.getroot() mesh_file = root.find("link/visual/geometry/mesh").get("filename") input_mesh = os.path.join(os.path.dirname(urdf_path), mesh_file) output_dir = os.path.abspath(os.path.dirname(output_file)) output_mesh = f"{output_dir}/mesh/{os.path.basename(mesh_file)}" mesh_origin = root.find("link/visual/origin") if mesh_origin is not None: self.transform_mesh(input_mesh, output_mesh, mesh_origin) cfg = MeshConverterCfg( asset_path=output_mesh, usd_dir=output_dir, usd_file_name=os.path.basename(output_file), **self.usd_parms, ) urdf_converter = MeshConverter(cfg) usd_path = urdf_converter.usd_path stage = Usd.Stage.Open(usd_path) layer = stage.GetRootLayer() with Usd.EditContext(stage, layer): for prim in stage.Traverse(): # Change texture path to relative path. if prim.GetName() == "material_0": shader = UsdShade.Shader(prim).GetInput("diffuse_texture") if shader.Get() is not None: relative_path = shader.Get().path.replace( f"{output_dir}/", "" ) shader.Set(Sdf.AssetPath(relative_path)) # Add convex decomposition collision and set ShrinkWrap. elif prim.GetName() == "mesh": approx_attr = prim.GetAttribute("physics:approximation") if not approx_attr: approx_attr = prim.CreateAttribute( "physics:approximation", Sdf.ValueTypeNames.Token ) approx_attr.Set("convexDecomposition") physx_conv_api = ( PhysxSchema.PhysxConvexDecompositionCollisionAPI.Apply( prim ) ) physx_conv_api.GetShrinkWrapAttr().Set(True) api_schemas = prim.GetMetadata("apiSchemas") if api_schemas is None: api_schemas = Sdf.TokenListOp() api_list = list(api_schemas.GetAddedOrExplicitItems()) for api in self.DEFAULT_BIND_APIS: if api not in api_list: api_list.append(api) api_schemas.appendedItems = api_list prim.SetMetadata("apiSchemas", api_schemas) layer.Save() logger.info(f"Successfully converted {urdf_path} → {usd_path}") class URDFtoUSDConverter(MeshtoUSDConverter): """Convert URDF files into USD format. Args: fix_base (bool): Whether to fix the base link. merge_fixed_joints (bool): Whether to merge fixed joints. make_instanceable (bool): Whether to make prims instanceable. force_usd_conversion (bool): Force conversion to USD. collision_from_visuals (bool): Generate collisions from visuals if not provided. """ def __init__( self, fix_base: bool = False, merge_fixed_joints: bool = False, make_instanceable: bool = True, force_usd_conversion: bool = True, collision_from_visuals: bool = True, joint_drive=None, rotate_wxyz: tuple[float] | None = None, simulation_app=None, **kwargs, ): self.usd_parms = dict( fix_base=fix_base, merge_fixed_joints=merge_fixed_joints, make_instanceable=make_instanceable, force_usd_conversion=force_usd_conversion, collision_from_visuals=collision_from_visuals, joint_drive=joint_drive, **kwargs, ) self.rotate_wxyz = rotate_wxyz if simulation_app is not None: self.simulation_app = simulation_app def convert(self, urdf_path: str, output_file: str): """Convert a URDF file to USD and post-process collision meshes.""" from isaaclab.sim.converters import UrdfConverter, UrdfConverterCfg from pxr import Gf, PhysxSchema, Sdf, Usd, UsdGeom cfg = UrdfConverterCfg( asset_path=urdf_path, usd_dir=os.path.abspath(os.path.dirname(output_file)), usd_file_name=os.path.basename(output_file), **self.usd_parms, ) urdf_converter = UrdfConverter(cfg) usd_path = urdf_converter.usd_path stage = Usd.Stage.Open(usd_path) layer = stage.GetRootLayer() with Usd.EditContext(stage, layer): for prim in stage.Traverse(): if prim.GetName() == "collisions": approx_attr = prim.GetAttribute("physics:approximation") if not approx_attr: approx_attr = prim.CreateAttribute( "physics:approximation", Sdf.ValueTypeNames.Token ) approx_attr.Set("convexDecomposition") physx_conv_api = ( PhysxSchema.PhysxConvexDecompositionCollisionAPI.Apply( prim ) ) physx_conv_api.GetShrinkWrapAttr().Set(True) api_schemas = prim.GetMetadata("apiSchemas") if api_schemas is None: api_schemas = Sdf.TokenListOp() api_list = list(api_schemas.GetAddedOrExplicitItems()) for api in self.DEFAULT_BIND_APIS: if api not in api_list: api_list.append(api) api_schemas.appendedItems = api_list prim.SetMetadata("apiSchemas", api_schemas) if self.rotate_wxyz is not None: inner_prim = next( p for p in stage.GetDefaultPrim().GetChildren() if p.IsA(UsdGeom.Xform) ) xformable = UsdGeom.Xformable(inner_prim) xformable.ClearXformOpOrder() orient_op = xformable.AddOrientOp(UsdGeom.XformOp.PrecisionDouble) orient_op.Set(Gf.Quatd(*self.rotate_wxyz)) layer.Save() logger.info(f"Successfully converted {urdf_path} → {usd_path}") class AssetConverterFactory: """Factory class for creating asset converters based on target and source types.""" @staticmethod def create( target_type: AssetType, source_type: AssetType = "urdf", **kwargs ) -> AssetConverterBase: """Create an asset converter instance based on target and source types.""" if target_type == AssetType.MJCF and source_type == AssetType.URDF: converter = MeshtoMJCFConverter(**kwargs) elif target_type == AssetType.USD and source_type == AssetType.URDF: converter = URDFtoUSDConverter(**kwargs) elif target_type == AssetType.USD and source_type == AssetType.MESH: converter = MeshtoUSDConverter(**kwargs) else: raise ValueError( f"Unsupported converter type: {source_type} -> {target_type}." ) return converter if __name__ == "__main__": # # target_asset_type = AssetType.MJCF # target_asset_type = AssetType.USD # urdf_paths = [ # "outputs/embodiedgen_assets/demo_assets/remote_control/result/remote_control.urdf", # ] # if target_asset_type == AssetType.MJCF: # output_files = [ # "outputs/embodiedgen_assets/demo_assets/remote_control/mjcf/remote_control.mjcf", # ] # asset_converter = AssetConverterFactory.create( # target_type=AssetType.MJCF, # source_type=AssetType.URDF, # ) # elif target_asset_type == AssetType.USD: # output_files = [ # "outputs/embodiedgen_assets/demo_assets/remote_control/usd/remote_control.usd", # ] # asset_converter = AssetConverterFactory.create( # target_type=AssetType.USD, # source_type=AssetType.MESH, # ) # with asset_converter: # for urdf_path, output_file in zip(urdf_paths, output_files): # asset_converter.convert(urdf_path, output_file) # urdf_path = "outputs/embodiedgen_assets/demo_assets/remote_control/result/remote_control.urdf" # output_file = "outputs/embodiedgen_assets/demo_assets/remote_control/usd/remote_control.usd" # asset_converter = AssetConverterFactory.create( # target_type=AssetType.USD, # source_type=AssetType.URDF, # rotate_wxyz=(0.7071, 0.7071, 0, 0), # rotate 90 deg around the X-axis # ) # with asset_converter: # asset_converter.convert(urdf_path, output_file) urdf_path = "/home/users/xinjie.wang/xinjie/infinigen/outputs/exports/kitchen_simple_solve_nos_i_urdf/export_scene/scene.urdf" output_file = "/home/users/xinjie.wang/xinjie/infinigen/outputs/exports/kitchen_simple_solve_nos_i_urdf/mjcf/scene.urdf" asset_converter = URDFtoMJCFConverter() with asset_converter: asset_converter.convert(urdf_path, output_file)