Source code for protomotions.agents.common.common

# SPDX-FileCopyrightText: Copyright (c) 2025 The ProtoMotions Developers
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Common neural network components and utilities for agents.

This module provides shared building blocks used across different agent architectures,
including observation normalization, weight initialization, and specialized layers.

Key Classes:
    - NormObsBase: Base class for modules with observation normalization
    - ObsProcessor: General observation processor for reshaping and normalizing
    - Embedding: Embedding layer for discrete inputs

Key Functions:
    - weight_init: Initialize network weights
    - get_params: Extract parameters from optimizer groups
"""

import torch
from torch import nn, Tensor
from protomotions.utils.hydra_replacement import get_class
from copy import copy
from typing import List, Dict, Optional
from tensordict import TensorDict
from tensordict.nn import TensorDictModuleBase

from protomotions.agents.utils.normalization import RunningMeanStd
from protomotions.agents.common.config import (
    NormObsBaseConfig,
    ObsProcessorConfig,
    ModuleContainerConfig,
    ModuleOperationConfig,
    ModuleOperationForwardConfig,
    ModuleOperationPermuteConfig,
    ModuleOperationReshapeConfig,
    ModuleOperationSqueezeConfig,
    ModuleOperationUnsqueezeConfig,
    ModuleOperationExpandConfig,
    ModuleOperationSphereProjectionConfig,
)


[docs] def get_params(obj) -> List[nn.Parameter]: """Extract parameters from optimizer parameter groups. Handles both flat lists of parameters and grouped parameters (as used by optimizers). Args: obj: Either a list of nn.Parameter or a list of parameter groups (dicts). Returns: Flat list of all parameters. """ as_list = list(obj) if isinstance(as_list[0], Tensor): return as_list else: params = [] for group in as_list: params = params + list(group["params"]) return params
[docs] def weight_init(m, orthogonal=False): """Initialize weights for neural network modules. Applies appropriate initialization to linear layers and other modules. Linear layers get orthogonal or default initialization with zero bias. Args: m: Neural network module to initialize. orthogonal: If True, use orthogonal initialization for linear layers. """ if isinstance(m, nn.Linear): if orthogonal: nn.init.orthogonal_(m.weight.data) if hasattr(m.bias, "data"): m.bias.data.fill_(0.0) elif hasattr(m, "reset_parameters"): m.reset_parameters()
[docs] class NormObsBase(nn.Module): """Base class for modules with observation normalization. Provides running mean/std normalization of observations using exponential moving averages. Normalization statistics are updated during training and frozen during evaluation. Uses lazy initialization - input shape is inferred on first forward pass. This is a simple tensor-to-tensor module. Subclasses handle TensorDict extraction/insertion. Args: config: Configuration specifying output dimensions and normalization parameters. Attributes: running_obs_norm: RunningMeanStd module for observation normalization (lazy). num_out: Output dimension. """
[docs] def __init__(self, config: NormObsBaseConfig): super().__init__() self.config = config self.build_norm()
[docs] def build_norm(self): if self.config.normalize_obs: # Use lazy initialization - shape inferred on first forward self.running_obs_norm = RunningMeanStd( fabric=None, shape=None, # Lazy: inferred from first input device="cpu", clamp_value=self.config.norm_clamp_value, )
[docs] def forward(self, obs: Tensor) -> Tensor: """Forward pass that normalizes observations. Args: obs: Observation tensor to normalize. Returns: Normalized observation tensor. """ if self.config.normalize_obs: norm_obs = self.running_obs_norm.normalize(obs) if self.training: self.running_obs_norm.record_moments(obs) else: norm_obs = obs return norm_obs
[docs] def apply_module_operations( obs: Tensor, module_operations: List[ModuleOperationConfig], normalizer: NormObsBase, forward_model: Optional[nn.Module] = None, ) -> Dict[str, Tensor]: batch_size = obs.shape[0] norm_obs = None for operation in module_operations: if isinstance(operation, ModuleOperationPermuteConfig): obs = obs.permute(*operation.new_order) elif isinstance(operation, ModuleOperationReshapeConfig): new_shape = copy(operation.new_shape) if isinstance(new_shape[0], str) and new_shape[0] == "batch_size": # Use actual tensor dimension for ONNX tracing compatibility new_shape[0] = batch_size elif isinstance(new_shape[0], str) and "batch_size" in new_shape[0]: # Handle expressions like "batch_size * 2" - only works for concrete values try: new_shape[0] = eval(new_shape[0].replace("batch_size", str(int(batch_size)))) except (TypeError, ValueError): # During tracing, batch_size may not be convertible to int # Fall back to using -1 which PyTorch will infer new_shape[0] = -1 obs = obs.reshape(*new_shape) elif isinstance(operation, ModuleOperationSqueezeConfig): obs = obs.squeeze(dim=operation.squeeze_dim) elif isinstance(operation, ModuleOperationUnsqueezeConfig): obs = obs.unsqueeze(dim=operation.unsqueeze_dim) elif isinstance(operation, ModuleOperationExpandConfig): obs = obs.expand(*operation.expand_shape) elif isinstance(operation, ModuleOperationSphereProjectionConfig): obs = torch.nn.functional.normalize(obs, dim=-1) elif isinstance(operation, ModuleOperationForwardConfig): obs_shape = obs.shape if len(obs_shape) > 2: obs = obs.reshape(-1, obs_shape[-1]) if normalizer is not None: obs = norm_obs = normalizer(obs) if len(obs_shape) > 2: obs = obs.reshape(*obs_shape[:-1], -1) if norm_obs is not None: norm_obs = norm_obs.reshape(*obs_shape[:-1], -1) if forward_model is not None: obs = forward_model(obs) else: raise NotImplementedError(f"Operation {operation} not implemented") return_dict = {"output": obs} if norm_obs is not None: return_dict["norm_obs"] = norm_obs return return_dict
[docs] class ObsProcessor(TensorDictModuleBase): """General observation processor - applies operations and normalization. ForwardConfig applies normalization but skips the forward model (no MLP). """ config: ObsProcessorConfig
[docs] def __init__(self, config: ObsProcessorConfig): TensorDictModuleBase.__init__(self) self.config = config assert len(config.out_keys) == 1, "ObsProcessor requires exactly one output key" self.norm = NormObsBase(config) if config.normalize_obs else None self.in_keys = config.in_keys self.out_keys = config.out_keys
[docs] def forward(self, tensordict: TensorDict, *args, **kwargs) -> TensorDict: obs = torch.cat([tensordict[key] for key in self.in_keys], dim=-1) result = apply_module_operations( obs, self.config.module_operations, forward_model=None, normalizer=self.norm ) tensordict[self.out_keys[0]] = result["output"] return tensordict
[docs] class ModuleContainer(TensorDictModuleBase): """Generic container that runs a list of modules sequentially on a TensorDict. With TensorDict, the distinction between "sequential", "parallel input", and "parallel output" is implicit in how keys flow between modules. """ config: ModuleContainerConfig
[docs] def __init__(self, config: ModuleContainerConfig): TensorDictModuleBase.__init__(self) self.config = config self.models = nn.ModuleList() # Build models and validate data flow # Container in_keys are "available" at the start (like sources) available_keys = set(self.config.in_keys) for i, model_cfg in enumerate(config.models): model = get_class(model_cfg._target_)(config=model_cfg) self.models.append(model) # Validate: each model's in_keys must be available # (either from container in_keys or a previous model's out_keys) for in_key in model.in_keys: assert in_key in available_keys, ( f"ModuleContainer: model {i} ({type(model).__name__}) requires " f"input key '{in_key}' but it's not available.\n" f" Available keys: {sorted(available_keys)}\n" f" Model in_keys: {model.in_keys}" ) # Add this model's outputs to available keys for subsequent models available_keys.update(model.out_keys) # Validate: every container out_key must have been produced for out_key in self.config.out_keys: assert out_key in available_keys, ( f"ModuleContainer: container promises out_key '{out_key}' " f"but no model produces it.\n" f" Available keys after all models: {sorted(available_keys)}" ) self.in_keys = self.config.in_keys self.out_keys = self.config.out_keys
[docs] def forward(self, tensordict: TensorDict, *args, **kwargs) -> TensorDict: """Forward through all models sequentially.""" for model in self.models: tensordict = model(tensordict) return tensordict