-
Notifications
You must be signed in to change notification settings - Fork 54
[Feature] New Flash with collision Controller #189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
kew6688
wants to merge
2
commits into
InternRobotics:dev
Choose a base branch
from
kew6688:flash_update
base: dev
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
185 changes: 185 additions & 0 deletions
185
...v/utils/internutopia_extension/controllers/vln_move_by_flash_with_collision_controller.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,185 @@ | ||
| import math | ||
| from typing import Any, Dict, List | ||
|
|
||
| import numpy as np | ||
| from internutopia.core.robot.articulation import ArticulationAction | ||
| from internutopia.core.robot.controller import BaseController | ||
| from internutopia.core.robot.robot import BaseRobot | ||
| from internutopia.core.scene.scene import IScene | ||
|
|
||
| from internnav.evaluator.utils.path_plan import world_to_pixel | ||
|
|
||
| from ..configs.controllers.flash_controller import VlnMoveByFlashControllerCfg | ||
|
|
||
|
|
||
| @BaseController.register('VlnMoveByFlashCollisionController') | ||
| class VlnMoveByFlashCollisionController(BaseController): # codespell:ignore | ||
| """ | ||
| Discrete Controller, direct set robot world position to achieve teleport-type locomotion. | ||
| a general controller adaptable to different type of robots. | ||
| """ | ||
|
|
||
| def __init__(self, config: VlnMoveByFlashControllerCfg, robot: BaseRobot, scene: IScene) -> None: | ||
| self._user_config = None | ||
| self.current_steps = 0 | ||
| self.steps_per_action = config.steps_per_action if config.steps_per_action is not None else 200 | ||
|
|
||
| self.forward_distance = config.forward_distance if config.forward_distance is not None else 0.25 | ||
| self.rotation_angle = config.rotation_angle if config.rotation_angle is not None else 15.0 # in degrees | ||
| self.physics_frequency = config.physics_frequency if config.physics_frequency is not None else 240 | ||
|
|
||
| # self.forward_speed = config.forward_speed if config.forward_speed is not None else 0.25 | ||
| # self.rotation_speed = config.rotation_speed if config.rotation_speed is not None else 1.0 | ||
| self.forward_speed = self.forward_distance / self.steps_per_action * self.physics_frequency | ||
| self.rotation_speed = np.deg2rad( | ||
| self.rotation_angle / self.steps_per_action * self.physics_frequency | ||
| ) # 200 is the physics_dt | ||
|
|
||
| self.current_action = None | ||
|
|
||
| super().__init__(config=config, robot=robot, scene=scene) | ||
|
|
||
| def get_new_position_and_rotation(self, robot_position, robot_rotation, action): | ||
| """ | ||
| Calculate robot new state by previous state and action. The move should be based on the controller | ||
| settings. | ||
| Caution: the rotation need to reset pitch and roll to prevent robot falling. This may due to no | ||
| adjustment during the whole path and some rotation accumulated | ||
|
|
||
| Args: | ||
| robot_position (np.ndarray): Current world position of the robot, shape (3,), in [x, y, z] format. | ||
| robot_rotation (np.ndarray): Current world orientation of the robot as a quaternion, shape (4,), in [x, y, z, w] format. | ||
| action (int): Discrete action to apply: | ||
| - 0: no movement (stand still) | ||
| - 1: move forward | ||
| - 2: rotate left | ||
| - 3: rotate right | ||
|
|
||
| Returns: | ||
| Tuple[np.ndarray, np.ndarray]: The new robot position and rotation as (position, rotation), | ||
| both in world frame. | ||
| """ | ||
| from omni.isaac.core.utils.rotations import ( | ||
| euler_angles_to_quat, | ||
| quat_to_euler_angles, | ||
| ) | ||
|
|
||
| _, _, yaw = quat_to_euler_angles(robot_rotation) | ||
| if action == 1: # forward | ||
| dx = self.forward_distance * math.cos(yaw) | ||
| dy = self.forward_distance * math.sin(yaw) | ||
| new_robot_position = robot_position + [dx, dy, 0] | ||
| new_robot_rotation = robot_rotation | ||
| elif action == 2: # left | ||
| new_robot_position = robot_position | ||
| new_yaw = yaw + math.radians(self.rotation_angle) | ||
| new_robot_rotation = euler_angles_to_quat( | ||
| np.array([0.0, 0.0, new_yaw]) | ||
| ) # using 0 to prevent the robot from falling | ||
| elif action == 3: # right | ||
| new_robot_position = robot_position | ||
| new_yaw = yaw - math.radians(self.rotation_angle) | ||
| new_robot_rotation = euler_angles_to_quat(np.array([0.0, 0.0, new_yaw])) | ||
| else: | ||
| new_robot_position = robot_position | ||
| new_robot_rotation = robot_rotation | ||
|
|
||
| return new_robot_position, new_robot_rotation | ||
|
|
||
| def reset_robot_state(self, position, orientation): | ||
| """Set robot state to the new position and orientation. | ||
|
|
||
| Args: | ||
| position, orientation: np.array, issac_robot.get_world_pose() | ||
| """ | ||
| robot = self.robot.articulation | ||
| robot._articulation.set_world_pose(position=position, orientation=orientation) | ||
| robot._articulation.set_world_velocity(np.zeros(6)) | ||
| robot._articulation.set_joint_velocities(np.zeros(len(robot.dof_names))) | ||
| robot._articulation.set_joint_positions(np.zeros(len(robot.dof_names))) | ||
| robot._articulation.set_joint_efforts(np.zeros(len(robot.dof_names))) | ||
|
|
||
| def get_map_info(self, topdown_global_map_camera, dilation_iterations=0, voxel_size=0.1, agent_radius=0.25): | ||
| # 获取 free_map | ||
| min_height = self.robot.get_robot_base().get_world_pose()[0][2] + 0.6 # default robot height | ||
| max_height = 1.55 + 8 | ||
| data_info = topdown_global_map_camera.get_data() | ||
| depth = np.array(data_info['depth']) | ||
| flat_surface_mask = np.ones_like(depth, dtype=bool) | ||
| if self.robot.config.type == 'VLNH1Robot': | ||
| depth_mask = ((depth >= min_height) & (depth < max_height)) | ((depth <= 0.5) & (depth > 0.02)) | ||
| elif self.robot.config.type == 'VLNAliengoRobot': | ||
| base_height = self.robot.get_robot_base().get_world_pose()[0][2] | ||
| foot_height = self.robot.get_ankle_height() | ||
| min_height = base_height - foot_height + 0.05 | ||
| depth_mask = (depth >= min_height) & (depth < max_height) | ||
| free_map = np.zeros_like(depth, dtype=int) | ||
| free_map[flat_surface_mask & depth_mask] = 1 | ||
| # free_map[robot_mask == 1] = 1 # This mask is too large, cover all obstacles around the robot | ||
| return free_map | ||
|
|
||
| def check_collision(self, position, aperture=200) -> bool: | ||
| """ | ||
| Check if there are any obstacles at the position. | ||
| Generate a depth map based on a top down camera and check the position | ||
|
|
||
| Return: | ||
| bool: True if the position is already occupied | ||
| """ | ||
| topdown_global_map_camera = self.robot.sensors['topdown_camera_500'] | ||
| free_map = self.get_map_info(topdown_global_map_camera, dilation_iterations=2) | ||
|
|
||
| # convert position to free map pixel | ||
| camera_pose = topdown_global_map_camera.get_world_pose()[0] | ||
| width, height = topdown_global_map_camera.resolution | ||
| px, py = world_to_pixel(position, camera_pose, aperture, width, height) | ||
|
|
||
| px_int, py_int = int(px), int(py) | ||
| # Get a region: (px, py) and one pixel right/down | ||
| robot_size = 3 | ||
| sub_map = free_map[px_int - robot_size : px_int + robot_size, py_int - robot_size : py_int + robot_size] | ||
| return np.any(sub_map == 0) # 1 = free, so True = collision | ||
|
|
||
| def forward(self, action: int) -> ArticulationAction: | ||
| """ | ||
| Teleport robot by position, orientation and action | ||
|
|
||
| Args: | ||
| action: int | ||
| 0. discrete action (int): 0: stop, 1: forward, 2: left, 3: right | ||
|
|
||
| Returns: | ||
| ArticulationAction: joint signals to apply (nothing). | ||
| """ | ||
| # get robot new position | ||
| positions, orientations = self.robot.articulation.get_world_pose() | ||
| new_robot_position, new_robot_rotation = self.get_new_position_and_rotation(positions, orientations, action) | ||
|
|
||
| # Check if there is a collision with obstacles. Abort the teleport if there is | ||
| if action != 1 or not self.check_collision(new_robot_position): | ||
| # set robot to new state | ||
| self.reset_robot_state(new_robot_position, new_robot_rotation) | ||
| else: | ||
| print("[FLASH CONTROLLER]: Collision detected, flash abort") | ||
|
|
||
| # Dummy action to do nothing | ||
| return ArticulationAction() | ||
|
|
||
| def action_to_control(self, action: List | np.ndarray) -> ArticulationAction: | ||
| """ | ||
| Convert input action (in 1d array format) to joint signals to apply. | ||
|
|
||
| Args: | ||
| action (List | np.ndarray): 1-element 1d array containing | ||
| 0. discrete action (int): 0: stop, 1: forward, 2: left, 3: right | ||
|
|
||
| Returns: | ||
| ArticulationAction: joint signals to apply. | ||
| """ | ||
| assert len(action) == 1, 'action must contain 1 element' | ||
| return self.forward(action=int(action[0])) | ||
|
|
||
| def get_obs(self) -> Dict[str, Any]: | ||
| return { | ||
| 'finished': True, | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
polish it as the standard format