HSA Environment Modules

MuJoCo Base RL Class

Base Module for custom Reinforcement Learning environment built with MuJoCo and Gymnasium.

This module provides the core abstract class CustomMujocoEnv which handles MuJoCo simulation setup, rendering, action space definition, and utility methods for handling state, contact forces, and body positions. It is designed to be subclassed for specific robotic tasks.

class hsa_gym.envs.mujoco_env_constrained.CustomMujocoEnv(model_path: str, frame_skip: int, observation_space: Space | None = None, render_mode: str | None = None, width: int = 1920, height: int = 1920, camera_id: int | None = None, camera_name: str | None = None, default_camera_config: dict[str, float | int] | None = None, max_geom: int = 1000, visual_options: dict[int, bool] = {}, actuator_group: list[int] = [1], action_group: list[int] = [1], smooth_positions: bool = True, enable_terrain: bool = False, terrain_type: str = 'flat', goal_position: list[float] = [1.5, 0.0, 0.1], ensure_flat_spawn: bool = True)[source]

Custom MuJoCo Environment base class.

close() None[source]

Close rendering contexts and release associated resources.

Returns:

None

Return type:

None

do_simulation(scaled_action: ndarray[tuple[int, ...], dtype[float32]], frame_skip: int = 4, actuator_group: list[int] = [1]) None[source]

Advance the MuJoCo simulation using an unnormalized control input.

Parameters:
  • scaled_action (NDArray[np.float32]) – The scaled control input (increment) for the active actuators.

  • frame_skip (int) – Number of simulation frames to step..

  • actuator_group (list[int]) – List of actuator group IDs to apply control to.

Returns:

None

Return type:

None

property dt: float

Return the effective time step (duration) of one environment step.

Returns:

The effective simulation time step in seconds.

Return type:

float

get_body_com(body_name: str) ndarray[tuple[int, ...], dtype[float64]][source]

Get the Cartesian position (Center of Mass) of a specified body frame.

The position is returned in world coordinates and is accessed via the self.data.body accessor.

Parameters:

body_name (str) – Name of the body (e.g., ‘torso’) as defined in the XML model.

Returns:

The 3D Cartesian position $[x, y, z]$ of the body’s CoM in the world frame.

Return type:

NDArray[np.float64]

get_contact_force(geom_name: str = 'cylinder3a_con', max_normal_force: float = 30.0) float[source]

Calculate the excessive contact force experienced by a specified geometry.

Parameters:
  • geom_name (str) – Name of the MuJoCo geometry (geom) to check for contact force.

  • max_normal_force (float) – The maximum allowable contact force magnitude before it is considered “excessive”

Returns:

The excessive force magnitude, defined as $max(0.0, ||mathbf{F}|| - ext{max_normal_force})$.

Return type:

float

get_range_bounds(bound_group: list[int] = [1]) tuple[ndarray[tuple[int, ...], dtype[float32]], ndarray[tuple[int, ...], dtype[float32]]][source]

Extract the control range bounds from the MuJoCo model for specified actuator groups. It assumes a consistent grouping/indexing structure (8 actuators per group).

Parameters:

bound_group (list[int]) – List of actuator/action group to get bounds for

Returns:

A tuple containing the lower and upper bounds for the specified groups.

Return type:

tuple[NDArray[np.float32], NDArray[np.float32]]

get_reset_info() dict[str, float64][source]

Generate the initial info dictionary returned during a reset().

This helper method must be implemented by child environments to provide any task-specific information that is available immediately after the environment is reset (e.g., initial goal distance, initial velocity).

Returns:

A dictionary containing initial state information.

Return type:

dict[str, np.float64]

initialize_simulation(actuator_group: list[int] = [1]) tuple[MjModel, MjData][source]

Initialize the MuJoCo simulation model and data structures.

This method handles several core setup tasks:

  1. Loads the MuJoCo model from the internal path (self.fullpath).

  2. Configures the default off-screen rendering dimensions (width/height).

  3. Terrain Generation: Optionally generates and applies procedural terrain data to the model’s heightfield if self.enable_terrain is true.

  4. Actuator Enabling: Iterates over actuator_group to explicitly enable control for the specified groups by clearing the MuJoCo disable flag.

  5. Computes the initial forward dynamics via mujoco.mj_forward.

Parameters:

actuator_group (list[int]) – List of MuJoCo actuator group IDs to enable for control.

Returns:

A tuple containing the initialized MuJoCo model (MjModel) and data (MjData) objects.

Return type:

tuple[mujoco.MjModel, mujoco.MjData]

render() ndarray[tuple[int, ...], dtype[uint8]] | None[source]

Render a frame from the MuJoCo simulation as specified by the render mode.

Returns:

The rendered frame as a NumPy array (if render_mode='human' or render_mode='rgb_array') or None (if rendering is disabled).

Return type:

NDArray[np.uint8] or None

reset(*, seed: int | None = None, options: dict | None = None) tuple[ndarray[tuple[int, ...], dtype[float64]], dict][source]

Reset the environment to an initial state and return an initial observation. This method follows the standard Gymnasium reset signature.

Parameters:
  • seed (int or None) – Optional seed for the environment’s random number generator.

  • options (dict or None) – Optional dictionary of additional options for resetting the environment.

Returns:

A tuple containing the initial observation (state after reset) and an info dictionary.

