# Source code for protomotions.agents.evaluators.aggregate_metrics

# SPDX-FileCopyrightText: Copyright (c) 2025-2026 The ProtoMotions Developers
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Aggregate metrics for post-hoc trajectory analysis.

This module contains metrics computed over accumulated MotionMetrics trajectories
after an evaluation episode completes, as opposed to per-step evaluation components
(MdpComponents) which run during the episode with threshold-based failure detection.

Key Classes:
    - AggregateMetric: Base class for post-hoc metrics
    - SmoothnessAggregateMetric: Computes normalized jerk and high-jerk frame percentage
    - ActionSmoothnessAggregateMetric: Computes action rate of change metrics
"""

import math
from typing import Dict, Optional

from protomotions.agents.evaluators.metrics import MotionMetrics
from protomotions.agents.evaluators.smoothness_calculator import SmoothnessCalculator


class AggregateMetric:
    """Base class for metrics computed post-hoc over accumulated MotionMetrics trajectories.

    Unlike evaluation_components (MdpComponents), which run per-step with
    threshold-based failure detection, AggregateMetrics run after the full
    episode to compute summary statistics like smoothness or jerk.

    Subclasses must implement the compute() method.
    """

    def compute(self, metrics: Dict[str, MotionMetrics]) -> Dict[str, float]:
        """Compute aggregate metrics from collected motion data.

        Args:
            metrics: Dictionary of MotionMetrics objects containing trajectory data

        Returns:
            Dictionary of scalar metrics with "eval/" prefix for logging

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError
class SmoothnessAggregateMetric(AggregateMetric):
    """Aggregate metric for computing motion smoothness from rigid body trajectories.

    Computes normalized jerk and high-jerk frame percentage over sliding windows.
    """

    def __init__(
        self, evaluator, window_sec: float = 0.4, high_jerk_threshold: float = 6500.0
    ):
        """Initialize the smoothness aggregate metric.

        Args:
            evaluator: The parent evaluator instance
            window_sec: Window size in seconds for smoothness computation
            high_jerk_threshold: Threshold for classifying high jerk frames
        """
        # Delegate the actual jerk math to the shared calculator, configured
        # from the evaluator's device and environment timestep.
        self.smoothness_calculator = SmoothnessCalculator(
            device=evaluator.device,
            dt=evaluator.env.dt,
            window_sec=window_sec,
            high_jerk_threshold=high_jerk_threshold,
        )
        self.num_bodies = evaluator.env.robot_config.kinematic_info.num_bodies

    def compute(self, metrics: Dict[str, MotionMetrics]) -> Dict[str, float]:
        """Compute smoothness metrics from collected motion data.

        Args:
            metrics: Dictionary of MotionMetrics

        Returns:
            Dictionary of smoothness metrics with "eval/" prefix
        """
        raw_metrics = self.smoothness_calculator.compute_smoothness_metrics(
            metrics, self.num_bodies
        )
        # Log each smoothness metric, then namespace the keys for the logger.
        for k, v in raw_metrics.items():
            print(f"Smoothness metric: {k}, value: {v}")
        return {f"eval/{k}": v for k, v in raw_metrics.items()}
class ActionSmoothnessAggregateMetric(AggregateMetric):
    """Aggregate metric for computing action smoothness.

    Measures how much actions change between consecutive timesteps.
    High action deltas indicate jerky/unstable control.
    """

    def __init__(self, evaluator, dt: Optional[float] = None):
        """Initialize the action smoothness aggregate metric.

        Args:
            evaluator: The parent evaluator instance
            dt: Simulation timestep in seconds (defaults to env.dt)
        """
        self.dt = dt if dt is not None else evaluator.env.dt
        self.device = evaluator.device

    def compute(self, metrics: Dict[str, MotionMetrics]) -> Dict[str, float]:
        """Compute action smoothness metrics from collected action data.

        Metrics computed:
            - action_delta_mean: Mean absolute action change per step (rad)
            - action_delta_max: Max absolute action change per step across all joints (rad)
            - action_rate_mean: Mean action rate of change (rad/s)

        Args:
            metrics: Dictionary of MotionMetrics (must contain "actions");
                per-motion frame counts are read from ``motion_lens``.

        Returns:
            Dictionary of action smoothness metrics with "eval/" prefix,
            empty if no "actions" entry or no motion has at least 2 frames.
        """
        if "actions" not in metrics:
            return {}

        actions_data = metrics["actions"].data  # [num_motions, max_frames, num_dofs]
        motion_lens = metrics["actions"].motion_lens  # [num_motions]

        result = {}
        all_deltas = []
        all_max_deltas = []

        num_motions = actions_data.shape[0]
        for m in range(num_motions):
            n_frames = int(motion_lens[m].item())
            # Need at least two frames to form a delta.
            if n_frames < 2:
                continue

            actions_m = actions_data[m, :n_frames]  # [n_frames, num_dofs]

            # Absolute action change between consecutive frames: [n_frames-1, num_dofs]
            deltas = (actions_m[1:] - actions_m[:-1]).abs()

            # Mean delta across all joints and frames for this motion
            all_deltas.append(deltas.mean().item())
            # Max delta per frame (worst joint), then mean across frames
            all_max_deltas.append(deltas.max(dim=-1)[0].mean().item())

        if all_deltas:
            # Exact radians-to-degrees factor. The original used the truncated
            # constant 3.14159, introducing a small systematic conversion error.
            rad2deg = 180.0 / math.pi

            mean_delta = sum(all_deltas) / len(all_deltas)
            mean_max_delta = sum(all_max_deltas) / len(all_max_deltas)

            result["eval/action_delta_mean_rad"] = mean_delta
            result["eval/action_delta_max_rad"] = mean_max_delta
            result["eval/action_rate_mean_rad_s"] = mean_delta / self.dt

            # Convert to degrees for readability
            result["eval/action_delta_mean_deg"] = mean_delta * rad2deg
            result["eval/action_delta_max_deg"] = mean_max_delta * rad2deg

            print(
                f"Action smoothness: mean_delta={mean_delta:.4f} rad ({mean_delta * rad2deg:.2f}°), "
                f"max_delta={mean_max_delta:.4f} rad ({mean_max_delta * rad2deg:.2f}°)"
            )

        return result