重点类一:ManagerBasedRlEnv(环境生命周期与数据流)#
这一章解决一个核心问题:一个动作 action 从哪里来、如何进入 physics、如何产出 obs/reward/done,并在 reset 时怎样“按正确顺序”重置各模块。
配置入口:ManagerBasedRlEnvCfg#
路径:src/mjlab/envs/manager_based_rl_env.py
ManagerBasedRlEnvCfg 不是 Isaac Lab 那种嵌套 @configclass,而是“ 顶层 dataclass + 多个 dict(name -> term cfg) ”:
# file: src/mjlab/envs/manager_based_rl_env.py
@dataclass(kw_only=True)
class ManagerBasedRlEnvCfg:
decimation: int # 每个 env step 对应多少个 physics step
scene: SceneCfg # 场景(terrain + entities + sensors)
observations: dict[str, ObservationGroupCfg]
actions: dict[str, ActionTermCfg]
events: dict[str, EventTermCfg] = {...} # 默认包含 reset_scene_to_default
rewards: dict[str, RewardTermCfg] = {}
terminations: dict[str, TerminationTermCfg] = {}
commands: dict[str, CommandTermCfg] | None = None
curriculum: dict[str, CurriculumTermCfg] | None = None
sim: SimulationCfg = SimulationCfg()
viewer: ViewerConfig = ViewerConfig()
episode_length_s: float = 0.0
is_finite_horizon: bool = False
你应该记住的 3 个时间尺度:
physics_dt:MuJoCo 的 timestep(
cfg.sim.mujoco.timestep)step_dt:环境控制周期(
physics_dt * decimation)episode_length:由
episode_length_s / step_dt推出来(向上取整)
环境构建:Scene + Simulation + Managers#
ManagerBasedRlEnv.__init__ 的结构非常“可读”,按层次搭积木:
Scene(cfg.scene, device):构建 MuJoCoMjSpec(terrain/entities/sensors),随后 compile 成MjModel。Simulation(num_envs, cfg.sim, model, device):把 MuJoCo model/data 放到 MJWarp 里,准备 GPU step/forward/reset。scene.initialize(mj_model, model, data):把 entity/sensor 绑定到仿真数据。load_managers():把 cfg 里的 dict term_cfg 装配成各类 managers( 顺序非常关键 )。
Manager 加载顺序(为什么重要)#
路径:src/mjlab/envs/manager_based_rl_env.py
# file: src/mjlab/envs/manager_based_rl_env.py
def load_managers(self) -> None:
# 1) EventManager 必须最先:它决定 domain randomization 需要扩展哪些 model fields
self.event_manager = EventManager(self.cfg.events, self)
self.sim.expand_model_fields(self.event_manager.domain_randomization_fields)
# 2) CommandManager 在 ObservationManager 之前:obs 可能引用 command
self.command_manager = CommandManager(...) or NullCommandManager()
# 3) Action + Observation:定义 action/obs 空间与缓存逻辑
self.action_manager = ActionManager(self.cfg.actions, self)
self.observation_manager = ObservationManager(self.cfg.observations, self)
# 4) RL managers:termination/reward/curriculum
self.termination_manager = TerminationManager(self.cfg.terminations, self)
self.reward_manager = RewardManager(self.cfg.rewards, self)
self.curriculum_manager = CurriculumManager(...) or NullCurriculumManager()
self._configure_gym_env_spaces()
if "startup" in self.event_manager.available_modes:
self.event_manager.apply(mode="startup")
这里最容易忽略的一点是:domain randomization = 改 model 字段。因此 env 会用
event_manager.domain_randomization_fields 调用 sim.expand_model_fields,让每个 env 都能有自己的参数(例如不同的摩擦系数)。
step():action → physics → done/reward → reset → obs#
路径:src/mjlab/envs/manager_based_rl_env.py
# file: src/mjlab/envs/manager_based_rl_env.py
def step(self, action: torch.Tensor):
self.action_manager.process_action(action.to(self.device))
for _ in range(self.cfg.decimation):
self.action_manager.apply_action()
self.scene.write_data_to_sim()
self.sim.step()
self.scene.update(dt=self.physics_dt)
self.episode_length_buf += 1
self.common_step_counter += 1
# done / reward
self.reset_buf = self.termination_manager.compute()
self.reward_buf = self.reward_manager.compute(dt=self.step_dt)
# reset(terminated 或 time_out)
reset_env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
if len(reset_env_ids) > 0:
self._reset_idx(reset_env_ids)
self.scene.write_data_to_sim()
self.sim.forward()
# command + events
self.command_manager.compute(dt=self.step_dt)
if "interval" in self.event_manager.available_modes:
self.event_manager.apply(mode="interval", dt=self.step_dt)
# obs(注意 update_history=True)
self.obs_buf = self.observation_manager.compute(update_history=True)
return self.obs_buf, self.reward_buf, self.reset_terminated, self.reset_time_outs, self.extras
你可以把 step 看作一个严格顺序的 pipeline:动作先进入 physics,之后才计算 termination/reward,然后可能 reset,最后再算 obs。
reset 的顺序敏感(为什么 extras/log 很重要)#
路径:src/mjlab/envs/manager_based_rl_env.py
_reset_idx 在 reset 时会按固定顺序 reset managers,并把每个 manager 的统计写进 extras["log"]:
# file: src/mjlab/envs/manager_based_rl_env.py
def _reset_idx(self, env_ids):
self.curriculum_manager.compute(env_ids=env_ids)
self.sim.reset(env_ids)
self.scene.reset(env_ids)
if "reset" in self.event_manager.available_modes:
self.event_manager.apply(mode="reset", env_ids=env_ids, global_env_step_count=...)
self.extras["log"] = {}
self.extras["log"].update(self.observation_manager.reset(env_ids))
self.extras["log"].update(self.action_manager.reset(env_ids))
self.extras["log"].update(self.reward_manager.reset(env_ids))
self.extras["log"].update(self.curriculum_manager.reset(env_ids))
self.extras["log"].update(self.command_manager.reset(env_ids))
self.extras["log"].update(self.event_manager.reset(env_ids))
self.extras["log"].update(self.termination_manager.reset(env_ids))
这套顺序的直觉是:
obs/action/reward 这类模块通常维护 buffers 或 episodic accumulators,reset 时要先清。
command/event 可能在 reset 时 resample,且 event(reset) 可能会改仿真状态。
termination 最后重置统计,避免把上一回合的终止原因污染到下一回合。
finite horizon vs infinite horizon(terminated vs truncated)#
TerminationManager 会把不同 termination term 分成两类:
time_out=True→ truncated其他 → terminated
路径:src/mjlab/managers/termination_manager.py
这对 value bootstrapping 很关键(尤其是 PPO 这类算法)。