Return type:

tuple[NDArray[np.float64], dict]

reset_model() ndarray[tuple[int, ...], dtype[float64]][source]

Set the task-specific initial state of the robot and return the initial observation.

Returns:

The initial observation state of the environment.

Return type:

NDArray[np.float64]

set_action_space(action_group: list[int] = [1], actuator_group: list[int] = [1]) Box[source]

Define the normalized Gymnasium action space and store unnormalized actuator bounds.

Parameters:
  • action_group (list[int]) – List of actuator group IDs whose control ranges define the dimensionality and limits of the unnormalized action space.

  • actuator_group (list[int]) – List of actuator group IDs that are enabled and whose control limits should be stored internally for simulation control.

Returns:

The defined Gymnasium action space object (normalized Box).

Return type:

spaces.Box

set_state(qpos: ndarray[tuple[int, ...], dtype[float64]], qvel: ndarray[tuple[int, ...], dtype[float64]])[source]

Set the joint positions (qpos) and joint velocities (qvel) of the MuJoCo model’s state.

Parameters:
  • qpos (NDArray[np.float64]) – The full position state vector (dimension self.model.nq).

  • qvel (NDArray[np.float64]) – The full velocity state vector (dimension self.model.nv).

Returns:

None

Return type:

None

state_vector() ndarray[tuple[int, ...], dtype[float64]][source]

Return the full state vector of the MuJoCo model.

Returns:

The concatenated position and velocity state vector.

Return type:

NDArray[np.float64]

step(action: ndarray[tuple[int, ...], dtype[float32]]) tuple[ndarray[tuple[int, ...], dtype[float64]], float64, bool, bool, dict[str, float64]][source]

Advance the environment by one timestep using the provided action.

Parameters:

action (NDArray[np.float32]) – The normalized action vector supplied by the agent, typically in the range $[-1.0, 1.0]$.

Returns:

A tuple containing the next observation, the step reward, a terminated flag, a truncated flag, and an info dictionary.

Return type:

tuple[NDArray[np.float64], float, bool, bool, dict[str, np.float64]]

step_mujoco_simulation(action: ndarray[tuple[int, ...], dtype[float32]], frame_skip: int = 4, actuator_group: list[int] = [1]) None[source]

Advance the MuJoCo simulation by frame_skip steps with control inputs.

Calculates the new control target (new_targets) by adding the input action (which is interpreted as an increment $Delta u$) to the current control target (self.data.ctrl). If self._smooth_positions is True, the control target is linearly interpolated over the frame_skip simulation steps.

Parameters:
  • action (NDArray[np.float32]) – The control input increment ($Delta u$) for the active actuators.

  • frame_skip (int) – Number of simulation steps (frames) to advance per call.

  • actuator_group (list[int]) – List of actuator group IDs whose control targets will be updated.

Returns:

None

Return type:

None

update_goal_marker(goal_position: list[float] = [1.5, 0.0, 0.1], marker_name: str = 'goal') None[source]

Update the Cartesian position of a specified MuJoCo site/marker in the model.

Parameters:
  • goal_position (list[float]) – The desired $[x, y, z]$ coordinates (in meters) for the marker.

  • marker_name (str) – The name of the MuJoCo site (marker) to update, as defined in the XML model.

Returns:

None

Return type:

None

hsa_gym.envs.mujoco_env_constrained.expand_model_path(model_path: str) str[source]

Expand the model path to a full path if it starts with ‘~’ or ‘.’ or ‘/’.

HSA Gym Environment and Utilities

HSAEnv Module: Custom MuJoCo Environment for Handed Shearing Auxetic (HSA) Robot Locomotion.

This module defines the HSAEnv class, a specialized child environment inherited from CustomMujocoEnv. It implements a 2D locomotion task where a robot composed of two blocks connected by HSA actuators must move towards a goal marker.

Key features implemented in this file include:

  • Reward Function: A complex dense reward structure incorporating forward progress,

    control smoothness, joint constraints, acceleration limits, and contact penalties.

  • Curriculum/Goals: Logic for generating goals for progressive

    task difficulty (when enabled).

  • Terrain Interaction: Logic for spawning the robot safely above procedurally

    generated terrain (heightfield) and managing vertical termination limits relative to the terrain bounds.

  • Observation Space: Construction of a detailed observation vector using

    actuated joint states, COM kinetics, and goal-relative vectors.

class hsa_gym.envs.hsa_constrained.HSAEnv(xml_file: str = 'hsaLooseModel.xml', frame_skip: int = 4, default_camera_config: dict[str, float | int] = {}, forward_reward_weight: float = 10.0, ctrl_cost_weight: float = 0.001, actuator_group: list[int] = [1], action_group: list[int] = [1], smooth_positions: bool = True, clip_actions: float = 1.0, contact_cost_weight: float = 0.0001, yvel_cost_weight: float = 1.0, constraint_cost_weight: float = 0.01, acc_cost_weight: float = 0.01, distance_reward_weight: float = 1.0, early_termination_penalty: float = 50.0, joint_vel_cost_weight: float = 0.001, stagnation_penalty_weight: float = 0.15, alive_bonus: float = 0.1, max_increment: float = 3.14, enable_terrain: bool = False, terrain_type: str = 'craters', goal_position: list[float] = [3.0, 1.0, 0.1], num_checkpoints: int = 20, checkpoint_reward: float = 40.0, checkpoint_radius: float = 0.4, start_radius: float = 0.5, end_radius: float = 4.0, num_turns: int = 2.0, max_episode_steps: int = 4000, ensure_flat_spawn: bool = True, **kwargs)[source]

