Skip to content

能指导一下real deploy吗? #27

@JackChenFight

Description

@JackChenFight

我尝试将模型应用到G1上面,但是启动代码后,机器人完成准备动作后,就躺在地上一动不动。可以分享一下你的deploy代码吗?
或者看看我的适配的代码呢?感激不尽啦~~~~

import sys
from legged_gym import LEGGED_GYM_ROOT_DIR
from typing import Union


from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber,ChannelFactoryInitialize

from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_, unitree_hg_msg_dds__LowState_
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_, unitree_go_msg_dds__LowState_
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_ as LowCmdHG
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_ as LowCmdGo
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowState_ as LowStateHG
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_ as LowStateGo
from unitree_sdk2py.utils.crc import CRC

from common.command_helper import create_damping_cmd, create_zero_cmd, init_cmd_hg, init_cmd_go, MotorMode
from common.rotation_helper import get_gravity_orientation, transform_imu_data
from common.remote_controller import RemoteController, KeyMap
from config import Config

import os

import isaacgym
from legged_gym.envs import *
from legged_gym.utils import  get_args, export_policy_as_jit, task_registry, Logger

import time
import torch

import matplotlib.pyplot as plt
import numpy as np
from collections import defaultdict
from multiprocessing import Process, Value

class Controller:
    def __init__(self, config: Config, args) -> None:
        self.config = config
        self.remote_controller = RemoteController()

        # Initializing process variables
        self.qj = np.zeros(config.num_actions, dtype=np.float32) 
        self.dqj = np.zeros(config.num_actions, dtype=np.float32) 
        self.action = np.zeros(config.num_actions, dtype=np.float32) # action 12
        self.target_dof_pos = config.default_angles.copy() # target joint position 12
        self.obs = np.zeros(config.num_obs, dtype=np.float32) # observation 47
        self.cmd = np.array([0.0, 0, 0]) # command 3
        self.counter = 0
        self.default_pos = np.concatenate((self.config.default_angles, self.config.arm_waist_target), axis=0)
        
        # --------------加载模型相关参数(这是我后加的)-------------------------

        env_cfg, train_cfg = task_registry.get_cfgs(name=args.task)
        env_cfg.env.num_envs = min(env_cfg.env.num_envs, 1) # 最多100个并行环境,这个是针对仿真环境
        env_cfg.terrain.num_rows = 4
        env_cfg.terrain.num_cols = 4
        env_cfg.terrain.curriculum = False
        env_cfg.noise.add_noise = False
        env_cfg.control.action_scale = 0.3
        env_cfg.curriculum.pull_force = False
        env_cfg.env.test = True
        self.env_cfg = env_cfg
        self.train_cfg = train_cfg

        # prepare environment
        env, _ = task_registry.make_env(name=args.task, args=args, env_cfg=env_cfg)
        self.obs = env.get_observations() # torch.Size([100, 456])
        # 3. 加载训练好的策略
        train_cfg.runner.resume = False  # 启用恢复模式(加载预训练模型)
        ppo_runner, train_cfg = task_registry.make_alg_runner(env=env, env_cfg=env_cfg, name=args.task, args=args, train_cfg=train_cfg)
        self.policy = ppo_runner.get_inference_policy(device=env.device)
        
        logger = Logger(env.dt)
        
        self.env = env
        self.env_cfg    = env_cfg
        self.train_cfg  = train_cfg
        self.logger     = logger

        #-----------------------------------------------
        if config.msg_type == "hg":
            # g1 and h1_2 use the hg msg type
            self.low_cmd = unitree_hg_msg_dds__LowCmd_()
            self.low_state = unitree_hg_msg_dds__LowState_()
            self.mode_pr_ = MotorMode.PR
            self.mode_machine_ = 0

            self.lowcmd_publisher_ = ChannelPublisher(config.lowcmd_topic, LowCmdHG)
            self.lowcmd_publisher_.Init()

            self.lowstate_subscriber = ChannelSubscriber(config.lowstate_topic, LowStateHG)
            self.lowstate_subscriber.Init(self.LowStateHgHandler, 10)

        elif config.msg_type == "go":
            # h1 uses the go msg type
            self.low_cmd = unitree_go_msg_dds__LowCmd_()
            self.low_state = unitree_go_msg_dds__LowState_()

            self.lowcmd_publisher_ = ChannelPublisher(config.lowcmd_topic, LowCmdGo)
            self.lowcmd_publisher_.Init()

            self.lowstate_subscriber = ChannelSubscriber(config.lowstate_topic, LowStateGo)
            self.lowstate_subscriber.Init(self.LowStateGoHandler, 10)

        else:
            raise ValueError("Invalid msg_type")

        # wait for the subscriber to receive data
        self.wait_for_low_state()

        # Initialize the command msg
        if config.msg_type == "hg":
            init_cmd_hg(self.low_cmd, self.mode_machine_, self.mode_pr_)
        elif config.msg_type == "go":
            init_cmd_go(self.low_cmd, weak_motor=self.config.weak_motor)

    def LowStateHgHandler(self, msg: LowStateHG):
        self.low_state = msg
        self.mode_machine_ = self.low_state.mode_machine
        self.remote_controller.set(self.low_state.wireless_remote)

    def LowStateGoHandler(self, msg: LowStateGo):
        self.low_state = msg
        self.remote_controller.set(self.low_state.wireless_remote)

    def send_cmd(self, cmd: Union[LowCmdGo, LowCmdHG]):
        cmd.crc = CRC().Crc(cmd)
        self.lowcmd_publisher_.Write(cmd)

    def wait_for_low_state(self):
        while self.low_state.tick == 0:
            time.sleep(self.config.control_dt)
        print("Successfully connected to the robot.")

    def zero_torque_state(self):
        print("Enter zero torque state.")
        print("Waiting for the start signal...")
        while self.remote_controller.button[KeyMap.start] != 1:
            create_zero_cmd(self.low_cmd)
            self.send_cmd(self.low_cmd)
            time.sleep(self.config.control_dt)

    def move_to_default_pos(self):
        print("Moving to default pos.")
        # move time 2s
        total_time = 2
        num_step = int(total_time / self.config.control_dt)
        
        dof_idx = self.config.leg_joint2motor_idx + self.config.arm_waist_joint2motor_idx
        kps = self.config.kps + self.config.arm_waist_kps
        kds = self.config.kds + self.config.arm_waist_kds
        default_pos = np.concatenate((self.config.default_angles, self.config.arm_waist_target), axis=0)
        dof_size = len(dof_idx)
        
        # record the current pos
        init_dof_pos = np.zeros(dof_size, dtype=np.float32)
        for i in range(dof_size):
            init_dof_pos[i] = self.low_state.motor_state[dof_idx[i]].q
        
        # move to default pos
        for i in range(num_step):
            alpha = i / num_step
            for j in range(dof_size):
                motor_idx = dof_idx[j]
                target_pos = default_pos[j]
                self.low_cmd.motor_cmd[motor_idx].q = init_dof_pos[j] * (1 - alpha) + target_pos * alpha
                self.low_cmd.motor_cmd[motor_idx].qd = 0
                self.low_cmd.motor_cmd[motor_idx].kp = kps[j]
                self.low_cmd.motor_cmd[motor_idx].kd = kds[j]
                self.low_cmd.motor_cmd[motor_idx].tau = 0
            self.send_cmd(self.low_cmd)
            time.sleep(self.config.control_dt)

    def default_pos_state(self):
        print("Enter default pos state.")
        print("Waiting for the Button A signal...")
        while self.remote_controller.button[KeyMap.A] != 1:
            for i in range(len(self.config.leg_joint2motor_idx)):
                motor_idx = self.config.leg_joint2motor_idx[i]
                self.low_cmd.motor_cmd[motor_idx].q = self.config.default_angles[i]
                self.low_cmd.motor_cmd[motor_idx].qd = 0
                self.low_cmd.motor_cmd[motor_idx].kp = self.config.kps[i]
                self.low_cmd.motor_cmd[motor_idx].kd = self.config.kds[i]
                self.low_cmd.motor_cmd[motor_idx].tau = 0
            for i in range(len(self.config.arm_waist_joint2motor_idx)):
                motor_idx = self.config.arm_waist_joint2motor_idx[i]
                self.low_cmd.motor_cmd[motor_idx].q = self.config.arm_waist_target[i]
                self.low_cmd.motor_cmd[motor_idx].qd = 0
                self.low_cmd.motor_cmd[motor_idx].kp = self.config.arm_waist_kps[i]
                self.low_cmd.motor_cmd[motor_idx].kd = self.config.arm_waist_kds[i]
                self.low_cmd.motor_cmd[motor_idx].tau = 0
            self.send_cmd(self.low_cmd)
            time.sleep(self.config.control_dt)

    def wait_begin(self):
        print("Enter default pos state.")
        print("Waiting for the Button A signal...")
        while self.remote_controller.button[KeyMap.A] != 1:
            time.sleep(self.config.control_dt)
            
    def run(self, enable_real_pose=False ):
        self.counter += 1
        if enable_real_pose:
            # Get the current joint position and velocity
            for i in range(len(self.config.leg_joint2motor_idx)):
                self.qj[i] = self.low_state.motor_state[self.config.leg_joint2motor_idx[i]].q
                self.dqj[i] = self.low_state.motor_state[self.config.leg_joint2motor_idx[i]].dq

            # imu_state quaternion: w, x, y, z
            quat = self.low_state.imu_state.quaternion
            ang_vel = np.array([self.low_state.imu_state.gyroscope], dtype=np.float32)

            if self.config.imu_type == "torso":
                # h1 and h1_2 imu is on the torso
                # imu data needs to be transformed to the pelvis frame
                waist_yaw = self.low_state.motor_state[self.config.arm_waist_joint2motor_idx[0]].q
                waist_yaw_omega = self.low_state.motor_state[self.config.arm_waist_joint2motor_idx[0]].dq
                quat, ang_vel = transform_imu_data(waist_yaw=waist_yaw, waist_yaw_omega=waist_yaw_omega, imu_quat=quat, imu_omega=ang_vel)

            # create observation
            gravity_orientation = get_gravity_orientation(quat)
            qj_obs = self.qj.copy()
            dqj_obs = self.dqj.copy()
            qj_obs = (qj_obs - self.config.default_angles) * self.config.dof_pos_scale
            dqj_obs = dqj_obs * self.config.dof_vel_scale
            ang_vel = ang_vel * self.config.ang_vel_scale
            period = 0.8
            count = self.counter * self.config.control_dt
            phase = count % period / period
            sin_phase = np.sin(2 * np.pi * phase)
            cos_phase = np.cos(2 * np.pi * phase)

            num_actions = self.config.num_actions
            self.obs[:3] = ang_vel
            self.obs[3:6] = gravity_orientation
            self.obs[6:9] = self.cmd * self.config.cmd_scale * self.config.max_cmd
            self.obs[9 : 9 + num_actions] = qj_obs
            self.obs[9 + num_actions : 9 + num_actions * 2] = dqj_obs
            self.obs[9 + num_actions * 2 : 9 + num_actions * 3] = self.action
            self.obs[9 + num_actions * 3] = sin_phase
            self.obs[9 + num_actions * 3 + 1] = cos_phase
        
            self.cmd[0] = self.remote_controller.ly
            self.cmd[1] = self.remote_controller.lx * -1
            self.cmd[2] = self.remote_controller.rx * -1
        # Get the action from the policy network
        result  = self.env.gym.fetch_results(self.env.sim, True)
        action = self.policy(self.obs.detach())    # torch.Size([num_env, 23]),num_env是指环境个数
        self.obs, _, rews, dones, infos = self.env.step(action.detach())  # 执行动作并更新状态
        self.action = self.action + action
        # transform action to target_dof_pos
        # target_dof_pos = self.config.default_angles + self.action * self.config.action_scale 
        target_dof_pos = self.action * self.config.action_scale 
        target_dof_pos = target_dof_pos.detach().cpu().numpy().squeeze()
        # Build low cmd
        for i in range(len(self.config.leg_joint2motor_idx)):
            motor_idx = self.config.leg_joint2motor_idx[i]
            self.low_cmd.motor_cmd[motor_idx].q = target_dof_pos[i]
            self.low_cmd.motor_cmd[motor_idx].qd = 0
            self.low_cmd.motor_cmd[motor_idx].kp = self.config.kps[i]
            self.low_cmd.motor_cmd[motor_idx].kd = self.config.kds[i]
            self.low_cmd.motor_cmd[motor_idx].tau = 0
        # 上半身控制,我这边有23-12=11个关节,但是需要输入29-12=17个关节,因此有些关节要设为0
        # 12, (13, 14)控制腰部, 19,(20,21) 控制左手腕,26,(27,28) 控制右手腕
        for i in range(len(self.config.arm_waist_joint2motor_idx)):
            # 因为机器人是23个自由度,但是idx是29个自由度,因此有些关节需要重新调整一下。
            if i == 13 or i == 14 or i == 20 or i == 21 or i == 27 or i == 28:
                continue
            motor_idx = self.config.arm_waist_joint2motor_idx[i]
            self.low_cmd.motor_cmd[motor_idx].q = target_dof_pos[i]
            self.low_cmd.motor_cmd[motor_idx].qd = 0
            self.low_cmd.motor_cmd[motor_idx].kp = self.config.arm_waist_kps[i]
            self.low_cmd.motor_cmd[motor_idx].kd = self.config.arm_waist_kds[i]
            self.low_cmd.motor_cmd[motor_idx].tau = 0

        # send the command
        self.send_cmd(self.low_cmd)

        time.sleep(self.config.control_dt)


if __name__ == "__main__":
    args = get_args()
    
    robot_net_name = "enp2s0"
    robot_config_name = "g1.yaml"
    # Load config
    config_path = f"{LEGGED_GYM_ROOT_DIR}/../deploy/deploy_real/configs/{robot_config_name}"
    config = Config(config_path)

    # Initialize DDS communication
    ChannelFactoryInitialize(0, robot_net_name)

    controller = Controller(config,args)

    # Enter the zero torque state, press the start key to continue executing
    controller.zero_torque_state()

    # Move to the default position
    controller.move_to_default_pos()

    # Enter the default position state, press the A key to continue executing
    controller.wait_begin()

    while True:
        try:
            controller.run()
            # Press the select key to exit
            if controller.remote_controller.button[KeyMap.select] == 1:
                break
        except KeyboardInterrupt:
            break
    # Enter the damping state
    create_damping_cmd(controller.low_cmd)
    controller.send_cmd(controller.low_cmd)
    print("Exit")

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions