protomotions.agents.evaluators package#

class protomotions.agents.evaluators.MimicEvaluator(agent, fabric, config)[source]#

Bases: BaseEvaluator

Evaluator for Mimic agent’s motion tracking performance.

__init__(agent, fabric, config)[source]#

Initialize the evaluator.

Parameters:
  • agent (Any) – The agent to evaluate

  • fabric (Any) – Lightning Fabric instance for distributed training

  • config – Evaluator configuration

cleanup_after_evaluation()[source]#

Restore env and motion manager state after evaluation.

evaluate_episode(env_ids, max_steps)[source]#

Run a single episode batch, optionally with EMA action smoothing.

When eval_action_ema_alpha is set, actions are low-pass filtered to simulate deployment conditions. Motions that fail under EMA get higher sampling weight, creating curriculum pressure toward smooth policies.
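A minimal sketch of the EMA low-pass filter described above. The function name is hypothetical, and whether alpha weights the new action or the previous one is an assumption here; only the config key eval_action_ema_alpha comes from the source.

```python
def ema_smooth_actions(raw_action, prev_action, alpha):
    """Low-pass filter actions with an exponential moving average.

    Works on floats or tensors. Here alpha close to 1.0 keeps more of
    the new action; a lower alpha smooths more aggressively, mimicking
    actuator lag under deployment conditions (assumed convention).
    """
    if prev_action is None:
        # First step of the episode: nothing to blend with yet.
        return raw_action
    return alpha * raw_action + (1.0 - alpha) * prev_action
```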

initialize_eval()[source]#

Initialize evaluation tracking and cache env state for restoration.

property motion_lib: MotionLib#

Motion library (from agent).

property motion_manager: MimicMotionManager#

Motion manager (from env).

process_eval_results()[source]#

Process results and update motion sampling weights.

run_evaluation()[source]#

Run evaluation across multiple motions.

class protomotions.agents.evaluators.SmoothnessCalculator(
device,
dt,
window_sec=0.4,
high_jerk_threshold=6500.0,
)[source]#

Bases: object

Calculator for motion smoothness metrics like normalized jerk.

This class computes smoothness metrics from collected motion data, particularly using rigid body positions to derive velocity via finite differences and then computing normalized jerk.

__init__(
device,
dt,
window_sec=0.4,
high_jerk_threshold=6500.0,
)[source]#

Initialize the smoothness evaluator.

Parameters:
  • device (torch.device) – Device to perform computations on

  • dt (float) – Time step duration in seconds (must be explicitly provided)

  • window_sec (float) – Default window size in seconds for rolling window computation

  • high_jerk_threshold (float) – Threshold for classifying windows as having high jerk

compute_normalized_jerk_from_pos(
rigid_body_pos_metric,
num_bodies,
window_sec=0.4,
eps=0.1,
)[source]#

Compute normalized jerk from rigid body position data using sliding windows.

Like motion_visualizer_smoothness.py, this computes normalized jerk over rolling windows rather than over the entire motion sequence.

The normalized jerk is computed as: NJ = (T^5 * ∫|jerk|^2 dt) / (path_length^2)

Using T^5 makes the metric dimensionless and FPS-invariant, allowing fair comparison across motions sampled at different frame rates.

Parameters:
  • rigid_body_pos_metric (MotionMetrics) – MotionMetrics containing rigid body positions. Shape: [num_motions, max_frames, num_bodies*3]

  • num_bodies (int) – Number of rigid bodies

  • window_sec (float) – Window size in seconds for rolling window computation

  • eps (float) – Small epsilon for numerical stability

Returns:

  • per_motion_nj – Mean normalized jerk per motion [num_motions]

  • per_body_per_motion_nj – Mean normalized jerk per body per motion [num_motions, num_bodies]

  • windowed_nj_per_motion – List of windowed NJ tensors per motion, each [num_windows, num_bodies]

Return type:

Tuple of (per_motion_nj, per_body_per_motion_nj, windowed_nj_per_motion)
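The normalized jerk formula above can be sketched for a single body trajectory over one window. This is a NumPy stand-in for the torch implementation; the exact placement of eps relative to the path length is an assumption.

```python
import numpy as np

def normalized_jerk(pos, dt, eps=0.1):
    """Dimensionless normalized jerk for one trajectory window.

    pos: [T, 3] positions; dt: frame duration in seconds.
    NJ = (T_win^5 * integral(|jerk|^2) dt) / path_length^2, with eps
    added to path_length for numerical stability (assumed placement).
    """
    vel = np.diff(pos, axis=0) / dt   # finite-difference velocity [T-1, 3]
    acc = np.diff(vel, axis=0) / dt   # acceleration [T-2, 3]
    jerk = np.diff(acc, axis=0) / dt  # jerk [T-3, 3]
    duration = (pos.shape[0] - 1) * dt
    path_length = np.linalg.norm(np.diff(pos, axis=0), axis=1).sum()
    integral = (np.linalg.norm(jerk, axis=1) ** 2).sum() * dt
    # duration**5 makes the metric dimensionless and FPS-invariant.
    return duration ** 5 * integral / (path_length + eps) ** 2
```

A straight-line, constant-velocity trajectory has zero jerk and thus zero NJ, regardless of the frame rate it was sampled at.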

compute_smoothness_metrics(
metrics,
num_bodies,
window_sec=None,
)[source]#

Compute smoothness metrics from collected motion data using sliding windows.

Parameters:
  • metrics (Dict[str, MotionMetrics]) – Dictionary of collected MotionMetrics

  • num_bodies (int) – Number of rigid bodies in the robot

  • window_sec (float | None) – Window size in seconds (uses default if None)

Returns:

Dictionary of smoothness metrics for logging

Return type:

Dict[str, float]

class protomotions.agents.evaluators.MotionMetrics(
num_motions,
motion_lens,
max_motion_len,
num_sub_features=1,
device=None,
dtype=torch.float32,
)[source]#

Bases: object

Store and compute metrics for motion data.

Stores raw data in the shape [num_motions, max_motion_len, num_sub_features] and supports basic reduction operations for computing final metrics.

__init__(
num_motions,
motion_lens,
max_motion_len,
num_sub_features=1,
device=None,
dtype=torch.float32,
)[source]#

Initialize the metrics tracker.

Parameters:
  • num_motions (int) – Number of motions to track

  • motion_lens (torch.Tensor) – Number of frames in each motion sequence

  • max_motion_len (int) – Conservative maximum number of frames allocated for data storage, ensuring shape consistency across GPUs during aggregation

  • num_sub_features (int) – Number of sub-features per data point (default: 1)

  • device (torch.device | None) – Device to store the tensors on

  • dtype (torch.dtype) – Data type for the tensors

compute_finite_difference_jitter_reduce_each_motion(
num_bodies,
aggregate_method='mean',
order=2,
field_description='data',
)[source]#

Generic method to compute jitter using finite differences of a specified order. Output is zero-padded at the beginning to match the input length.

Parameters:
  • num_bodies (int) – Number of rigid bodies (to reshape the flattened data)

  • aggregate_method (str) – How to aggregate across bodies (“mean”, “max”, “sum”)

  • order (int) – Order of finite differences (1 for velocity-like, 2 for acceleration-like)

  • field_description (str) – Description of the field for error messages

Returns:

Jitter values with shape [num_motions, max_motion_len] (same as input)

Return type:

torch.Tensor

compute_jitter_reduce_each_motion(
num_bodies,
aggregate_method='mean',
)[source]#

Compute jitter (2nd order finite differences of positions) and reduce across body dimensions.

This method is specifically designed for rigid_body_pos data with shape [num_motions, max_motion_len, num_bodies*3]. It computes the L2 norm of 2nd order finite differences (pos[t+1] - 2*pos[t] + pos[t-1]) for each body, then aggregates across all bodies using the specified method. Output is zero-padded at the beginning to match input length.

Parameters:
  • num_bodies (int) – Number of rigid bodies (to reshape the flattened data)

  • aggregate_method (str) – How to aggregate across bodies (“mean”, “max”, “sum”)

Returns:

Jitter values with shape [num_motions, max_motion_len] (same as input)

Return type:

torch.Tensor
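The 2nd-order finite difference described above can be sketched for a single motion. This is a NumPy stand-in (the real method operates on batched torch tensors inside MotionMetrics), assuming the same zero-padding convention:

```python
import numpy as np

def position_jitter(pos, num_bodies, aggregate_method="mean"):
    """Per-frame jitter from 2nd-order finite differences of positions.

    pos: [num_frames, num_bodies * 3] flattened rigid body positions.
    Returns [num_frames] values, zero-padded at the start (the first
    two frames have no valid 2nd difference).
    """
    num_frames = pos.shape[0]
    p = pos.reshape(num_frames, num_bodies, 3)
    # pos[t+1] - 2*pos[t] + pos[t-1], valid for interior frames only.
    d2 = p[2:] - 2.0 * p[1:-1] + p[:-2]           # [num_frames-2, bodies, 3]
    per_body = np.linalg.norm(d2, axis=-1)        # L2 norm per body
    reduce = {"mean": np.mean, "max": np.max, "sum": np.sum}[aggregate_method]
    jitter = reduce(per_body, axis=1)             # aggregate across bodies
    return np.concatenate([np.zeros(2), jitter])  # zero-pad to input length
```

Linear (constant-velocity) motion produces zero jitter, since its second differences vanish.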

compute_rotation_jitter_reduce_each_motion(
num_bodies,
aggregate_method='mean',
)[source]#

Compute rotation jitter (1st order finite differences of angular velocities) and reduce across body dimensions.

This method is specifically designed for rigid_body_ang_vel data with shape [num_motions, max_motion_len, num_bodies*3]. It computes the L2 norm of 1st order finite differences (ang_vel[t+1] - ang_vel[t]) for each body, then aggregates across all bodies using the specified method. Output is zero-padded at the beginning to match input length.

Parameters:
  • num_bodies (int) – Number of rigid bodies (to reshape the flattened data)

  • aggregate_method (str) – How to aggregate across bodies (“mean”, “max”, “sum”)

Returns:

Rotation jitter values with shape [num_motions, max_motion_len] (same as input)

Return type:

torch.Tensor

copy_from(other)[source]#

Copy data from another MotionMetrics object.

copy_from_motion_ids(other, motion_ids)[source]#

Copy data from another MotionMetrics object for specific motions.

get_unfilled_mask()[source]#

Get a mask of the unfilled values in the data.

jitter_mean_reduce_each_motion(
num_bodies,
aggregate_method='mean',
)[source]#

Compute jitter and then take the mean over time for each motion.

Parameters:
  • num_bodies (int) – Number of rigid bodies

  • aggregate_method (str) – How to aggregate across bodies (“mean”, “max”, “sum”)

Returns:

Mean jitter value for each motion [num_motions]

Return type:

torch.Tensor

max_mean_reduce()[source]#

First reduce each motion by taking the max over valid frames, then take the mean across all motions.

max_reduce_each_motion(with_frame=False)[source]#

Reduce the data by taking the max of each motion.

mean_max_reduce()[source]#

First reduce each motion by taking the mean over valid frames, then take the max across all motions.

Returns:

Maximum of the per-motion means (worst performing motion)

Return type:

torch.Tensor

mean_mean_reduce()[source]#

First reduce each motion by taking the mean over valid frames, then take the mean across all motions.

mean_min_reduce()[source]#

First reduce each motion by taking the mean over valid frames, then take the min across all motions.

Returns:

Minimum of the per-motion means (best performing motion)

Return type:

torch.Tensor

mean_reduce_each_motion()[source]#

Reduce the data by taking the mean of each motion.

merge_from(other)[source]#

Merge data from another MotionMetrics object.

min_mean_reduce()[source]#

First reduce each motion by taking the min over valid frames, then take the mean across all motions.

min_reduce_each_motion()[source]#

Reduce the data by taking the min of each motion.

ops_mean_reduce(op)[source]#

First reduce the data by taking the given op over each motion, then mean-reduce across motions.
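The two-stage reduction pattern shared by these methods can be sketched with a masked NumPy version of mean_max_reduce (a stand-in; the real method works on the class's internal torch tensors and frame counts):

```python
import numpy as np

def mean_max_reduce(data, motion_lens):
    """Per-motion mean over valid frames, then max across motions.

    data: [num_motions, max_motion_len]; motion_lens: [num_motions].
    Only the first motion_lens[i] frames of motion i are valid; the
    rest is padding and must be masked out before averaging.
    """
    num_motions, max_len = data.shape
    valid = np.arange(max_len)[None, :] < motion_lens[:, None]
    per_motion_mean = (data * valid).sum(axis=1) / motion_lens
    return per_motion_mean.max()  # worst-performing motion
```

Masking matters: without it, the zero padding of short motions would dilute their per-motion means.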

reset()[source]#

Reset all stored data and frame counts.

rotation_jitter_mean_reduce_each_motion(
num_bodies,
aggregate_method='mean',
)[source]#

Compute rotation jitter and then take the mean over time for each motion.

Parameters:
  • num_bodies (int) – Number of rigid bodies

  • aggregate_method (str) – How to aggregate across bodies (“mean”, “max”, “sum”)

Returns:

Mean rotation jitter value for each motion [num_motions]

Return type:

torch.Tensor

to(device)[source]#

Move metrics to specified device.

update(motion_ids, values, frame_indices=None)[source]#

Update the metrics data for specified motions.

Parameters:
  • motion_ids (torch.Tensor) – Tensor of motion IDs to update [batch_size]

  • values (torch.Tensor) – Tensor of values to update [batch_size, num_sub_features]

  • frame_indices (torch.Tensor | None) – Optional tensor of frame indices [batch_size]. If None, the current frame count for each motion is used
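The storage layout and update semantics can be illustrated with a tiny NumPy stand-in (class and method bodies are hypothetical simplifications, not the real implementation):

```python
import numpy as np

class TinyMotionMetrics:
    """Minimal stand-in for MotionMetrics' storage scheme:
    [num_motions, max_motion_len, num_sub_features], zero-initialized,
    with a per-motion frame counter advanced by update()."""

    def __init__(self, num_motions, max_motion_len, num_sub_features=1):
        self.data = np.zeros((num_motions, max_motion_len, num_sub_features))
        self.counts = np.zeros(num_motions, dtype=int)

    def update(self, motion_ids, values, frame_indices=None):
        # When frame_indices is omitted, write at each motion's current count.
        if frame_indices is None:
            frame_indices = self.counts[motion_ids]
        self.data[motion_ids, frame_indices] = values
        self.counts[motion_ids] = frame_indices + 1

    def mean_reduce_each_motion(self):
        """Mean over the filled frames of each motion."""
        sums = self.data.sum(axis=(1, 2))
        return sums / np.maximum(self.counts, 1)
```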

class protomotions.agents.evaluators.AggregateMetric[source]#

Bases: object

Base class for metrics computed post-hoc over accumulated MotionMetrics trajectories.

Unlike evaluation_components (MdpComponents) which run per-step with threshold-based failure detection, AggregateMetrics run after the full episode to compute summary statistics like smoothness or jerk.

Subclasses must implement the compute() method.
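A hypothetical subclass illustrating the compute() contract, sketched against plain arrays instead of MotionMetrics objects (the metric name and key are invented; only the "eval/" prefix convention comes from the source):

```python
import numpy as np
from typing import Dict

class PeakActionMetric:
    """Hypothetical AggregateMetric-style subclass: reports the largest
    absolute action seen across all motions as a single scalar."""

    def compute(self, metrics: Dict[str, np.ndarray]) -> Dict[str, float]:
        # "actions" is an assumed key; shape [num_motions, frames, num_dofs].
        actions = metrics["actions"]
        return {"eval/action_peak_abs": float(np.abs(actions).max())}
```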

compute(metrics)[source]#

Compute aggregate metrics from collected motion data.

Parameters:

metrics (Dict[str, MotionMetrics]) – Dictionary of MotionMetrics objects containing trajectory data

Returns:

Dictionary of scalar metrics with “eval/” prefix for logging

Return type:

Dict[str, float]

class protomotions.agents.evaluators.SmoothnessAggregateMetric(
evaluator,
window_sec=0.4,
high_jerk_threshold=6500.0,
)[source]#

Bases: AggregateMetric

Aggregate metric for computing motion smoothness from rigid body trajectories.

Computes normalized jerk and high-jerk frame percentage over sliding windows.

__init__(
evaluator,
window_sec=0.4,
high_jerk_threshold=6500.0,
)[source]#

Initialize the smoothness aggregate metric.

Parameters:
  • evaluator – The parent evaluator instance

  • window_sec (float) – Window size in seconds for smoothness computation

  • high_jerk_threshold (float) – Threshold for classifying high jerk frames

compute(metrics)[source]#

Compute smoothness metrics from collected motion data.

Parameters:

metrics (Dict[str, MotionMetrics]) – Dictionary of MotionMetrics

Returns:

Dictionary of smoothness metrics with “eval/” prefix

Return type:

Dict[str, float]

class protomotions.agents.evaluators.ActionSmoothnessAggregateMetric(evaluator, dt=None)[source]#

Bases: AggregateMetric

Aggregate metric for computing action smoothness.

Measures how much actions change between consecutive timesteps. High action deltas indicate jerky/unstable control.

__init__(evaluator, dt=None)[source]#

Initialize the action smoothness aggregate metric.

Parameters:
  • evaluator – The parent evaluator instance

  • dt (float | None) – Simulation timestep in seconds (defaults to env.dt)

compute(metrics)[source]#

Compute action smoothness metrics from collected action data.

Metrics computed:
  • action_delta_mean – Mean absolute action change per step (rad)

  • action_delta_max – Max absolute action change per step across all joints (rad)

  • action_rate_mean – Mean action rate of change (rad/s)

Parameters:

metrics (Dict[str, MotionMetrics]) – Dictionary of MotionMetrics (must contain “actions”)

Returns:

Dictionary of action smoothness metrics with “eval/” prefix

Return type:

Dict[str, float]
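The three action smoothness metrics can be sketched for a single episode. This is a NumPy stand-in that omits the MotionMetrics bookkeeping and valid-frame masking of the real implementation:

```python
import numpy as np

def action_smoothness(actions, dt):
    """Action smoothness summaries for one episode.

    actions: [num_frames, num_dofs] joint targets in radians;
    dt: control timestep in seconds.
    """
    delta = np.abs(np.diff(actions, axis=0))  # per-step change [frames-1, dofs]
    return {
        "eval/action_delta_mean": float(delta.mean()),        # rad per step
        "eval/action_delta_max": float(delta.max()),          # rad per step
        "eval/action_rate_mean": float((delta / dt).mean()),  # rad/s
    }
```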

Submodules#