HSA Environment Class for MuJoCo-based simulation. In this environment, a robot must learn to move towards a direction in the XY plane by coordinating its two blocks. The faster it moves, the more reward it gets. We are trying to train a locomotion policy here.

acceleration_cost() float[source]

Compute the cost based on high joint accelerations (changes in velocity). Acceleration is calculated using a finite difference approximation over the environment timestep

Returns:

The weighted acceleration cost.

Return type:

float

calculate_spiral_checkpoints() list[ndarray[tuple[int, ...], dtype[float64]]][source]

Calculate evenly spaced checkpoints along a clockwise spiral path.

It applies a critical starting angle offset (currently $pi$ radians) to align the path with terrain features like valleys.

Returns:

A list of 2D NumPy arrays, where each array is an $[x, y]$ world-coordinate position of a checkpoint.

Return type:

list[NDArray[np.float64]]

check_checkpoints() float[source]

Check if the robot has reached the current checkpoint, and advance the goal if so.

This function is only active when self.terrain_type_str is "spiral".

Returns:

The checkpoint bonus reward if a checkpoint was collected, otherwise $0.0$.

Return type:

float

check_termination(observation: ndarray[tuple[int, ...], dtype[float64]]) tuple[bool, list[str]][source]

Check for conditions that cause the episode to terminate prematurely.

Termination occurs if any dynamic state (qpos, qvel, qacc) contains NaNs/Infs, if the robot’s blocks exceed vertical $Z$-limits (relative to terrain if enabled), or if the maximum joint velocity (self.qvel_limit) or acceleration (self.qacc_limit) are surpassed.

Parameters:

observation (NDArray[np.float64]) – The current observation vector (checked for NaNs/Infs).

Returns:

A tuple containing a boolean flag (True if termination conditions are met) and a list of strings detailing the reason(s) for termination.

Return type:

tuple[bool, list[str]]

compute_COM() ndarray[tuple[int, ...], dtype[float64]][source]

Compute the projected Center of Mass (COM) position of the robot in the $XY$ plane.

Returns:

The 2D $XY$ position vector of the robot’s approximate COM.

Return type:

NDArray[np.float64]

compute_terrain_bounds() None[source]

Compute and store the actual world height bounds (min and max Z-coordinates) for the generated terrain.

This function must be called after the model has been initialized with the terrain data.

Returns:

None

Return type:

None

constraint_cost(diff_margin: float = 0.01, penalty_factor: float = 1.0, bonus_factor: float = 0.025) tuple[float, float][source]

Compute penalty and bonus for violating or satisfying angular difference constraints between paired joints.

This mechanism ensures the robot’s paired joints (e.g., 1A and 1C) maintain a coupled angular difference. * Penalty: Applied when the absolute angular difference $|A - C|$ exceeds a threshold, $pi - ext{diff_margin}$. The penalty grows quadratically with proximity to $pi$. * Bonus: A small bonus is awarded for each pair where the difference is maintained below the threshold.

Parameters:
  • diff_margin (float) – The angular margin before $pi$ where the quadratic penalty begins to apply.

  • penalty_factor (float) – Scaling factor for the quadratic penalty on constraint violation.

  • bonus_factor (float) – Constant bonus awarded per constraint pair that is satisfied (difference is small).

Returns:

A tuple containing the total constraint penalty (cost) and the total constraint satisfaction bonus.

Return type:

tuple[float, float]

contact_cost() float[source]

Compute the cost based on excessive contact forces experienced by the robot’s foot geometries.

Returns:

The weighted total contact cost.

Return type:

float

control_cost(action: ndarray[tuple[int, ...], dtype[float32]]) float[source]

Compute the control cost based on the change in the action vector (motor command smoothness).

This cost encourages smooth, low-frequency control signals.

Parameters:

action (NDArray[np.float32]) – The current normalized action vector supplied by the agent.

Returns:

The weighted control cost.

Return type:

float

distance_cost(goal_position: ndarray[tuple[int, ...], dtype[float64]]) float[source]

Compute the progress reward based on the change in distance to the goal.

Parameters:

goal_position (NDArray[np.float64]) – The 3D position vector ($[x, y, z]$) of the current goal target.

Returns:

The distance progress reward (scalar float).

Return type:

float

get_obs() ndarray[tuple[int, ...], dtype[float64]][source]

Generate the full observation vector for the agent.

The observation vector is a concatenation of local robot state and goal-relative information, adhering to the structure defined in self.observation_structure. The components, in order, are:

  1. Actuated joint positions ($qpos$).

  2. Actuated joint velocities ($qvel$).

  3. Robot center of mass (COM) position ($XY$ plane).

  4. Robot base velocity (average of block velocities, $XYZ$).

  5. Distance to goal (scalar).

  6. Unit vector to goal ($XY$).

Returns:

The concatenated observation vector.

Return type:

NDArray[np.float64]

get_reset_info() dict[str, float64][source]

Generate the initial info dictionary returned during a reset().

Returns:

