From c7d0756ff6a539cf35a57fdb09994c799e2b1965 Mon Sep 17 00:00:00 2001 From: Fangzhou Yu Date: Mon, 7 Oct 2024 22:13:58 -0400 Subject: [PATCH 1/6] add torchrl workflow --- .../standalone/workflows/torchrl/cli_args.py | 145 ++++++++++++++++++ source/standalone/workflows/torchrl/play.py | 133 ++++++++++++++++ source/standalone/workflows/torchrl/train.py | 139 +++++++++++++++++ 3 files changed, 417 insertions(+) create mode 100644 source/standalone/workflows/torchrl/cli_args.py create mode 100644 source/standalone/workflows/torchrl/play.py create mode 100644 source/standalone/workflows/torchrl/train.py diff --git a/source/standalone/workflows/torchrl/cli_args.py b/source/standalone/workflows/torchrl/cli_args.py new file mode 100644 index 0000000000..c982d52094 --- /dev/null +++ b/source/standalone/workflows/torchrl/cli_args.py @@ -0,0 +1,145 @@ +# Copyright (c) 2024 Boston Dynamics AI Institute LLC. All rights reserved. + +from __future__ import annotations + +import argparse +import random +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from omni.isaac.lab_tasks.utils.wrappers.torchrl import OnPolicyPPORunnerCfg + + +def add_torchrl_args(parser: argparse.ArgumentParser): + """Add TorchRL arguments to the parser. + + Adds the following fields to argparse: + - "--experiment_name" : Name of the experiment folder where logs will be stored (default: None). + - "--run_name" : Run name suffix to the log directory (default: None). + - "--resume" : Whether to resume from a checkpoint (default: None). + - "--load_run" : Name of the run folder to resume from (default: None). + - "--checkpoint" : Checkpoint file to resume from (default: None). + - "--logger" : Logger module to use (default: None). + - "--log_project_name" : Name of the logging project when using wandb or neptune (default: None). + Args: + parser: The parser to add the arguments to. + """ + # create a new argument group + arg_group = parser.add_argument_group("torchrl", description="Arguments for RSL-RL agent.") + # -- experiment arguments + arg_group.add_argument( + "--experiment_name", + type=str, + default=None, + help="Name of the experiment folder where logs will be stored.", + ) + arg_group.add_argument( + "--run_name", + type=str, + default=None, + help="Run name suffix to the log directory.", + ) + # -- load arguments + arg_group.add_argument( + "--resume", + type=bool, + default=None, + help="Whether to resume from a checkpoint.", + ) + arg_group.add_argument( + "--load_run", + type=str, + default=None, + help="Name of the run folder to resume from.", + ) + arg_group.add_argument( + "--checkpoint", + type=str, + default=None, + help="Checkpoint file to resume from.", + ) + # -- logger arguments + arg_group.add_argument( + "--logger", + type=str, + default=None, + choices={"wandb", "tensorboard", "neptune"}, + help="Logger module to use.", + ) + arg_group.add_argument( + "--log_project_name", + type=str, + default=None, + help="Name of the logging project when using wandb or neptune.", + ) + + +def parse_torchrl_cfg(task_name: str, args_cli: argparse.Namespace) -> OnPolicyPPORunnerCfg: + """Parse configuration for RSL-RL agent based on inputs. + + Args: + task_name: The name of the environment. + args_cli: The command line arguments. + + Returns: + The parsed configuration for RSL-RL agent based on inputs. 
+ """ + from omni.isaac.lab_tasks.utils.parse_cfg import load_cfg_from_registry + + # load the default configuration + torchrl_cfg: OnPolicyPPORunnerCfg = load_cfg_from_registry(task_name, "torchrl_cfg_entry_point") + + # override the default configuration with CLI arguments + torchrl_cfg.device = "cpu" if args_cli.cpu else f"cuda:{args_cli.physics_gpu}" + + # override the default configuration with CLI arguments + if args_cli.seed is not None: + torchrl_cfg.seed = args_cli.seed + if args_cli.resume is not None: + torchrl_cfg.resume = args_cli.resume + if args_cli.load_run is not None: + torchrl_cfg.load_run = args_cli.load_run + if args_cli.checkpoint is not None: + torchrl_cfg.load_checkpoint = args_cli.checkpoint + if args_cli.run_name is not None: + torchrl_cfg.run_name = args_cli.run_name + if args_cli.logger is not None: + torchrl_cfg.logger = args_cli.logger + # set the project name for wandb and neptune + if torchrl_cfg.logger == "wandb" and args_cli.log_project_name: + torchrl_cfg.wandb_project = args_cli.log_project_name + + return torchrl_cfg + +def update_torchrl_cfg(agent_cfg: OnPolicyPPORunnerCfg, args_cli: argparse.Namespace): + """Update configuration for torchrl agent based on inputs. + + Args: + agent_cfg: The configuration for torchrl agent. + args_cli: The command line arguments. + + Returns: + The updated configuration for torchrl agent based on inputs. + """ + # override the default configuration with CLI arguments + if hasattr(args_cli, "seed") and args_cli.seed is not None: + # randomly sample a seed if seed = -1 + if args_cli.seed == -1: + args_cli.seed = random.randint(0, 10000) + agent_cfg.seed = args_cli.seed + if args_cli.resume is not None: + agent_cfg.resume = args_cli.resume + if args_cli.load_run is not None: + agent_cfg.load_run = args_cli.load_run + if args_cli.checkpoint is not None: + agent_cfg.load_checkpoint = args_cli.checkpoint + if args_cli.run_name is not None: + agent_cfg.run_name = args_cli.run_name + if args_cli.logger is not None: + agent_cfg.logger = args_cli.logger + # set the project name for wandb and neptune + if agent_cfg.logger in {"wandb"} and args_cli.log_project_name: + agent_cfg.wandb_project = args_cli.log_project_name + + return agent_cfg + diff --git a/source/standalone/workflows/torchrl/play.py b/source/standalone/workflows/torchrl/play.py new file mode 100644 index 0000000000..92b870d513 --- /dev/null +++ b/source/standalone/workflows/torchrl/play.py @@ -0,0 +1,133 @@ +# Copyright (c) 2022-2024, The Isaac Lab Project Developers. +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Script to play a checkpoint if an RL agent from RSL-RL.""" + +"""Launch Isaac Sim Simulator first.""" + +import argparse + +from omni.isaac.lab.app import AppLauncher + +# local imports +import cli_args # isort: skip + +# add argparse arguments +parser = argparse.ArgumentParser(description="Play an RL agent with TorchRL.") +parser.add_argument("--video", action="store_true", default=False, help="Record videos during play.") +parser.add_argument("--video_length", type=int, default=200, help="Length of the recorded video (in steps).") +parser.add_argument( + "--disable_fabric", action="store_true", default=False, help="Disable fabric and use USD I/O operations." 
+) +parser.add_argument("--num_envs", type=int, default=None, help="Number of environments to simulate.") +parser.add_argument("--task", type=str, default=None, help="Name of the task.") +parser.add_argument("--seed", type=int, default=None, help="Seed used for the environment") + +# append RSL-RL cli arguments +cli_args.add_torchrl_args(parser) +# append AppLauncher cli args +AppLauncher.add_app_launcher_args(parser) +args_cli = parser.parse_args() +# always enable cameras to record video +if args_cli.video: + args_cli.enable_cameras = True + +# launch omniverse app +app_launcher = AppLauncher(args_cli) +simulation_app = app_launcher.app + +"""Rest everything follows.""" + +import gymnasium as gym +import os +import torch + + +from omni.isaac.lab.utils.dict import print_dict + +import omni.isaac.lab_tasks # noqa: F401 +from torchrl.envs.utils import ExplorationType, set_exploration_type +from omni.isaac.lab_tasks.utils import get_checkpoint_path, parse_env_cfg +from omni.isaac.lab_tasks.utils.wrappers.torchrl import ( + OnPolicyPPORunnerCfg, + OnPolicyPPORunner, + TorchRLEnvWrapper, + export_policy_as_onnx +) + +def main(): + """Play with TorchRL agent.""" + # parse configuration + env_cfg = parse_env_cfg( + args_cli.task, device=args_cli.device, num_envs=args_cli.num_envs, use_fabric=not args_cli.disable_fabric + ) + agent_cfg: OnPolicyPPORunnerCfg = cli_args.parse_torchrl_cfg(args_cli.task, args_cli) + + # specify directory for logging experiments + log_root_path = os.path.join("logs", "torchrl", agent_cfg.experiment_name) + log_root_path = os.path.abspath(log_root_path) + print(f"[INFO] Loading experiment from directory: {log_root_path}") + resume_path = get_checkpoint_path(log_root_path, agent_cfg.load_run, agent_cfg.load_checkpoint) + log_dir = os.path.dirname(resume_path) + + # create isaac environment + env = gym.make(args_cli.task, cfg=env_cfg, render_mode="rgb_array" if args_cli.video else None) + # wrap for video recording + if args_cli.video: + video_kwargs = { + "video_folder": os.path.join(log_dir, "videos", "play"), + "step_trigger": lambda step: step == 0, + "video_length": args_cli.video_length, + "disable_logger": True, + } + print("[INFO] Recording videos during training.") + print_dict(video_kwargs, nesting=4) + env = gym.wrappers.RecordVideo(env, **video_kwargs) + + # wrap around environment for rsl-rl + env = TorchRLEnvWrapper(env) + + print(f"environment observation spec: {env.observation_spec}") + print(f"environment action spec: {env.action_spec}") + + print(f"[INFO]: Loading model checkpoint from: {resume_path}") + # load previously trained model + ppo_runner = OnPolicyPPORunner(env, agent_cfg, log_dir=None, device=agent_cfg.device) + ppo_runner.load(resume_path, eval_mode=True) + + # obtain the trained policy for inference + policy = ppo_runner.actor_module + + # export policy to onnx/jit + export_model_dir = os.path.join(os.path.dirname(resume_path), "exported") + export_policy_as_onnx( + ppo_runner.loss_module_cfg, normalizer=None, path=export_model_dir, filename="policy.onnx" + ) + + # reset environment + td = env.reset() + timestep = 0 + # simulate environment + while simulation_app.is_running(): + # run everything in inference mode + with set_exploration_type(ExplorationType.MEAN), torch.inference_mode(): + # agent stepping + td = policy(td) + td = env.step(td) + if args_cli.video: + timestep += 1 + # Exit the play loop after recording one video + if timestep == args_cli.video_length: + break + + # close the simulator + env.close() + + +if __name__ == 
"__main__": + # run the main function + main() + # close sim app + simulation_app.close() diff --git a/source/standalone/workflows/torchrl/train.py b/source/standalone/workflows/torchrl/train.py new file mode 100644 index 0000000000..b93fb4b778 --- /dev/null +++ b/source/standalone/workflows/torchrl/train.py @@ -0,0 +1,139 @@ +# Copyright (c) 2022-2024, The Isaac Lab Project Developers. +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Script to train RL agent with torchrl.""" + +"""Launch Isaac Sim Simulator first.""" + +"""Launch Isaac Sim Simulator first.""" + +import argparse +import sys + +from omni.isaac.lab.app import AppLauncher + +# local imports +import cli_args # isort: skip + + +# add argparse arguments +parser = argparse.ArgumentParser(description="Train an RL agent with RSL-RL.") +parser.add_argument("--video", action="store_true", default=False, help="Record videos during training.") +parser.add_argument("--video_length", type=int, default=200, help="Length of the recorded video (in steps).") +parser.add_argument("--video_interval", type=int, default=2000, help="Interval between video recordings (in steps).") +parser.add_argument("--num_envs", type=int, default=None, help="Number of environments to simulate.") +parser.add_argument("--task", type=str, default=None, help="Name of the task.") +parser.add_argument("--seed", type=int, default=None, help="Seed used for the environment") +parser.add_argument("--max_iterations", type=int, default=None, help="RL Policy training iterations.") +# append RSL-RL cli arguments +cli_args.add_torchrl_args(parser) +# append AppLauncher cli args +AppLauncher.add_app_launcher_args(parser) +args_cli, hydra_args = parser.parse_known_args() + +# always enable cameras to record video +if args_cli.video: + args_cli.enable_cameras = True + +# clear out sys.argv for Hydra +sys.argv = [sys.argv[0]] + hydra_args + +# launch omniverse app +app_launcher = AppLauncher(args_cli) +simulation_app = app_launcher.app + +"""Rest everything follows.""" + +import gymnasium as gym +import os +import torch +from datetime import datetime + +from omni.isaac.lab_tasks.utils.wrappers.torchrl import TorchRLEnvWrapper, OnPolicyPPORunnerCfg, OnPolicyPPORunner + +from omni.isaac.lab.envs import ( + ManagerBasedRLEnvCfg +) +from omni.isaac.lab.utils.dict import print_dict +from omni.isaac.lab.utils.io import dump_pickle, dump_yaml +from omni.isaac.lab_tasks.utils import get_checkpoint_path +from omni.isaac.lab_tasks.utils.hydra import hydra_task_config + +torch.backends.cuda.matmul.allow_tf32 = True +torch.backends.cudnn.allow_tf32 = True +torch.backends.cudnn.deterministic = False +torch.backends.cudnn.benchmark = False + +@hydra_task_config(args_cli.task, "torchrl_cfg_entry_point") +def main(env_cfg: ManagerBasedRLEnvCfg, agent_cfg: OnPolicyPPORunnerCfg): + """Train with RSL-RL agent.""" + # override configurations with non-hydra CLI arguments + agent_cfg = cli_args.update_torchrl_cfg(agent_cfg, args_cli) + env_cfg.scene.num_envs = args_cli.num_envs if args_cli.num_envs is not None else env_cfg.scene.num_envs + agent_cfg.max_iterations = ( + args_cli.max_iterations if args_cli.max_iterations is not None else agent_cfg.max_iterations + ) + + # set the environment seed + # note: certain randomizations occur in the environment initialization so we set the seed here + env_cfg.seed = agent_cfg.seed + env_cfg.sim.device = args_cli.device if args_cli.device is not None else env_cfg.sim.device + + # specify directory for logging experiments + log_root_path 
= os.path.join("logs", "torchrl", agent_cfg.experiment_name) + log_root_path = os.path.abspath(log_root_path) + print(f"[INFO] Logging experiment in directory: {log_root_path}") + # specify directory for logging runs: {time-stamp}_{run_name} + log_dir = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + if agent_cfg.run_name: + log_dir += f"_{agent_cfg.run_name}" + log_dir = os.path.join(log_root_path, log_dir) + + # create isaac environment + env = gym.make(args_cli.task, cfg=env_cfg, render_mode="rgb_array" if args_cli.video else None) + # wrap for video recording + if args_cli.video: + video_kwargs = { + "video_folder": os.path.join(log_dir, "videos", "train"), + "step_trigger": lambda step: step % args_cli.video_interval == 0, + "video_length": args_cli.video_length, + "disable_logger": True, + } + print("[INFO] Recording videos during training.") + print_dict(video_kwargs, nesting=4) + env = gym.wrappers.RecordVideo(env, **video_kwargs) + + # wrap environment for TorchRL + env = TorchRLEnvWrapper(env) + + # create runner from rsl-rl + runner = OnPolicyPPORunner(env, agent_cfg, log_dir=log_dir, device=agent_cfg.device) + # save resume path before creating a new log_dir + if agent_cfg.resume: + # get path to previous checkpoint + resume_path = get_checkpoint_path(log_root_path, agent_cfg.load_run, agent_cfg.load_checkpoint) + print(f"[INFO]: Loading model checkpoint from: {resume_path}") + # load previously trained model + runner.load(resume_path, eval_mode=False) + + env.unwrapped.seed(agent_cfg.seed) + + # dump the configuration into log-directory + dump_yaml(os.path.join(log_dir, "params", "env.yaml"), env_cfg) + dump_pickle(os.path.join(log_dir, "params", "env.pkl"), env_cfg) + + # run training + runner.learn(init_at_random_ep_len=True) + + # close the simulator + env.close() + + +if __name__ == "__main__": + # run the main function + main() + # close sim app + simulation_app.close() + From 6d165a73411a154ccc67e939667c18d26833d51f Mon Sep 17 00:00:00 2001 From: Fangzhou Yu Date: Mon, 7 Oct 2024 22:14:42 -0400 Subject: [PATCH 2/6] run formatter --- source/standalone/workflows/torchrl/cli_args.py | 7 ++++++- source/standalone/workflows/torchrl/play.py | 11 +++++------ source/standalone/workflows/torchrl/train.py | 10 ++++------ 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/source/standalone/workflows/torchrl/cli_args.py b/source/standalone/workflows/torchrl/cli_args.py index c982d52094..5c9062eee1 100644 --- a/source/standalone/workflows/torchrl/cli_args.py +++ b/source/standalone/workflows/torchrl/cli_args.py @@ -1,3 +1,8 @@ +# Copyright (c) 2022-2024, The Isaac Lab Project Developers. +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + # Copyright (c) 2024 Boston Dynamics AI Institute LLC. All rights reserved. from __future__ import annotations @@ -111,6 +116,7 @@ def parse_torchrl_cfg(task_name: str, args_cli: argparse.Namespace) -> OnPolicyP return torchrl_cfg + def update_torchrl_cfg(agent_cfg: OnPolicyPPORunnerCfg, args_cli: argparse.Namespace): """Update configuration for torchrl agent based on inputs. 
@@ -142,4 +148,3 @@ def update_torchrl_cfg(agent_cfg: OnPolicyPPORunnerCfg, args_cli: argparse.Names agent_cfg.wandb_project = args_cli.log_project_name return agent_cfg - diff --git a/source/standalone/workflows/torchrl/play.py b/source/standalone/workflows/torchrl/play.py index 92b870d513..7ae8c5cdcc 100644 --- a/source/standalone/workflows/torchrl/play.py +++ b/source/standalone/workflows/torchrl/play.py @@ -44,19 +44,20 @@ import os import torch +from torchrl.envs.utils import ExplorationType, set_exploration_type from omni.isaac.lab.utils.dict import print_dict import omni.isaac.lab_tasks # noqa: F401 -from torchrl.envs.utils import ExplorationType, set_exploration_type from omni.isaac.lab_tasks.utils import get_checkpoint_path, parse_env_cfg from omni.isaac.lab_tasks.utils.wrappers.torchrl import ( - OnPolicyPPORunnerCfg, OnPolicyPPORunner, + OnPolicyPPORunnerCfg, TorchRLEnvWrapper, - export_policy_as_onnx + export_policy_as_onnx, ) + def main(): """Play with TorchRL agent.""" # parse configuration @@ -102,9 +103,7 @@ def main(): # export policy to onnx/jit export_model_dir = os.path.join(os.path.dirname(resume_path), "exported") - export_policy_as_onnx( - ppo_runner.loss_module_cfg, normalizer=None, path=export_model_dir, filename="policy.onnx" - ) + export_policy_as_onnx(ppo_runner.loss_module_cfg, normalizer=None, path=export_model_dir, filename="policy.onnx") # reset environment td = env.reset() diff --git a/source/standalone/workflows/torchrl/train.py b/source/standalone/workflows/torchrl/train.py index b93fb4b778..76d2edc6f6 100644 --- a/source/standalone/workflows/torchrl/train.py +++ b/source/standalone/workflows/torchrl/train.py @@ -51,21 +51,20 @@ import torch from datetime import datetime -from omni.isaac.lab_tasks.utils.wrappers.torchrl import TorchRLEnvWrapper, OnPolicyPPORunnerCfg, OnPolicyPPORunner - -from omni.isaac.lab.envs import ( - ManagerBasedRLEnvCfg -) +from omni.isaac.lab.envs import ManagerBasedRLEnvCfg from omni.isaac.lab.utils.dict import print_dict from omni.isaac.lab.utils.io import dump_pickle, dump_yaml + from omni.isaac.lab_tasks.utils import get_checkpoint_path from omni.isaac.lab_tasks.utils.hydra import hydra_task_config +from omni.isaac.lab_tasks.utils.wrappers.torchrl import OnPolicyPPORunner, OnPolicyPPORunnerCfg, TorchRLEnvWrapper torch.backends.cuda.matmul.allow_tf32 = True torch.backends.cudnn.allow_tf32 = True torch.backends.cudnn.deterministic = False torch.backends.cudnn.benchmark = False + @hydra_task_config(args_cli.task, "torchrl_cfg_entry_point") def main(env_cfg: ManagerBasedRLEnvCfg, agent_cfg: OnPolicyPPORunnerCfg): """Train with RSL-RL agent.""" @@ -136,4 +135,3 @@ def main(env_cfg: ManagerBasedRLEnvCfg, agent_cfg: OnPolicyPPORunnerCfg): main() # close sim app simulation_app.close() - From 326d367df5254cafd4b8e2d1b3b8dc27083fb44c Mon Sep 17 00:00:00 2001 From: Fangzhou Yu Date: Mon, 7 Oct 2024 22:19:27 -0400 Subject: [PATCH 3/6] edit changelog --- source/extensions/omni.isaac.lab/docs/CHANGELOG.rst | 9 +++++++++ source/standalone/workflows/torchrl/play.py | 4 ++-- source/standalone/workflows/torchrl/train.py | 8 +++----- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/source/extensions/omni.isaac.lab/docs/CHANGELOG.rst b/source/extensions/omni.isaac.lab/docs/CHANGELOG.rst index 2e67c3708c..16f5548101 100644 --- a/source/extensions/omni.isaac.lab/docs/CHANGELOG.rst +++ b/source/extensions/omni.isaac.lab/docs/CHANGELOG.rst @@ -1,6 +1,15 @@ Changelog --------- +0.24.19 (2024-10-07) +~~~~~~~~~~~~~~~~~~~~ + +Added +^^^^^ + 
+* Added `play.py` and `train.py` scripts to support new torchrl workflow. + + 0.24.19 (2024-10-05) ~~~~~~~~~~~~~~~~~~~~ diff --git a/source/standalone/workflows/torchrl/play.py b/source/standalone/workflows/torchrl/play.py index 7ae8c5cdcc..bcffe530c5 100644 --- a/source/standalone/workflows/torchrl/play.py +++ b/source/standalone/workflows/torchrl/play.py @@ -3,7 +3,7 @@ # # SPDX-License-Identifier: BSD-3-Clause -"""Script to play a checkpoint if an RL agent from RSL-RL.""" +"""Script to play a checkpoint for an RL agent from TorchRL.""" """Launch Isaac Sim Simulator first.""" @@ -25,7 +25,7 @@ parser.add_argument("--task", type=str, default=None, help="Name of the task.") parser.add_argument("--seed", type=int, default=None, help="Seed used for the environment") -# append RSL-RL cli arguments +# append torchrl cli arguments cli_args.add_torchrl_args(parser) # append AppLauncher cli args AppLauncher.add_app_launcher_args(parser) diff --git a/source/standalone/workflows/torchrl/train.py b/source/standalone/workflows/torchrl/train.py index 76d2edc6f6..71c32ff58a 100644 --- a/source/standalone/workflows/torchrl/train.py +++ b/source/standalone/workflows/torchrl/train.py @@ -7,8 +7,6 @@ """Launch Isaac Sim Simulator first.""" -"""Launch Isaac Sim Simulator first.""" - import argparse import sys @@ -19,7 +17,7 @@ # add argparse arguments -parser = argparse.ArgumentParser(description="Train an RL agent with RSL-RL.") +parser = argparse.ArgumentParser(description="Train an RL agent with torchrl.") parser.add_argument("--video", action="store_true", default=False, help="Record videos during training.") parser.add_argument("--video_length", type=int, default=200, help="Length of the recorded video (in steps).") parser.add_argument("--video_interval", type=int, default=2000, help="Interval between video recordings (in steps).") @@ -27,7 +25,7 @@ parser.add_argument("--task", type=str, default=None, help="Name of the task.") parser.add_argument("--seed", type=int, default=None, help="Seed used for the environment") parser.add_argument("--max_iterations", type=int, default=None, help="RL Policy training iterations.") -# append RSL-RL cli arguments +# append torchrl cli arguments cli_args.add_torchrl_args(parser) # append AppLauncher cli args AppLauncher.add_app_launcher_args(parser) @@ -67,7 +65,7 @@ @hydra_task_config(args_cli.task, "torchrl_cfg_entry_point") def main(env_cfg: ManagerBasedRLEnvCfg, agent_cfg: OnPolicyPPORunnerCfg): - """Train with RSL-RL agent.""" + """Train with torchrl agent.""" # override configurations with non-hydra CLI arguments agent_cfg = cli_args.update_torchrl_cfg(agent_cfg, args_cli) env_cfg.scene.num_envs = args_cli.num_envs if args_cli.num_envs is not None else env_cfg.scene.num_envs From dbb2e560d5221f1d58215cc654e4bfca125d08a1 Mon Sep 17 00:00:00 2001 From: Fangzhou Yu Date: Mon, 7 Oct 2024 22:20:54 -0400 Subject: [PATCH 4/6] run formatter --- source/extensions/omni.isaac.lab/docs/CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/extensions/omni.isaac.lab/docs/CHANGELOG.rst b/source/extensions/omni.isaac.lab/docs/CHANGELOG.rst index 16f5548101..b5b0b6eed4 100644 --- a/source/extensions/omni.isaac.lab/docs/CHANGELOG.rst +++ b/source/extensions/omni.isaac.lab/docs/CHANGELOG.rst @@ -7,7 +7,7 @@ Changelog Added ^^^^^ -* Added `play.py` and `train.py` scripts to support new torchrl workflow. +* Added ``play.py`` and ``train.py`` scripts to support new torchrl workflow. 
0.24.19 (2024-10-05) From 2ce5b5bb5c39c42ff0bbd43225d49b51bf942f11 Mon Sep 17 00:00:00 2001 From: Fangzhou Yu Date: Tue, 8 Oct 2024 13:04:12 -0400 Subject: [PATCH 5/6] move ppo runners --- .../standalone/workflows/torchrl/cli_args.py | 2 +- source/standalone/workflows/torchrl/play.py | 4 +- .../torchrl/ppo/torchrl_ppo_runner.py | 304 ++++++++++++++++++ .../torchrl/ppo/torchrl_ppo_runner_cfg.py | 225 +++++++++++++ source/standalone/workflows/torchrl/train.py | 4 +- 5 files changed, 535 insertions(+), 4 deletions(-) create mode 100644 source/standalone/workflows/torchrl/ppo/torchrl_ppo_runner.py create mode 100644 source/standalone/workflows/torchrl/ppo/torchrl_ppo_runner_cfg.py diff --git a/source/standalone/workflows/torchrl/cli_args.py b/source/standalone/workflows/torchrl/cli_args.py index 5c9062eee1..f18cb9ca0d 100644 --- a/source/standalone/workflows/torchrl/cli_args.py +++ b/source/standalone/workflows/torchrl/cli_args.py @@ -12,7 +12,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from omni.isaac.lab_tasks.utils.wrappers.torchrl import OnPolicyPPORunnerCfg + from .ppo.torchrl_ppo_runner_cfg import OnPolicyPPORunnerCfg def add_torchrl_args(parser: argparse.ArgumentParser): diff --git a/source/standalone/workflows/torchrl/play.py b/source/standalone/workflows/torchrl/play.py index bcffe530c5..2b224c4cee 100644 --- a/source/standalone/workflows/torchrl/play.py +++ b/source/standalone/workflows/torchrl/play.py @@ -51,11 +51,11 @@ import omni.isaac.lab_tasks # noqa: F401 from omni.isaac.lab_tasks.utils import get_checkpoint_path, parse_env_cfg from omni.isaac.lab_tasks.utils.wrappers.torchrl import ( - OnPolicyPPORunner, - OnPolicyPPORunnerCfg, TorchRLEnvWrapper, export_policy_as_onnx, ) +from .ppo.torchrl_ppo_runner_cfg import OnPolicyPPORunnerCfg, +from .ppo.torchrl_ppo_runner import OnPolicyPPORunner, def main(): diff --git a/source/standalone/workflows/torchrl/ppo/torchrl_ppo_runner.py b/source/standalone/workflows/torchrl/ppo/torchrl_ppo_runner.py new file mode 100644 index 0000000000..1229e93984 --- /dev/null +++ b/source/standalone/workflows/torchrl/ppo/torchrl_ppo_runner.py @@ -0,0 +1,304 @@ +# Copyright (c) 2022-2024, The Isaac Lab Project Developers. +# All rights reserved. 
+# +# SPDX-License-Identifier: BSD-3-Clause + +# Copyright 2021 ETH Zurich, NVIDIA CORPORATION +# SPDX-License-Identifier: BSD-3-Clause + +from __future__ import annotations + +import math +import os +import pathlib +import time +import torch +import yaml +from numbers import Real +from tensordict.nn import InteractionType, TensorDictModule +from torch import nn +from torch.distributions import Normal +from typing import TYPE_CHECKING + +import wandb +from torchrl.data.tensor_specs import CompositeSpec, UnboundedContinuousTensorSpec +from torchrl.envs.libs.gym import GymEnv +from torchrl.envs.utils import ExplorationType +from torchrl.modules import NormalParamExtractor, ProbabilisticActor, ValueOperator +from torchrl.objectives.value import GAE +from torchrl.record.loggers import TensorboardLogger + +from .torchrl_env_wrapper import ( + ClipPPOLossWrapper, + InfoDictReaderWrapper, + SyncDataCollectorWrapper, + TrainerWrapper, + WandbLoggerWrapper, +) + +if TYPE_CHECKING: + from torchrl.trainers import Trainer + + from .torchrl_ppo_runner_cfg import OnPolicyPPORunnerCfg + + +class NormalWrapper(Normal): + def __init__(self, loc, scale, max_scale: float | None = None, validate_args=None): + if max_scale is not None and max_scale >= 0: + scale = torch.clip(scale, min=0, max=max_scale) + elif max_scale is not None: + raise ValueError("Maximum scale must be greater than 0") + super().__init__(loc, scale, validate_args=validate_args) + + def log_prob(self, value): + if self._validate_args: + self._validate_sample(value) + # compute the variance + var = self.scale**2 + log_scale = math.log(self.scale) if isinstance(self.scale, Real) else self.scale.log() + log_prob = -((value - self.loc) ** 2) / (2 * var) - log_scale - math.log(math.sqrt(2 * math.pi)) + return torch.sum(log_prob, dim=-1) + + +class OnPolicyPPORunner: + """On-policy runner for training and evaluation.""" + + def __init__(self, env: GymEnv, train_cfg: OnPolicyPPORunnerCfg, log_dir=None, device="cpu"): + self.cfg = train_cfg + self.log_dir = log_dir + self.loss_module_cfg = train_cfg.loss_module + self.collector_module_cfg = train_cfg.collector_module + + self.actor_network_cfg = self.loss_module_cfg.actor_network + self.value_network_cfg = self.loss_module_cfg.value_network + self.device = device + self.env = env + self.num_envs = self.env.unwrapped.num_envs + + self.pre_optim_time_start = 0 + + self.saved_video_files = {} + + actor_network = self.actor_network_cfg.actor_network() + critic_network = self.value_network_cfg.critic_network() + actor_network.to(device=self.device) + actor_td = TensorDictModule( + nn.Sequential( + actor_network, + NormalParamExtractor(scale_mapping=f"biased_softplus_{self.actor_network_cfg.init_noise_std}"), + ), + in_keys=self.actor_network_cfg.in_keys, + out_keys=["loc", "scale"], + ) + self.actor_module = ProbabilisticActor( + spec=env.action_spec, + module=actor_td, + in_keys=["loc", "scale"], + distribution_class=NormalWrapper, + default_interaction_type=InteractionType.RANDOM, + return_log_prob=True, + ) + # create the critic module + critic_network.to(device=self.device) + self.value_module = ValueOperator( + module=critic_network, + in_keys=self.value_network_cfg.in_keys, + out_keys=self.value_network_cfg.out_keys, + ) + + def _create_trainer(self, eval_mode: bool = False) -> Trainer: + """Creates TorchRL trainer module""" + # register info dict for logging rewards from IsaacLab extras dict + keys = self.env.unwrapped.extras + info_spec = CompositeSpec( + {key: 
UnboundedContinuousTensorSpec([self.num_envs]) for key in keys}, shape=[self.num_envs] + ) + self.env.set_info_dict_reader(InfoDictReaderWrapper(spec=info_spec)) + + self.loss_module = ClipPPOLossWrapper( + actor_network=self.actor_module, + critic_network=self.value_module, + clip_epsilon=self.loss_module_cfg.clip_param, + loss_critic_type=self.loss_module_cfg.loss_critic_type, + desired_kl=self.loss_module_cfg.desired_kl, + entropy_bonus=self.loss_module_cfg.entropy_bonus, + entropy_coef=self.loss_module_cfg.entropy_coef, + critic_coef=self.loss_module_cfg.value_loss_coef, + normalize_advantage=False, + separate_losses=True, + clip_value=self.loss_module_cfg.clip_param, + device=self.device, + ) + self.advantage_module = GAE( + gamma=self.loss_module_cfg.gamma, + lmbda=self.loss_module_cfg.lam, + value_network=self.value_module, + vectorized=True, + average_gae=True, + ) + + total_frames = self.cfg.num_steps_per_env * self.num_envs * self.cfg.max_iterations + frames_per_batch = self.cfg.num_steps_per_env * self.num_envs + + self.collector = SyncDataCollectorWrapper( + create_env_fn=self.env, + policy=self.actor_module, + frames_per_batch=frames_per_batch, + total_frames=total_frames, + split_trajs=self.collector_module_cfg.split_trajs, + exploration_type=ExplorationType.RANDOM, + set_truncated=False, + device=self.device, + ) + + optimizer = torch.optim.Adam(self.loss_module.parameters(), lr=self.loss_module_cfg.learning_rate) + self.logger_module = None + if not eval_mode: + if self.cfg.logger == "wandb": + self.logger_module = WandbLoggerWrapper( + exp_name=self.cfg.experiment_name, + project=self.cfg.wandb_project, + save_dir=self.log_dir, + ) + self.logger_module.log_config(self.env.unwrapped.cfg) + elif self.cfg.logger == "tensorboard": + self.logger_module = TensorboardLogger(exp_name=self.cfg.experiment_name, log_dir=self.log_dir) + + policy_save_interval = self.cfg.save_trainer_interval * (frames_per_batch - 1) + + self.trainer_module = TrainerWrapper( + collector=self.collector, + loss_module=self.loss_module, + total_frames=total_frames, + frame_skip=1, + optimizer=optimizer, + clip_grad_norm=True, + clip_norm=self.loss_module_cfg.max_grad_norm, + logger=self.logger_module, + optim_steps_per_batch=self.cfg.num_epochs, + num_mini_batches=self.cfg.num_mini_batches, + save_trainer_interval=policy_save_interval, + log_interval=frames_per_batch, + lr_schedule=self.cfg.lr_schedule, + save_trainer_file=f"{self.log_dir}/model.pt", + ) + self.trainer_module.register_module(module_name="advantage_module", module=self.advantage_module) + self.trainer_module.register_op("batch_process", self.compute_advantages) + self.trainer_module.register_op("batch_process", self.bootstrap_reward) + + # register hooks for logging + self.trainer_module.register_op("pre_steps_log", self.log_info_dict) + self.trainer_module.register_op("pre_steps_log", self.log_collection_time) + self.trainer_module.register_op("pre_optim_steps", self.log_pre_optim_time) + self.trainer_module.register_op("post_optim_log", self.log_optim_time) + self.trainer_module.register_op("pre_steps_log", self.log_episode_stats) + + # upload video to wandb + if hasattr(self.env, "video_recorder") and self.cfg.logger == "wandb": + self.trainer_module.register_op("post_steps_log", self.upload_training_video, log_name="Video", fps=30) + + return self.trainer_module + + def learn(self, init_at_random_ep_len: bool = False): + trainer_module = self._create_trainer() + if init_at_random_ep_len: + self.env.unwrapped.episode_length_buf = 
torch.randint_like( + self.env.unwrapped.episode_length_buf, high=int(self.env.unwrapped.max_episode_length) + ) + + trainer_module.train() + + def load(self, path, eval_mode: bool = False): + loaded_dict = torch.load(path, weights_only=False) + trainer = self._create_trainer(eval_mode=eval_mode) + trainer.load_from_file(path) + return loaded_dict["state"] + + def save_cfg(self): + # Get the configuration from the environment + config = self.env.unwrapped.cfg + + # Construct the path to the config file + cfg_file_path = os.path.join(self.log_dir, "env_cfg.yaml") + + # Ensure the directory exists + os.makedirs(self.log_dir, exist_ok=True) + + # Write the config dictionary to the YAML file + with open(cfg_file_path, "w") as file: + yaml.dump(config, file, default_flow_style=False) + + wandb.save(cfg_file_path, base_path=os.path.dirname(cfg_file_path)) + + def bootstrap_reward(self, batch): + gamma = self.advantage_module.gamma + if batch["next"]["truncated"].any(): + batch["next"]["reward"] += gamma * batch["next"]["state_value"] * batch["next"]["truncated"] + + def compute_advantages(self, batch): + self.advantage_module(batch) + + def log_pre_optim_time( + self, + ): + self.pre_optim_time_start = time.perf_counter() + + def log_optim_time(self, batch): + optim_time_end = time.perf_counter() + optim_time = optim_time_end - self.pre_optim_time_start + collection_time = batch["rollout_time"][0, 0].item() + fps = int(self.cfg.num_steps_per_env * self.env.unwrapped.num_envs / (collection_time + optim_time)) + learning_rate = batch["learning_rate"].mean().item() + log_dict = {"Perf/learning_time": optim_time, "Perf/total_fps": fps, "Loss/learning_rate": learning_rate} + return log_dict + + def log_collection_time(self, batch): + log_dict = {} + val = batch["rollout_time"][0, 0].item() + log_dict["Perf/collection time"] = val + return log_dict + + def log_info_dict(self, batch): + log_dict = {} + entries = list(self.env.unwrapped.extras) + for entry in entries: + key = entry + "_data" + data = batch[key] + for k, v in data.items(): + log_dict[k] = v.float().mean().item() + return log_dict + + def log_episode_stats(self, batch): + log_dict = {} + mean_ep_len = batch["episode_length"].mean().item() + mean_reward = batch["episode_reward"].mean().item() + log_dict["Train/mean_reward"] = mean_reward + log_dict["Train/mean_episode_length"] = mean_ep_len + + return log_dict + + def upload_training_video(self, batch, log_name: str = "Video", fps: int = 30): + """ + Upload locally saved training .mp4 videos from VideoRecorder to WandB. 
+ """ + log_dir = pathlib.Path(self.log_dir) + # exclude any files inside the wandb logs folder + video_files = [file for file in log_dir.rglob("*.mp4") if "wandb" not in file.parts] + for video_file in video_files: + file_path = str(video_file) + file_size_kb = os.stat(file_path).st_size / 1024 + mod_time = os.path.getmtime(file_path) + + if file_path not in self.saved_video_files: + self.saved_video_files[file_path] = {"mod_time": mod_time, "added": False} + else: + video_info = self.saved_video_files[file_path] + current_time = time.time() + + # Check if file hasn't been modified in the last 20 seconds and is larger than 100KB + if not video_info["added"] and (current_time - mod_time > 20) and file_size_kb > 100: + print(f"[Wandb] Uploading {os.path.basename(file_path)}.") + wandb.log({log_name: wandb.Video(file_path, fps=fps)}) + video_info["added"] = True + else: + video_info["mod_time"] = mod_time \ No newline at end of file diff --git a/source/standalone/workflows/torchrl/ppo/torchrl_ppo_runner_cfg.py b/source/standalone/workflows/torchrl/ppo/torchrl_ppo_runner_cfg.py new file mode 100644 index 0000000000..0b054c8dae --- /dev/null +++ b/source/standalone/workflows/torchrl/ppo/torchrl_ppo_runner_cfg.py @@ -0,0 +1,225 @@ +# Copyright (c) 2022-2024, The Isaac Lab Project Developers. +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +import torch +from dataclasses import MISSING +from typing import Literal + +from omni.isaac.lab.utils import configclass + + +@configclass +class DistributionCfg: + + distribution_class: torch.distributions.Distribution = MISSING + """A torch.distributions.Distribution class to be used for sampling. """ + + distribution_kwargs: dict | None = None + """Keyword arguments to be passed to the distribution. """ + + return_log_prob: bool = True + """If ``True``, the log-probability of the distribution sample will be written in the tensordict with the key + `'sample_log_prob'`. Default is ``True``.""" + + +@configclass +class ProbabilisticActorCfg: + """Configuration for the Actor network.""" + + class_name: str = "ProbabilisticActor" + """The actor class name. Default is ProbabilisticActor.""" + + actor_network: object = MISSING + """Actor network to use for value estimation""" + + in_keys: list[str] = ["policy"] + """Key(s) that will be read from the input TensorDict and used to build the distribution. Importantly, if it's an + iterable of string or a string, those keys must match the keywords used by the distribution class of interest, + e.g. :obj:`"loc"` and :obj:`"scale"` for the Normal distribution and similar. If in_keys is a dictionary, + the keys are the keys of the distribution and the values are the keys in the tensordict that will get match to the + corresponding distribution keys. + """ + + out_keys: list[str] = ["loc", "scale"] + """Keys where the sampled values will be written. + Importantly, if these keys are found in the input TensorDict, the sampling step will be skipped. + """ + + distribution: DistributionCfg = MISSING + """Distribution module Cfg used for sampling policy actions. """ + + init_noise_std: float = 1.0 + """The standard deviation of the Gaussian noise added to the policy actions during exploration. """ + + +@configclass +class ValueOperatorCfg: + """Configuration for the Critic network.""" + + critic_network: object = MISSING + """Critic network to use for value estimation""" + + in_keys: list[str] | None = ["policy"] + """Keys to be read from input tensordict and passed to the module. 
+ If it contains more than one element, the values will be passed in the order given by the in_keys iterable. + Defaults to ``["policy"]``. + """ + + out_keys: list[str] | None = None + + """Keys to be written to the input tensordict. + The length of out_keys must match the + number of tensors returned by the embedded module. Using "_" as a + key avoid writing tensor to output. + Defaults to ``["state_value"]`` or ``["state_action_value"]`` if ``"action"`` is part of the ``in_keys``. + """ + + +@configclass +class ClipPPOLossCfg: + """Configuration for the TorchRL loss module. Defines policy model architecture and sets PPO parameters.""" + + class_name: str = "CLipPPOLoss" + """The loss module class name. Default is ClipPPOLoss.""" + + actor_network: ProbabilisticActorCfg = MISSING + """The model architecture configuration for the actor network.""" + + value_network: ValueOperatorCfg = MISSING + """The model architecture configuration for the critic network.""" + + value_key: str = "state_value" + """The input tensordict key where the state value is expected to be written. Defaults to ``"state_value"``.""" + + desired_kl: float = MISSING + """The target KL divergence.""" + + value_loss_coef: float = MISSING + """Critic loss multiplier when computing the total loss.""" + + clip_param: float = MISSING + """The PPO epsilon clipping parameter for the policy.""" + + entropy_coef: float = MISSING + """The coefficient for the entropy loss.""" + + entropy_bonus: bool = False + """If ``True``, an entropy bonus will be added to the loss to favour exploratory policies..""" + + loss_critic_type: Literal["l1", "l2", "smooth_l1"] = "l2" + """loss function for the value discrepancy. Can be one of "l1", "l2" or "smooth_l1".""" + + normalize_advantage: bool = False + """Normalize advantages by subtracting the mean and dividing by its std before computing loss. Defaults to False.""" + + learning_rate: float = MISSING + """The learning rate for the policy.""" + + gamma: float = MISSING + """The discount factor.""" + + lam: float = MISSING + """The lambda parameter for Generalized Advantage Estimation (GAE).""" + + max_grad_norm: float = MISSING + """value to be used for clipping gradients. .""" + + +@configclass +class CollectorCfg: + """Configuration for the PPO actor-critic networks.""" + + class_name: str = "SyncDataCollector" + """The collector class name. Default is SyncDataCollector.""" + + actor_network: ProbabilisticActorCfg = MISSING + """The model architecture configuration for the actor network.""" + + split_trajs: bool = False + + +@configclass +class OnPolicyPPORunnerCfg: + """Configuration of the PPO torchRL runner.""" + + loss_module: ClipPPOLossCfg = MISSING + """The loss module configuration.""" + + collector_module: CollectorCfg = MISSING + """The collector module configuration.""" + + seed: int = 42 + """The seed for the experiment. Default is 42.""" + + device: str = "cuda:0" + """The device for the rl-agent. Default is cuda:0.""" + + num_steps_per_env: int = MISSING + """The number of steps per environment per update.""" + + num_epochs: int = MISSING + """The number of model optimizations to do per batch of experiences.""" + + num_mini_batches: int = MISSING + """The number of mini-batches per update.""" + + lr_schedule: Literal["fixed", "adaptive"] = "adaptive" + """The learning rate schedule. 
"fixed" for no learning rate annealing, "adaptive" to use a kl-based scheduler.""" + + max_iterations: int = MISSING + """The maximum number of iterations.""" + + min_sub_traj_len: int = -1 + """Minimum value of :obj:`sub_traj_len`, in case some elements of the batch contain few steps. Default is -1 (i.e. no minimum value)""" + + ## + # Checkpointing parameters + ## + + save_interval: int = MISSING + """The number of iterations between saves.""" + + experiment_name: str = MISSING + """The experiment name.""" + + run_name: str = "" + """The run name. Default is empty string. + + The name of the run directory is typically the time-stamp at execution. If the run name is not empty, + then it is appended to the run directory's name, i.e. the logging directory's name will become + ``{time-stamp}_{run_name}``. + """ + + ## + # Logging parameters + ## + + logger: Literal["tensorboard", "wandb"] = "tensorboard" + """The logger to use. Default is tensorboard.""" + + wandb_project: str = "isaaclab" + """The wandb project name. Default is "isaaclab".""" + + save_trainer_interval: int = 100 + """"How often to save the current policy to disk, in number of optimization steps""" + + ## + # Loading parameters + ## + + resume: bool = False + """Whether to resume. Default is False.""" + + load_run: str = ".*" + """The run directory to load. Default is ".*" (all). + + If regex expression, the latest (alphabetical order) matching run will be loaded. + """ + + load_checkpoint: str = "model_.*.pt" + """The checkpoint file to load. Default is ``"model_.*.pt"`` (all). + + If regex expression, the latest (alphabetical order) matching file will be loaded. + """ \ No newline at end of file diff --git a/source/standalone/workflows/torchrl/train.py b/source/standalone/workflows/torchrl/train.py index 71c32ff58a..43e4e49fe9 100644 --- a/source/standalone/workflows/torchrl/train.py +++ b/source/standalone/workflows/torchrl/train.py @@ -55,7 +55,9 @@ from omni.isaac.lab_tasks.utils import get_checkpoint_path from omni.isaac.lab_tasks.utils.hydra import hydra_task_config -from omni.isaac.lab_tasks.utils.wrappers.torchrl import OnPolicyPPORunner, OnPolicyPPORunnerCfg, TorchRLEnvWrapper +from omni.isaac.lab_tasks.utils.wrappers.torchrl import TorchRLEnvWrapper +from .ppo.torchrl_ppo_runner_cfg import OnPolicyPPORunnerCfg, +from .ppo.torchrl_ppo_runner import OnPolicyPPORunner, torch.backends.cuda.matmul.allow_tf32 = True torch.backends.cudnn.allow_tf32 = True From be1000ab3c4f6b6e71e305257e9f54ae287e3073 Mon Sep 17 00:00:00 2001 From: Fangzhou Yu Date: Tue, 8 Oct 2024 13:22:50 -0400 Subject: [PATCH 6/6] Revert "move ppo runners" This reverts commit 2ce5b5bb5c39c42ff0bbd43225d49b51bf942f11. 
--- .../standalone/workflows/torchrl/cli_args.py | 2 +- source/standalone/workflows/torchrl/play.py | 4 +- .../torchrl/ppo/torchrl_ppo_runner.py | 304 ------------------ .../torchrl/ppo/torchrl_ppo_runner_cfg.py | 225 ------------- source/standalone/workflows/torchrl/train.py | 4 +- 5 files changed, 4 insertions(+), 535 deletions(-) delete mode 100644 source/standalone/workflows/torchrl/ppo/torchrl_ppo_runner.py delete mode 100644 source/standalone/workflows/torchrl/ppo/torchrl_ppo_runner_cfg.py diff --git a/source/standalone/workflows/torchrl/cli_args.py b/source/standalone/workflows/torchrl/cli_args.py index f18cb9ca0d..5c9062eee1 100644 --- a/source/standalone/workflows/torchrl/cli_args.py +++ b/source/standalone/workflows/torchrl/cli_args.py @@ -12,7 +12,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from .ppo.torchrl_ppo_runner_cfg import OnPolicyPPORunnerCfg + from omni.isaac.lab_tasks.utils.wrappers.torchrl import OnPolicyPPORunnerCfg def add_torchrl_args(parser: argparse.ArgumentParser): diff --git a/source/standalone/workflows/torchrl/play.py b/source/standalone/workflows/torchrl/play.py index 2b224c4cee..bcffe530c5 100644 --- a/source/standalone/workflows/torchrl/play.py +++ b/source/standalone/workflows/torchrl/play.py @@ -51,11 +51,11 @@ import omni.isaac.lab_tasks # noqa: F401 from omni.isaac.lab_tasks.utils import get_checkpoint_path, parse_env_cfg from omni.isaac.lab_tasks.utils.wrappers.torchrl import ( + OnPolicyPPORunner, + OnPolicyPPORunnerCfg, TorchRLEnvWrapper, export_policy_as_onnx, ) -from .ppo.torchrl_ppo_runner_cfg import OnPolicyPPORunnerCfg, -from .ppo.torchrl_ppo_runner import OnPolicyPPORunner, def main(): diff --git a/source/standalone/workflows/torchrl/ppo/torchrl_ppo_runner.py b/source/standalone/workflows/torchrl/ppo/torchrl_ppo_runner.py deleted file mode 100644 index 1229e93984..0000000000 --- a/source/standalone/workflows/torchrl/ppo/torchrl_ppo_runner.py +++ /dev/null @@ -1,304 +0,0 @@ -# Copyright (c) 2022-2024, The Isaac Lab Project Developers. -# All rights reserved. 
-# -# SPDX-License-Identifier: BSD-3-Clause - -# Copyright 2021 ETH Zurich, NVIDIA CORPORATION -# SPDX-License-Identifier: BSD-3-Clause - -from __future__ import annotations - -import math -import os -import pathlib -import time -import torch -import yaml -from numbers import Real -from tensordict.nn import InteractionType, TensorDictModule -from torch import nn -from torch.distributions import Normal -from typing import TYPE_CHECKING - -import wandb -from torchrl.data.tensor_specs import CompositeSpec, UnboundedContinuousTensorSpec -from torchrl.envs.libs.gym import GymEnv -from torchrl.envs.utils import ExplorationType -from torchrl.modules import NormalParamExtractor, ProbabilisticActor, ValueOperator -from torchrl.objectives.value import GAE -from torchrl.record.loggers import TensorboardLogger - -from .torchrl_env_wrapper import ( - ClipPPOLossWrapper, - InfoDictReaderWrapper, - SyncDataCollectorWrapper, - TrainerWrapper, - WandbLoggerWrapper, -) - -if TYPE_CHECKING: - from torchrl.trainers import Trainer - - from .torchrl_ppo_runner_cfg import OnPolicyPPORunnerCfg - - -class NormalWrapper(Normal): - def __init__(self, loc, scale, max_scale: float | None = None, validate_args=None): - if max_scale is not None and max_scale >= 0: - scale = torch.clip(scale, min=0, max=max_scale) - elif max_scale is not None: - raise ValueError("Maximum scale must be greater than 0") - super().__init__(loc, scale, validate_args=validate_args) - - def log_prob(self, value): - if self._validate_args: - self._validate_sample(value) - # compute the variance - var = self.scale**2 - log_scale = math.log(self.scale) if isinstance(self.scale, Real) else self.scale.log() - log_prob = -((value - self.loc) ** 2) / (2 * var) - log_scale - math.log(math.sqrt(2 * math.pi)) - return torch.sum(log_prob, dim=-1) - - -class OnPolicyPPORunner: - """On-policy runner for training and evaluation.""" - - def __init__(self, env: GymEnv, train_cfg: OnPolicyPPORunnerCfg, log_dir=None, device="cpu"): - self.cfg = train_cfg - self.log_dir = log_dir - self.loss_module_cfg = train_cfg.loss_module - self.collector_module_cfg = train_cfg.collector_module - - self.actor_network_cfg = self.loss_module_cfg.actor_network - self.value_network_cfg = self.loss_module_cfg.value_network - self.device = device - self.env = env - self.num_envs = self.env.unwrapped.num_envs - - self.pre_optim_time_start = 0 - - self.saved_video_files = {} - - actor_network = self.actor_network_cfg.actor_network() - critic_network = self.value_network_cfg.critic_network() - actor_network.to(device=self.device) - actor_td = TensorDictModule( - nn.Sequential( - actor_network, - NormalParamExtractor(scale_mapping=f"biased_softplus_{self.actor_network_cfg.init_noise_std}"), - ), - in_keys=self.actor_network_cfg.in_keys, - out_keys=["loc", "scale"], - ) - self.actor_module = ProbabilisticActor( - spec=env.action_spec, - module=actor_td, - in_keys=["loc", "scale"], - distribution_class=NormalWrapper, - default_interaction_type=InteractionType.RANDOM, - return_log_prob=True, - ) - # create the critic module - critic_network.to(device=self.device) - self.value_module = ValueOperator( - module=critic_network, - in_keys=self.value_network_cfg.in_keys, - out_keys=self.value_network_cfg.out_keys, - ) - - def _create_trainer(self, eval_mode: bool = False) -> Trainer: - """Creates TorchRL trainer module""" - # register info dict for logging rewards from IsaacLab extras dict - keys = self.env.unwrapped.extras - info_spec = CompositeSpec( - {key: 
UnboundedContinuousTensorSpec([self.num_envs]) for key in keys}, shape=[self.num_envs] - ) - self.env.set_info_dict_reader(InfoDictReaderWrapper(spec=info_spec)) - - self.loss_module = ClipPPOLossWrapper( - actor_network=self.actor_module, - critic_network=self.value_module, - clip_epsilon=self.loss_module_cfg.clip_param, - loss_critic_type=self.loss_module_cfg.loss_critic_type, - desired_kl=self.loss_module_cfg.desired_kl, - entropy_bonus=self.loss_module_cfg.entropy_bonus, - entropy_coef=self.loss_module_cfg.entropy_coef, - critic_coef=self.loss_module_cfg.value_loss_coef, - normalize_advantage=False, - separate_losses=True, - clip_value=self.loss_module_cfg.clip_param, - device=self.device, - ) - self.advantage_module = GAE( - gamma=self.loss_module_cfg.gamma, - lmbda=self.loss_module_cfg.lam, - value_network=self.value_module, - vectorized=True, - average_gae=True, - ) - - total_frames = self.cfg.num_steps_per_env * self.num_envs * self.cfg.max_iterations - frames_per_batch = self.cfg.num_steps_per_env * self.num_envs - - self.collector = SyncDataCollectorWrapper( - create_env_fn=self.env, - policy=self.actor_module, - frames_per_batch=frames_per_batch, - total_frames=total_frames, - split_trajs=self.collector_module_cfg.split_trajs, - exploration_type=ExplorationType.RANDOM, - set_truncated=False, - device=self.device, - ) - - optimizer = torch.optim.Adam(self.loss_module.parameters(), lr=self.loss_module_cfg.learning_rate) - self.logger_module = None - if not eval_mode: - if self.cfg.logger == "wandb": - self.logger_module = WandbLoggerWrapper( - exp_name=self.cfg.experiment_name, - project=self.cfg.wandb_project, - save_dir=self.log_dir, - ) - self.logger_module.log_config(self.env.unwrapped.cfg) - elif self.cfg.logger == "tensorboard": - self.logger_module = TensorboardLogger(exp_name=self.cfg.experiment_name, log_dir=self.log_dir) - - policy_save_interval = self.cfg.save_trainer_interval * (frames_per_batch - 1) - - self.trainer_module = TrainerWrapper( - collector=self.collector, - loss_module=self.loss_module, - total_frames=total_frames, - frame_skip=1, - optimizer=optimizer, - clip_grad_norm=True, - clip_norm=self.loss_module_cfg.max_grad_norm, - logger=self.logger_module, - optim_steps_per_batch=self.cfg.num_epochs, - num_mini_batches=self.cfg.num_mini_batches, - save_trainer_interval=policy_save_interval, - log_interval=frames_per_batch, - lr_schedule=self.cfg.lr_schedule, - save_trainer_file=f"{self.log_dir}/model.pt", - ) - self.trainer_module.register_module(module_name="advantage_module", module=self.advantage_module) - self.trainer_module.register_op("batch_process", self.compute_advantages) - self.trainer_module.register_op("batch_process", self.bootstrap_reward) - - # register hooks for logging - self.trainer_module.register_op("pre_steps_log", self.log_info_dict) - self.trainer_module.register_op("pre_steps_log", self.log_collection_time) - self.trainer_module.register_op("pre_optim_steps", self.log_pre_optim_time) - self.trainer_module.register_op("post_optim_log", self.log_optim_time) - self.trainer_module.register_op("pre_steps_log", self.log_episode_stats) - - # upload video to wandb - if hasattr(self.env, "video_recorder") and self.cfg.logger == "wandb": - self.trainer_module.register_op("post_steps_log", self.upload_training_video, log_name="Video", fps=30) - - return self.trainer_module - - def learn(self, init_at_random_ep_len: bool = False): - trainer_module = self._create_trainer() - if init_at_random_ep_len: - self.env.unwrapped.episode_length_buf = 
torch.randint_like( - self.env.unwrapped.episode_length_buf, high=int(self.env.unwrapped.max_episode_length) - ) - - trainer_module.train() - - def load(self, path, eval_mode: bool = False): - loaded_dict = torch.load(path, weights_only=False) - trainer = self._create_trainer(eval_mode=eval_mode) - trainer.load_from_file(path) - return loaded_dict["state"] - - def save_cfg(self): - # Get the configuration from the environment - config = self.env.unwrapped.cfg - - # Construct the path to the config file - cfg_file_path = os.path.join(self.log_dir, "env_cfg.yaml") - - # Ensure the directory exists - os.makedirs(self.log_dir, exist_ok=True) - - # Write the config dictionary to the YAML file - with open(cfg_file_path, "w") as file: - yaml.dump(config, file, default_flow_style=False) - - wandb.save(cfg_file_path, base_path=os.path.dirname(cfg_file_path)) - - def bootstrap_reward(self, batch): - gamma = self.advantage_module.gamma - if batch["next"]["truncated"].any(): - batch["next"]["reward"] += gamma * batch["next"]["state_value"] * batch["next"]["truncated"] - - def compute_advantages(self, batch): - self.advantage_module(batch) - - def log_pre_optim_time( - self, - ): - self.pre_optim_time_start = time.perf_counter() - - def log_optim_time(self, batch): - optim_time_end = time.perf_counter() - optim_time = optim_time_end - self.pre_optim_time_start - collection_time = batch["rollout_time"][0, 0].item() - fps = int(self.cfg.num_steps_per_env * self.env.unwrapped.num_envs / (collection_time + optim_time)) - learning_rate = batch["learning_rate"].mean().item() - log_dict = {"Perf/learning_time": optim_time, "Perf/total_fps": fps, "Loss/learning_rate": learning_rate} - return log_dict - - def log_collection_time(self, batch): - log_dict = {} - val = batch["rollout_time"][0, 0].item() - log_dict["Perf/collection time"] = val - return log_dict - - def log_info_dict(self, batch): - log_dict = {} - entries = list(self.env.unwrapped.extras) - for entry in entries: - key = entry + "_data" - data = batch[key] - for k, v in data.items(): - log_dict[k] = v.float().mean().item() - return log_dict - - def log_episode_stats(self, batch): - log_dict = {} - mean_ep_len = batch["episode_length"].mean().item() - mean_reward = batch["episode_reward"].mean().item() - log_dict["Train/mean_reward"] = mean_reward - log_dict["Train/mean_episode_length"] = mean_ep_len - - return log_dict - - def upload_training_video(self, batch, log_name: str = "Video", fps: int = 30): - """ - Upload locally saved training .mp4 videos from VideoRecorder to WandB. 
- """ - log_dir = pathlib.Path(self.log_dir) - # exclude any files inside the wandb logs folder - video_files = [file for file in log_dir.rglob("*.mp4") if "wandb" not in file.parts] - for video_file in video_files: - file_path = str(video_file) - file_size_kb = os.stat(file_path).st_size / 1024 - mod_time = os.path.getmtime(file_path) - - if file_path not in self.saved_video_files: - self.saved_video_files[file_path] = {"mod_time": mod_time, "added": False} - else: - video_info = self.saved_video_files[file_path] - current_time = time.time() - - # Check if file hasn't been modified in the last 20 seconds and is larger than 100KB - if not video_info["added"] and (current_time - mod_time > 20) and file_size_kb > 100: - print(f"[Wandb] Uploading {os.path.basename(file_path)}.") - wandb.log({log_name: wandb.Video(file_path, fps=fps)}) - video_info["added"] = True - else: - video_info["mod_time"] = mod_time \ No newline at end of file diff --git a/source/standalone/workflows/torchrl/ppo/torchrl_ppo_runner_cfg.py b/source/standalone/workflows/torchrl/ppo/torchrl_ppo_runner_cfg.py deleted file mode 100644 index 0b054c8dae..0000000000 --- a/source/standalone/workflows/torchrl/ppo/torchrl_ppo_runner_cfg.py +++ /dev/null @@ -1,225 +0,0 @@ -# Copyright (c) 2022-2024, The Isaac Lab Project Developers. -# All rights reserved. -# -# SPDX-License-Identifier: BSD-3-Clause - -import torch -from dataclasses import MISSING -from typing import Literal - -from omni.isaac.lab.utils import configclass - - -@configclass -class DistributionCfg: - - distribution_class: torch.distributions.Distribution = MISSING - """A torch.distributions.Distribution class to be used for sampling. """ - - distribution_kwargs: dict | None = None - """Keyword arguments to be passed to the distribution. """ - - return_log_prob: bool = True - """If ``True``, the log-probability of the distribution sample will be written in the tensordict with the key - `'sample_log_prob'`. Default is ``True``.""" - - -@configclass -class ProbabilisticActorCfg: - """Configuration for the Actor network.""" - - class_name: str = "ProbabilisticActor" - """The actor class name. Default is ProbabilisticActor.""" - - actor_network: object = MISSING - """Actor network to use for value estimation""" - - in_keys: list[str] = ["policy"] - """Key(s) that will be read from the input TensorDict and used to build the distribution. Importantly, if it's an - iterable of string or a string, those keys must match the keywords used by the distribution class of interest, - e.g. :obj:`"loc"` and :obj:`"scale"` for the Normal distribution and similar. If in_keys is a dictionary, - the keys are the keys of the distribution and the values are the keys in the tensordict that will get match to the - corresponding distribution keys. - """ - - out_keys: list[str] = ["loc", "scale"] - """Keys where the sampled values will be written. - Importantly, if these keys are found in the input TensorDict, the sampling step will be skipped. - """ - - distribution: DistributionCfg = MISSING - """Distribution module Cfg used for sampling policy actions. """ - - init_noise_std: float = 1.0 - """The standard deviation of the Gaussian noise added to the policy actions during exploration. """ - - -@configclass -class ValueOperatorCfg: - """Configuration for the Critic network.""" - - critic_network: object = MISSING - """Critic network to use for value estimation""" - - in_keys: list[str] | None = ["policy"] - """Keys to be read from input tensordict and passed to the module. 
-    If it contains more than one element, the values will be passed in the order given by the in_keys iterable.
-    Defaults to ``["policy"]``.
-    """
-
-    out_keys: list[str] | None = None
-
-    """Keys to be written to the input tensordict.
-    The length of out_keys must match the
-    number of tensors returned by the embedded module. Using "_" as a
-    key avoid writing tensor to output.
-    Defaults to ``["state_value"]`` or ``["state_action_value"]`` if ``"action"`` is part of the ``in_keys``.
-    """
-
-
-@configclass
-class ClipPPOLossCfg:
-    """Configuration for the TorchRL loss module. Defines policy model architecture and sets PPO parameters."""
-
-    class_name: str = "CLipPPOLoss"
-    """The loss module class name. Default is ClipPPOLoss."""
-
-    actor_network: ProbabilisticActorCfg = MISSING
-    """The model architecture configuration for the actor network."""
-
-    value_network: ValueOperatorCfg = MISSING
-    """The model architecture configuration for the critic network."""
-
-    value_key: str = "state_value"
-    """The input tensordict key where the state value is expected to be written. Defaults to ``"state_value"``."""
-
-    desired_kl: float = MISSING
-    """The target KL divergence."""
-
-    value_loss_coef: float = MISSING
-    """Critic loss multiplier when computing the total loss."""
-
-    clip_param: float = MISSING
-    """The PPO epsilon clipping parameter for the policy."""
-
-    entropy_coef: float = MISSING
-    """The coefficient for the entropy loss."""
-
-    entropy_bonus: bool = False
-    """If ``True``, an entropy bonus will be added to the loss to favour exploratory policies.."""
-
-    loss_critic_type: Literal["l1", "l2", "smooth_l1"] = "l2"
-    """loss function for the value discrepancy. Can be one of "l1", "l2" or "smooth_l1"."""
-
-    normalize_advantage: bool = False
-    """Normalize advantages by subtracting the mean and dividing by its std before computing loss. Defaults to False."""
-
-    learning_rate: float = MISSING
-    """The learning rate for the policy."""
-
-    gamma: float = MISSING
-    """The discount factor."""
-
-    lam: float = MISSING
-    """The lambda parameter for Generalized Advantage Estimation (GAE)."""
-
-    max_grad_norm: float = MISSING
-    """value to be used for clipping gradients. ."""
-
-
-@configclass
-class CollectorCfg:
-    """Configuration for the PPO actor-critic networks."""
-
-    class_name: str = "SyncDataCollector"
-    """The collector class name. Default is SyncDataCollector."""
-
-    actor_network: ProbabilisticActorCfg = MISSING
-    """The model architecture configuration for the actor network."""
-
-    split_trajs: bool = False
-
-
-@configclass
-class OnPolicyPPORunnerCfg:
-    """Configuration of the PPO torchRL runner."""
-
-    loss_module: ClipPPOLossCfg = MISSING
-    """The loss module configuration."""
-
-    collector_module: CollectorCfg = MISSING
-    """The collector module configuration."""
-
-    seed: int = 42
-    """The seed for the experiment. Default is 42."""
-
-    device: str = "cuda:0"
-    """The device for the rl-agent. Default is cuda:0."""
-
-    num_steps_per_env: int = MISSING
-    """The number of steps per environment per update."""
-
-    num_epochs: int = MISSING
-    """The number of model optimizations to do per batch of experiences."""
-
-    num_mini_batches: int = MISSING
-    """The number of mini-batches per update."""
-
-    lr_schedule: Literal["fixed", "adaptive"] = "adaptive"
-    """The learning rate schedule. "fixed" for no learning rate annealing, "adaptive" to use a kl-based scheduler."""
-
-    max_iterations: int = MISSING
-    """The maximum number of iterations."""
-
-    min_sub_traj_len: int = -1
-    """Minimum value of :obj:`sub_traj_len`, in case some elements of the batch contain few steps. Default is -1 (i.e. no minimum value)"""
-
-    ##
-    # Checkpointing parameters
-    ##
-
-    save_interval: int = MISSING
-    """The number of iterations between saves."""
-
-    experiment_name: str = MISSING
-    """The experiment name."""
-
-    run_name: str = ""
-    """The run name. Default is empty string.
-
-    The name of the run directory is typically the time-stamp at execution. If the run name is not empty,
-    then it is appended to the run directory's name, i.e. the logging directory's name will become
-    ``{time-stamp}_{run_name}``.
-    """
-
-    ##
-    # Logging parameters
-    ##
-
-    logger: Literal["tensorboard", "wandb"] = "tensorboard"
-    """The logger to use. Default is tensorboard."""
-
-    wandb_project: str = "isaaclab"
-    """The wandb project name. Default is "isaaclab"."""
-
-    save_trainer_interval: int = 100
-    """"How often to save the current policy to disk, in number of optimization steps"""
-
-    ##
-    # Loading parameters
-    ##
-
-    resume: bool = False
-    """Whether to resume. Default is False."""
-
-    load_run: str = ".*"
-    """The run directory to load. Default is ".*" (all).
-
-    If regex expression, the latest (alphabetical order) matching run will be loaded.
-    """
-
-    load_checkpoint: str = "model_.*.pt"
-    """The checkpoint file to load. Default is ``"model_.*.pt"`` (all).
-
-    If regex expression, the latest (alphabetical order) matching file will be loaded.
-    """
\ No newline at end of file
diff --git a/source/standalone/workflows/torchrl/train.py b/source/standalone/workflows/torchrl/train.py
index 43e4e49fe9..71c32ff58a 100644
--- a/source/standalone/workflows/torchrl/train.py
+++ b/source/standalone/workflows/torchrl/train.py
@@ -55,9 +55,7 @@
 
 from omni.isaac.lab_tasks.utils import get_checkpoint_path
 from omni.isaac.lab_tasks.utils.hydra import hydra_task_config
-from omni.isaac.lab_tasks.utils.wrappers.torchrl import TorchRLEnvWrapper
-from .ppo.torchrl_ppo_runner_cfg import OnPolicyPPORunnerCfg,
-from .ppo.torchrl_ppo_runner import OnPolicyPPORunner,
+from omni.isaac.lab_tasks.utils.wrappers.torchrl import OnPolicyPPORunner, OnPolicyPPORunnerCfg, TorchRLEnvWrapper
 
 torch.backends.cuda.matmul.allow_tf32 = True
 torch.backends.cudnn.allow_tf32 = True