A dictionary containing initial state and performance information.

Return type:

dict[str, np.float64]

get_reward(action: ndarray[tuple[int, ...], dtype[float32]], x_velocity: float = 0.0, y_velocity: float = 0.0) tuple[float, dict[str, float]][source]

Compute the total reward and detailed breakdown for the current step.

The total reward is calculated by summing various components:

  • Positive Components: Forward velocity (projection onto goal vector), Lateral velocity (cost, weighted positively here), Distance progress, and Constraint satisfaction bonus.

  • Negative Components (Costs): Control cost, contact cost, constraint violation cost, acceleration cost, joint velocity cost, and stagnation penalty.

The reward is constrained by caps placed on individual cost components.

Parameters:
  • action (NDArray[np.float32]) – The normalized action vector used in the current step.

  • x_velocity (float) – The average velocity of the robot’s COM along the X-axis.

  • y_velocity (float) – The average velocity of the robot’s COM along the Y-axis (lateral).

Returns:

A tuple containing the total scalar reward and a dictionary of all reward/cost components.

Return type:

tuple[float, dict[str, float]]

get_spawn_height(x, y) float[source]

Get the actual world Z-coordinate height of the terrain at the given $(x, y)$ world coordinates.

Parameters:
  • x (float) – World $X$-coordinate of the desired spawn location.

  • y (float) – World $Y$-coordinate of the desired spawn location.

Returns:

The terrain height in meters at the given $(x, y)$ position.

Return type:

float

joint_velocity_cost() float[source]

Compute the cost associated with high joint velocities. This penalizes excessive rotational speeds in the robot’s joints.

Returns:

The weighted joint velocity cost.

Return type:

float

reset_model() ndarray[tuple[int, ...], dtype[float64]][source]

Set the task-specific initial state (qpos, qvel) of the robot and return the initial observation. Clears step count, previous actions, joint velocities, and checkpoint tracking.

Returns:

The initial observation state of the environment.

Return type:

NDArray[np.float64]

scale_action(norm_action: ndarray[tuple[int, ...], dtype[float32]]) ndarray[tuple[int, ...], dtype[float32]][source]

Scale the normalized action vector to the desired control increment range.

The scaled action is then used by do_simulation() as the increment to the current actuator control targets.

Parameters:

norm_action (NDArray[np.float32]) – Normalized action vector in the range $[-1, 1]$.

Returns:

Scaled action vector, representing the desired control target increment.

Return type:

NDArray[np.float32]

set_curriculum_manager(curriculum_manager) None[source]

Set the curriculum manager instance for the environment.

The curriculum manager controls task difficulty, such as defining the sampling distribution for goal positions during reset_model().

Parameters:

curriculum_manager (object) – An instance of a curriculum manager class (e.g., CurriculumManager).

Returns:

None

Return type:

None

step(action: ndarray[tuple[int, ...], dtype[float32]]) tuple[ndarray[tuple[int, ...], dtype[float64]], float64, bool, bool, dict[str, float64]][source]

Advance the environment by one timestep, running the physics simulation and computing all metrics. This method implements the core Gymnasium step logic.

Parameters:

action (NDArray[np.float32]) – The normalized action vector (motor command) supplied by the agent.

Returns:

A tuple containing the next observation, the step reward, a terminated flag, a truncated flag, and an info dictionary detailing reward components and state.

Return type:

tuple[NDArray[np.float64], float, bool, bool, dict[str, np.float64]]

vec_to_goal() ndarray[tuple[int, ...], dtype[float64]][source]

Compute the 2D unit vector pointing from the robot’s Center of Mass (COM) to the current goal position.

Returns:

The 2D unit vector (length 1) to the goal position.

Return type:

NDArray[np.float64]

GoalCurriculumManager Module: Dynamic Goal Sampling for Reinforcement Learning.

This module defines the GoalCurriculumManager class, which implements an automated curriculum learning strategy for goal-based tasks. It dynamically adjusts the difficulty of the task by expanding or contracting the maximum distance goals are sampled from, based on the agent’s recent success rate.

The manager ensures goal positions respect minimum distance constraints (dead zone) and tracks performance metrics such as success rate and curriculum progress.

class hsa_gym.envs.curriculum_manager.GoalCurriculumManager(initial_range: tuple[float, float] = (1.5, 2.0), target_range: tuple[float, float] = (1.5, 4.5), success_threshold: float = 0.75, failure_threshold: float = 0.4, expansion_step: float = 0.3, window_size: int = 100, min_episodes_before_expand: int = 50, dead_zone_radius: float = 1.2)[source]

Manages the curriculum for goal positions in the environment based on agent performance.

The curriculum dynamically expands or contracts the maximum sampling distance for goals (radius) based on a rolling average of the agent’s recent success rate, ensuring the agent learns progressively harder tasks.

get_curriculum_info() dict[str, float][source]

Get current curriculum statistics.

Returns:

A dictionary containing the maximum goal distance, current success rate, episode counts, and curriculum progress relative to the target range.

Return type:

dict[str, float]

load(filepath: str) bool[source]

Load the curriculum manager state from a file.

The function verifies that key configuration parameters match the currently initialized manager to prevent loading incompatible states.

Parameters:

filepath (str) – The full path to the file containing the saved state.

Returns:

True if the state was loaded successfully, False otherwise.

Return type:

bool

record_episode(success: bool) None[source]

Record episode outcome and execute the curriculum logic to update the goal distance range.

Curriculum adjustment occurs only if the number of episodes since the last change exceeds self.min_episodes.

  • Expansion: If success rate $ge$ self.success_threshold, self.current_max_distance increases by self.expansion_step.

  • Contraction: If success rate $<$ self.failure_threshold, self.current_max_distance decreases by self.expansion_step.

Parameters:

success (bool) – Boolean indicating if the episode was successful (True) or not (False).

Returns:

None

Return type:

None

sample_goal_distance() float[source]

Samples a goal distance (radius) uniformly within the current curriculum range.

The sampling range is $[ ext{min_distance}, ext{current_max_distance}]$.

Returns:

A randomly sampled goal distance in meters.

Return type:

float

sample_goal_position() ndarray[tuple[int, ...], dtype[float64]][source]

Sample a random goal position $(x, y, z)$ within the current curriculum range, avoiding the dead zone.

The position is sampled by choosing a distance (radius) using sample_goal_distance() and a random angle, ensuring the distance from the origin $(0, 0)$ is always greater than self.dead_zone_radius.

Returns:

A 3D array $[ ext{x}, ext{y}, 0.1]$ representing the sampled goal position.

Return type:

NDArray[np.float64]

save(filepath: str) None[source]

Save the current state of the curriculum manager to a file using pickle.

The saved state includes the current maximum goal distance, success history, and episode counters, alongside the configuration parameters for verification.

Parameters:

filepath (str) – The full path to the file where the state should be saved.

Returns:

None

Return type:

None

Training Script

PPO Training Script for HSA Environment with Curriculum Learning.

This module provides the main entry point (main function) for training a Stable Baselines3 (SB3) PPO agent on the HSAEnv. It handles configuration loading, environment vectorization, observation/reward normalization, dynamic checkpoint resumption, and implementation of a custom goal-based curriculum manager integrated via Gym wrappers and callbacks.

class rl_scripts.ppo_train_position.CurriculumWrapper(env: Env, curriculum_manager: GoalCurriculumManager)[source]

Gym Wrapper to integrate the GoalCurriculumManager logic into the environment’s step and reset lifecycle.

This wrapper ensures the environment’s internal goal sampling uses the curriculum manager’s state and records episode outcomes to trigger curriculum expansion/contraction.

Parameters:
reset(**kwargs)[source]

Resets the environment.

step(action)[source]

Steps the environment and updates the curriculum manager upon episode termination.

class rl_scripts.ppo_train_position.RewardComponentLogger(verbose: int = 0)[source]

Custom Stable Baselines3 callback to log individual reward and cost components (e.g., forward reward, control cost) contained within the environment’s info dictionary at every step.

It also logs curriculum progress and checkpoint status.

Parameters:

verbose (int) – Verbosity level (0 for silent, 1 for info).

class rl_scripts.ppo_train_position.SaveCurriculumCallback(curriculum_manager: GoalCurriculumManager | None, save_freq: int, save_path: str, name_prefix: str = 'curriculum', verbose: int = 0)[source]

Custom Stable Baselines3 callback to save the state of the GoalCurriculumManager alongside model checkpoints, allowing for seamless curriculum resumption.

Parameters:
  • curriculum_manager (GoalCurriculumManager or None) – The curriculum manager instance, or None if curriculum is disabled.

  • save_freq (int) – How often (in environment steps) to save the file.

  • save_path (str) – Directory where the file should be saved.

  • name_prefix (str) – Prefix for the saved filename.

  • verbose (int) – Verbosity level.

class rl_scripts.ppo_train_position.SaveVecNormalizeCallback(save_freq: int, save_path: str, name_prefix: str = 'vec_normalize', verbose: int = 0)[source]

Custom Stable Baselines3 callback to save the VecNormalize observation/reward statistics file alongside model checkpoints.

Parameters:
  • save_freq (int) – How often (in environment steps, counts across all environments) to save the file.

  • save_path (str) – Directory where the file should be saved.

  • name_prefix (str) – Prefix for the saved filename (default: ‘vec_normalize’).

  • verbose (int) – Verbosity level.

rl_scripts.ppo_train_position.extract_step_number(checkpoint_path: str) int[source]

Extract the step number (timestep count) from a checkpoint filename.

The step number is assumed to be the largest sequence of digits in the filename.

Parameters:

checkpoint_path (str) – The full file path of the checkpoint or VecNormalize file.

Returns:

The extracted total timestep count.

Return type:

int

Raises:

ValueError – If no step number could be extracted from the filename.

rl_scripts.ppo_train_position.get_latest_checkpoint(checkpoint_dir: str = 'checkpoints/') str | None[source]

Get the path to the latest PPO model checkpoint file from a directory.

It filters out ‘final’ models and uses the extracted step number for sorting.

Parameters:

checkpoint_dir (str) – Directory containing model checkpoints.

Returns:

The file path of the latest checkpoint, or None if no valid checkpoints are found.

Return type:

str or None

rl_scripts.ppo_train_position.get_latest_curriculum(checkpoint_dir: str = 'checkpoints/') str | None[source]

Get the path to the latest saved curriculum state file.

Parameters:

checkpoint_dir (str) – Directory containing curriculum state files.

Returns:

The file path of the latest curriculum state file, or None if none are found.

Return type:

str or None

rl_scripts.ppo_train_position.get_latest_vecnormalize(checkpoint_dir: str = 'checkpoints/') str | None[source]

Get the path to the latest VecNormalize statistics file from a directory.

Parameters:

checkpoint_dir (str) – Directory containing VecNormalize files.

Returns:

The file path of the latest VecNormalize file, or None if none are found.

Return type:

str or None

rl_scripts.ppo_train_position.load_config(config_path: str = './configs/ppo_position.yaml') dict[source]

Load training configuration parameters from a YAML file.

Parameters:

config_path (str) – Path to the YAML configuration file.

Returns:

A dictionary containing the parsed configuration settings.

Return type:

dict

rl_scripts.ppo_train_position.main()[source]

The main function to set up, resume, and run the Stable Baselines3 PPO training loop.

rl_scripts.ppo_train_position.make_curriculum_env(env_kwargs: dict, curriculum_manager: GoalCurriculumManager, max_episode_steps: int)[source]

Factory function to create a single environment instance wrapped with both TimeLimit and the CurriculumWrapper.

Parameters:
  • env_kwargs (dict) – Dictionary of keyword arguments for initializing HSAEnv.

  • curriculum_manager (GoalCurriculumManager) – The pre-initialized curriculum manager instance.

  • max_episode_steps (int) – The maximum number of steps before environment truncation.

Returns:

A function (_init) that returns the wrapped environment instance.

Return type:

callable

Evaluation Script

PPO Model Evaluation and Feasibility Analysis Script.

This module provides tools for loading a trained Stable Baselines3 (SB3) PPO model and evaluating its performance in the HSAEnv. The core function, analyze_actions, collects detailed time-series data on actions, joint positions, velocities, and constraint violations across multiple evaluation episodes.

The results are saved as image files (plots) to diagnose control feasibility, action smoothness, and adherence to physical limits.

rl_scripts.action_eval.analyze_actions(checkpoint_dir: str, model_path: str, num_episodes: int = 5) None[source]

Core function to evaluate a trained model and analyze its kinematic and control outputs.

This function runs the model over several episodes and collects time-series data for: * Action distribution and smoothness (change). * Actuated joint positions and velocities. * Paired joint constraint difference.

It then generates and saves several diagnostic plots and prints summary statistics.

Parameters:
  • checkpoint_dir (str) – Directory containing the model and configuration files.

  • model_path (str) – Path to the specific PPO model checkpoint (.zip) to load.

  • num_episodes (int) – Number of episodes to run for data collection.

Returns:

None

Return type:

None

rl_scripts.action_eval.extract_step_number(filepath: str) int[source]

Extract the step number (timestep count) from a checkpoint or stat filename.

The step number is assumed to be the largest sequence of digits found in the filename.

Parameters:

filepath (str) – The full path of the checkpoint or stat file.

Returns:

The extracted total timestep count, or 0 if no numbers are found.

Return type:

int

rl_scripts.action_eval.find_matching_vecnormalize(checkpoint_dir: str, model_path: str) str | None[source]

Find the VecNormalize statistics file that corresponds to the loaded model checkpoint.

The search attempts to find: 1. An exact match based on the model’s step number. 2. A generic ‘vec_normalize_final.pkl’ file. 3. The latest available VecNormalize file in the directory.

Parameters:
  • checkpoint_dir (str) – Directory containing checkpoints and VecNormalize stats.

  • model_path (str) – Path to the model checkpoint file being loaded.

Returns:

Path to the matching VecNormalize file, or None if not found.

Return type:

str or None

rl_scripts.action_eval.load_config(checkpoint_dir: str) dict[source]

Load the environment configuration used for the training run from the archived YAML file.

Parameters:

checkpoint_dir (str) – Directory where the used_config.yaml file is stored.

Returns:

A dictionary containing the environment configuration.

Return type:

dict

rl_scripts.action_eval.make_env(config: dict, render_mode: str = 'human') Wrapper[source]

Instantiate and wrap a single HSAEnv environment based on configuration parameters.

The environment is wrapped with gymnasium.wrappers.TimeLimit.

Parameters:
  • config (dict) – The configuration dictionary loaded from used_config.yaml.

  • render_mode (str) – The rendering mode for the environment (“human” or “rgb_array”).

Returns:

The wrapped environment instance ready for evaluation.

Return type:

gymnasium.Wrapper

Terrain Generation Utilities

Procedural Terrain Generation Utilities for MuJoCo Heightfields.

This module provides various functions to generate 2D NumPy arrays representing heightfields (terrains) for use in MuJoCo simulations. The array values correspond to height, typically normalized or scaled relative to the maximum height defined in the MuJoCo XML model.

Functions include generation methods for: * Flat surfaces, inclines, and ramps. * Stochastic surfaces like Perlin noise and craters. * Structured environments like spiral tracks and corridors.

The main entry point is generate_terrain, which dispatches to the specific generation function based on the requested type.

hsa_gym.utils.terrain_generators.ensure_flat_spawn_zone(terrain: ndarray[tuple[int, ...], dtype[float64]], spawn_center_x: float = 0.5, spawn_center_y: float = 0.5, spawn_radius: float = 0.15) ndarray[tuple[int, ...], dtype[float64]][source]

Ensure a flat area in the terrain for the robot’s initial spawn location.

The function sets the height to 0.0 within a specified circular region and applies a smooth, linear blend transition zone around the perimeter to connect the flat area with the surrounding terrain.

Parameters:
  • terrain (NDArray[np.float64]) – The input heightmap array.

  • spawn_center_x (float) – Center of the flat zone as a fraction (0.0 to 1.0) of the terrain width.

  • spawn_center_y (float) – Center of the flat zone as a fraction (0.0 to 1.0) of the terrain height.

  • spawn_radius (float) – Radius of the flat zone as a fraction of the terrain size.

Returns:

The modified heightmap array with the flat spawn zone.

Return type:

NDArray[np.float64]

hsa_gym.utils.terrain_generators.generate_corridor_terrain(width: int, height: int, corridor_width: float = 0.85, corridor_axis: str = 'y', corridor_center: float = 0.5, wall_height: float = 0.5) ndarray[tuple[int, ...], dtype[float64]][source]

Generate flat terrain with thick walls creating a narrow corridor along a specified axis.

The method estimates the number of cells needed to match the requested corridor_width based on a fixed WORLD_SIZE of 10.0 meters.

Parameters:
  • width (int) – Terrain grid width.

  • height (int) – Terrain grid height.

  • corridor_width (float) – Desired width of the corridor in meters (e.g., 0.85m).

  • corridor_axis (str) – Axis along which the corridor extends ('x' or 'y'). The walls run perpendicular to this axis.

  • corridor_center (float) – Center of the corridor as a fraction (0-1) of the terrain size across the axis perpendicular to the corridor.

  • wall_height (float) – Height of the walls.

Returns:

The heightmap array with the corridor structure.

Return type:

NDArray[np.float64]

hsa_gym.utils.terrain_generators.generate_crater_terrain(width: int, height: int, num_craters: int = 20, crater_radius_range: tuple[int, int] = (2, 5), crater_depth_range: tuple[float, float] = (0.05, 0.15)) ndarray[tuple[int, ...], dtype[float64]][source]

Generate terrain with random craters (small, Gaussian-shaped depressions).

Parameters:
  • width (int) – Terrain grid width.

  • height (int) – Terrain grid height.

  • num_craters (int) – Number of craters to generate.

  • crater_radius_range (tuple[int, int]) – Tuple of (min_radius, max_radius) in grid units.

  • crater_depth_range (tuple[float, float]) – Tuple of (min_depth, max_depth) in height units.

Returns:

The heightmap array with craters.

Return type:

NDArray[np.float64]

hsa_gym.utils.terrain_generators.generate_flat_terrain(width: int, height: int) ndarray[tuple[int, ...], dtype[float64]][source]

Generate a completely flat terrain heightmap at zero height.

Parameters:
  • width (int) – Terrain grid width (number of cells).

  • height (int) – Terrain grid height (number of cells).

Returns:

The heightmap array (all zeros).

Return type:

NDArray[np.float64]

hsa_gym.utils.terrain_generators.generate_flat_with_incline(width: int, height: int, incline_start_x: float = 0.3, incline_end_x: float = 0.7, incline_start_y: float = 0.3, incline_end_y: float = 0.7, angle: float = 15, direction: str = 'x') ndarray[tuple[int, ...], dtype[float64]][source]

Generate flat terrain with a localized incline or ramp created within a specified rectangular region.

Parameters:
  • width (int) – Terrain grid width.

  • height (int) – Terrain grid height.

  • incline_start_x (float) – Starting X position of the incline region as a fraction (0-1).

  • incline_end_x (float) – Ending X position of the incline region as a fraction (0-1).

  • incline_start_y (float) – Starting Y position of the incline region as a fraction (0-1).

  • incline_end_y (float) – Ending Y position of the incline region as a fraction (0-1).

  • angle (float) – Slope angle in degrees.

  • direction (str) – Direction of the slope within the region: 'x', 'y', or 'diagonal'.

Returns:

The heightmap array with the ramp feature.

Return type:

NDArray[np.float64]

hsa_gym.utils.terrain_generators.generate_flat_with_ramp(width: int, height: int, ramp_center_x: float = 0.5, ramp_center_y: float = 0.5, ramp_length: float = 0.3, ramp_width: float = 0.3, ramp_height: float = 0.2, direction: str = 'x', smooth_edges: bool = True) ndarray[tuple[int, ...], dtype[float64]][source]

Generate flat terrain with a smooth, localized ramp feature.

Parameters:
  • width (int) – Terrain grid width.

  • height (int) – Terrain grid height.

  • ramp_center_x (float) – Center X position of the ramp as a fraction (0-1).

  • ramp_center_y (float) – Center Y position of the ramp as a fraction (0-1).

  • ramp_length (float) – Length of the ramp (distance over which height changes) as a fraction of terrain size.

  • ramp_width (float) – Width of the ramp as a fraction of terrain size.

  • ramp_height (float) – Maximum height of the ramp.

  • direction (str) – Direction of the slope: 'x', '-x', 'y', or '-y'.

  • smooth_edges (bool) – If True, applies a smooth falloff along the ramp’s width.

Returns:

The heightmap array with the localized ramp.

Return type:

NDArray[np.float64]

hsa_gym.utils.terrain_generators.generate_incline_terrain(width: int, height: int, angle: float = 10, direction: str = 'x') ndarray[tuple[int, ...], dtype[float64]][source]

Generate a full-width terrain with a uniform slope across the entire grid.

Parameters:
  • width (int) – Terrain grid width.

  • height (int) – Terrain grid height.

  • angle (float) – Slope angle in degrees.

  • direction (str) – Direction of the slope: 'x', 'y', or 'diagonal'.

Returns:

The heightmap array with the incline.

Return type:

NDArray[np.float64]

hsa_gym.utils.terrain_generators.generate_perlin_noise(width: int, height: int, scale: float = 0.1) ndarray[tuple[int, ...], dtype[float64]][source]

Generate a terrain heightmap using 2D Perlin noise for smooth, fractal features.

Parameters:
  • width (int) – Terrain grid width (number of cells).

  • height (int) – Terrain grid height (number of cells).

  • scale (float) – Controls the density and scale of the noise features.

Returns:

The heightmap array.

Return type:

NDArray[np.float64]

hsa_gym.utils.terrain_generators.generate_poles_terrain(width: int, height: int, num_poles: int = 8, pole_radius_range: tuple[int, int] = (3, 6), pole_height_range: tuple[float, float] = (0.3, 0.6), min_spacing: int = 8) ndarray[tuple[int, ...], dtype[float64]][source]

Generate terrain with large cylindrical poles/pillars placed randomly with minimum spacing.

Parameters:
  • width (int) – Terrain grid width.

  • height (int) – Terrain grid height.

  • num_poles (int) – Number of poles to place.

  • pole_radius_range (tuple[int, int]) – Tuple of (min_radius, max_radius) in grid units.

  • pole_height_range (tuple[float, float]) – Tuple of (min_height, max_height) in height units.

  • min_spacing (int) – Minimum required distance (in grid cells) between pole centers.

Returns:

The heightmap array with poles.

Return type:

NDArray[np.float64]

hsa_gym.utils.terrain_generators.generate_spiral_track(width: int, height: int, start_radius: float = 0.5, end_radius: float = 4.0, num_turns: float = 2.0, track_width: float = 0.8, wall_height: float = 0.5) ndarray[tuple[int, ...], dtype[float64]][source]

Generate a spiral track starting from the origin (center of the grid) and spiraling outward to the edge.

The path is carved out of high walls, leaving a flat, navigable track based on an Archimedean spiral ($r = a + b heta$).

Parameters:
  • width (int) – Grid width.

  • height (int) – Grid height.

  • start_radius (float) – Starting radius of the spiral in meters (inner radius).

  • end_radius (float) – Ending radius of the spiral in meters (outer edge).

  • num_turns (float) – Number of complete $360^circ$ rotations the spiral makes.

  • track_width (float) – Width of the flat track region in meters.

  • wall_height (float) – Height of the walls surrounding the track.

Returns:

The heightmap array with the spiral track structure.

Return type:

NDArray[np.float64]

hsa_gym.utils.terrain_generators.generate_stairs_terrain(width: int, height: int, step_height: float = 0.02, pixels_per_step: int = 5) ndarray[tuple[int, ...], dtype[float64]][source]

Generate a terrain consisting of a straight staircase along the X-axis.

Parameters:
  • width (int) – Terrain grid width (number of cells).

  • height (int) – Terrain grid height (number of cells).

  • step_height (float) – The change in height for each step/tread.

  • pixels_per_step (int) – The number of grid cells (pixels) defining the length of each step.

Returns:

The heightmap array.

Return type:

NDArray[np.float64]

hsa_gym.utils.terrain_generators.generate_terrain(terrain_type: str, width: int = 50, height: int = 50, ensure_flat_spawn: bool = True, **kwargs) ndarray[tuple[int, ...], dtype[float64]][source]

Main entry function to dispatch terrain generation based on the type requested.

This function calls the appropriate generation helper and applies the ensure_flat_spawn_zone utility if required.

Parameters:
  • terrain_type (str) – Specifies the type of terrain to generate: 'wavy', 'perlin', 'stairs', 'flat', 'incline', 'craters', 'poles', 'flat_with_incline', 'flat_with_ramp', 'corridor', or 'spiral'.

  • width (int) – Terrain grid width (number of cells).

  • height (int) – Terrain grid height (number of cells).

  • ensure_flat_spawn (bool) – If True, calls ensure_flat_spawn_zone() to flatten the center area.

  • kwargs (dict) – Additional keyword arguments passed to the specific terrain generation function (e.g., angle, num_craters).

Returns:

The generated heightmap array.

Return type:

NDArray[np.float64]

Raises:

ValueError – If an unknown terrain_type is provided.

hsa_gym.utils.terrain_generators.generate_wavy_field(width: int, height: int, scale: float = 0.5, amp: float = 0.5) ndarray[tuple[int, ...], dtype[float64]][source]

Generate a terrain heightmap based on a 2D sine-cosine wave pattern.

Parameters:
  • width (int) – Terrain grid width (number of cells).

  • height (int) – Terrain grid height (number of cells).

  • scale (float) – Controls the frequency/wavelength of the wave pattern.

  • amp (float) – Controls the amplitude (maximum height/depth) of the waves.

Returns:

The heightmap array.

Return type:

NDArray[np.float64]