Source code for sionna.phy.fec.interleaving

#
# SPDX-FileCopyrightText: Copyright (c) 2021-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
"""Blocks for interleaving and utility functions."""

from typing import Optional, Union
import warnings
import numpy as np
import torch
from importlib_resources import files, as_file

from sionna.phy import config, Block

__all__ = [
    "RowColumnInterleaver",
    "RandomInterleaver",
    "Turbo3GPPInterleaver",
    "Deinterleaver",
]


[docs] class RowColumnInterleaver(Block): r"""Interleaves a sequence of inputs via row/column swapping. :param row_depth: The row depth, i.e., how many values per row can be stored. :param axis: The dimension that should be interleaved. :param inverse: If `True`, the inverse permutation is performed. :param precision: Precision used for internal calculations and outputs. If `None`, :attr:`~sionna.phy.config.Config.precision` is used. :param device: Device for computation (e.g., 'cpu', 'cuda:0'). If `None`, :attr:`~sionna.phy.config.Config.device` is used. :input x: torch.Tensor. Tensor of arbitrary shape and arbitrary dtype. :output x_int: torch.Tensor. Tensor of same shape and dtype as ``x``. .. rubric:: Notes If the sequence length is not a multiple of ``row_depth``, additional filler bits are used for the last row that will be removed internally. However, for the last positions the interleaving distance may be slightly degraded. .. rubric:: Examples .. code-block:: python import torch from sionna.phy.fec.interleaving import RowColumnInterleaver interleaver = RowColumnInterleaver(row_depth=4) x = torch.arange(12).reshape(1, 12).float() y = interleaver(x) print(y) # tensor([[ 0., 4., 8., 1., 5., 9., 2., 6., 10., 3., 7., 11.]]) """ def __init__( self, row_depth: int, axis: int = -1, inverse: bool = False, precision: Optional[str] = None, device: Optional[str] = None, **kwargs, ): super().__init__(precision=precision, device=device, **kwargs) if not isinstance(axis, int): raise TypeError("axis must be int.") self._axis = axis if not isinstance(row_depth, int): raise TypeError("row_depth must be int.") self._row_depth = row_depth if not isinstance(inverse, bool): raise TypeError("inverse must be bool.") self._inverse = inverse # Permutation sequences initialized during build self._perm_seq: Optional[torch.Tensor] = None self._perm_seq_inv: Optional[torch.Tensor] = None # Required for associated deinterleaver self._keep_state = True @property def axis(self) -> int: """Axis to be permuted.""" return self._axis @property def row_depth(self) -> int: """Row depth of the row-column interleaver.""" return self._row_depth @property def perm_seq(self) -> Optional[torch.Tensor]: """Permutation sequence.""" return self._perm_seq @property def perm_seq_inv(self) -> Optional[torch.Tensor]: """Inverse permutation sequence.""" return self._perm_seq_inv @property def keep_state(self) -> bool: """Row-column interleaver always uses the same internal state.""" return True def _generate_perm_rc( self, n_seq: int, r_depth: int ) -> tuple[torch.Tensor, torch.Tensor]: """Generates a row/column permutation to initialize an RC-interleaver. If required, last positions use filler positions. :param n_seq: Sequence length to interleave. :param r_depth: Depth of the interleaver. :output perm_seq: Forward permutation tensor. :output perm_seq_inv: Inverse permutation tensor. """ # Round to next multiple of r_depth n = int(np.ceil(n_seq / r_depth) * r_depth) nb_rows = n // r_depth ind = torch.arange(n, dtype=torch.int64, device=self.device) # Rearrange in row/column format ind_rc = ind.reshape(nb_rows, -1) # Interleave via row/column swapping (transpose) ind_cr = ind_rc.t() # Read out indices in column/row ordering perm_seq_filler = ind_cr.reshape(-1) # Remove filler positions mask = perm_seq_filler < n_seq perm_seq = perm_seq_filler[mask] perm_seq_inv = torch.argsort(perm_seq) return perm_seq, perm_seq_inv
[docs] def build(self, input_shape: tuple) -> None: """Build block and check dimensions. :param input_shape: Shape of input tensor. """ if self._axis >= len(input_shape) or self._axis < -len(input_shape): raise ValueError("Axis does not match input shape.") # Normalize negative axis axis = self._axis if self._axis >= 0 else len(input_shape) + self._axis # Interleaver can't build pattern for dynamic shapes if input_shape[axis] is None: raise ValueError("Permutation axis cannot be None (dynamic).") # Generate permutation patterns p, pi = self._generate_perm_rc(input_shape[axis], self._row_depth) self._perm_seq = p self._perm_seq_inv = pi
def call( self, x: torch.Tensor, /, *, inverse: Optional[bool] = None, **kwargs, ) -> torch.Tensor: """Interleaving function. This function returns the permuted version of ``x``. :param x: Tensor of arbitrary shape. :param inverse: If provided, overrides the init parameter. :output x_int: Interleaved tensor of same shape as ``x``. """ input_shape = x.shape # Normalize axis axis = self._axis if self._axis >= 0 else len(input_shape) + self._axis # Re-init if shape has changed if self._perm_seq is None or x.shape[axis] != self._perm_seq.shape[0]: self._built = False self.build(x.shape) self._built = True # Use internal value if not explicitly provided if inverse is None: inverse = self._inverse # Ensure permutation is on the same device as input perm = self._perm_seq_inv if inverse else self._perm_seq if perm.device != x.device: perm = perm.to(x.device) x_int = torch.index_select(x, axis, perm) return x_int
[docs] class RandomInterleaver(Block): r"""Random interleaver permuting a sequence of input symbols. :param seed: Integer defining the random seed used if ``keep_state`` is `True`. :param keep_batch_constant: If `True`, each sample in the batch uses the same permutation. Otherwise, unique permutations per batch sample are generated (slower). :param inverse: If `True`, the inverse permutation is performed. :param keep_state: If `True`, the permutation is fixed for multiple calls (defined by ``seed`` attribute). :param axis: The dimension that should be interleaved. First dimension (``axis=0``) is not allowed. :param precision: Precision used for internal calculations and outputs. If `None`, :attr:`~sionna.phy.config.Config.precision` is used. :param device: Device for computation (e.g., 'cpu', 'cuda:0'). If `None`, :attr:`~sionna.phy.config.Config.device` is used. :input x: torch.Tensor. Tensor of arbitrary shape and dtype. :input seed: `int`. An integer defining the state of the random number generator. If explicitly given, the global internal seed is replaced by this seed. Can be used to realize random interleaver/deinterleaver pairs (call with same random seed). :output x_int: torch.Tensor. Tensor of same shape and dtype as the input ``x``. .. rubric:: Notes The interleaver block is stateless, i.e., the seed is either random during each call or must be explicitly provided during init/call. This is NOT the 5G interleaver sequence. .. rubric:: Examples .. code-block:: python import torch from sionna.phy.fec.interleaving import RandomInterleaver interleaver = RandomInterleaver(seed=42, keep_state=True) x = torch.arange(10).reshape(1, 10).float() y = interleaver(x) print(y) """ def __init__( self, seed: Optional[int] = None, keep_batch_constant: bool = True, inverse: bool = False, keep_state: bool = True, axis: int = -1, precision: Optional[str] = None, device: Optional[str] = None, **kwargs, ): super().__init__(precision=precision, device=device, **kwargs) if not isinstance(keep_batch_constant, bool): raise TypeError("keep_batch_constant must be bool.") self._keep_batch_constant = keep_batch_constant if not isinstance(axis, int): raise TypeError("axis must be int.") self._axis = axis if seed is not None: if not isinstance(seed, int): raise TypeError("seed must be int.") else: # Generate random seed if no value is provided seed = self.py_rng.randint(0, 2**31 - 1) self._seed = seed if not isinstance(inverse, bool): raise TypeError("inverse must be boolean.") self._inverse = inverse if not isinstance(keep_state, bool): raise TypeError("keep_state must be boolean.") self._keep_state = keep_state if self._keep_state is False and self._inverse is True: warnings.warn( "keep_state=False and, thus, a new realization of " "the interleaver is generated during each call. Thus, " "the inverse interleaver does not correspond to a previous " "interleaver call." ) @property def seed(self) -> int: """Seed to generate random sequence.""" return self._seed @property def axis(self) -> int: """Axis to be permuted.""" return self._axis @property def keep_state(self) -> bool: """Generate new random seed per call.""" return self._keep_state
[docs] def find_s_min( self, seed: int, seq_length: int, s_min_stop: int = 0 ) -> int: r"""Find :math:`S` parameter such that :math:`\pi(i)-\pi(j)>S` for all :math:`i-j<S`. This can be used to find optimized interleaver patterns. ``s_min_stop`` is an additional stopping condition, i.e., stop if current :math:`S` is already smaller than ``s_min_stop``. :param seed: Seed to draw random permutation that shall be analyzed. :param seq_length: Length of permutation sequence to be analyzed. :param s_min_stop: Enables early stop if already current s_min < ``s_min_stop``. :output s_min: The S-parameter for the given ``seed``. """ if not isinstance(seed, int): raise TypeError("seed must be int.") if not isinstance(seq_length, int): raise TypeError("seq_length must be int.") if not isinstance(s_min_stop, int): raise TypeError("s_min_stop must be int.") perm_seq = self._generate_perm_full(seed, seq_length, batch_size=1) perm_seq = perm_seq.squeeze(0).cpu().numpy() s_min = seq_length for i in range(len(perm_seq)): for j in range(-s_min, s_min, 1): if j == 0: continue if 0 <= i + j < seq_length: d = np.abs(perm_seq[i] - perm_seq[i + j]) if d <= np.abs(j): s_min = min(s_min, np.abs(j)) if d < s_min and np.abs(j) < s_min: s_min = min(s_min, d) # Early stop if s_min <= s_min_stop: break return int(s_min)
def _generate_perm_full( self, seed: int, seq_length: int, batch_size: int, inverse: bool = False, target_device: Optional[torch.device] = None, ) -> torch.Tensor: """Generates a random permutation for the interleaver. :param seed: Seed for the random number generator. :param seq_length: Length of the sequence to be permuted. :param batch_size: Number of independent permutations. :param inverse: If `True`, the inverse permutation is generated. :param target_device: Device for the output tensor. :output perm_seq: Permutation tensor of shape ``[batch_size, seq_length]``. """ device = target_device if target_device is not None else self.device # Generator must be on CPU for CUDA tensors gen_device = "cpu" if str(device).startswith("cuda") else device gen = torch.Generator(device=gen_device) gen.manual_seed(seed) rand_seq = torch.rand( batch_size, seq_length, generator=gen, device=gen_device, dtype=torch.float32, ) perm_seq = torch.argsort(rand_seq, dim=-1) if inverse: perm_seq = torch.argsort(perm_seq, dim=-1) # Move to target device if needed if perm_seq.device != device: perm_seq = perm_seq.to(device) return perm_seq
[docs] def build(self, input_shape: tuple, **kwargs) -> None: """Build block and check consistency of dimensions. :param input_shape: Shape of input tensor. """ if self._axis >= len(input_shape) or self._axis < -len(input_shape): raise ValueError("Axis does not match input shape.")
def call( self, x: torch.Tensor, /, *, seed: Optional[int] = None, inverse: Optional[bool] = None, ) -> torch.Tensor: """Interleaving function. This function returns the permuted version of ``x``. :param x: Tensor of arbitrary shape. :param seed: An integer defining the state of the random number generator. If explicitly given, the global internal seed is replaced by this seed. Can be used to realize random interleaver/deinterleaver pairs (call with same random seed). :param inverse: If provided, overrides the init parameter. :output x_int: Interleaved tensor of same shape as ``x``. :raises TypeError: If ``inverse`` is not `None` or `bool`. :raises ValueError: If inverse interleaving is requested with ``keep_state=False`` without explicitly providing the seed. .. rubric:: Notes In case of inverse interleaving (e.g., at the receiver), ``keep_state`` should be `True` as otherwise a new permutation is generated and the output is not equal to the original sequence. Alternatively, an explicit seed must be provided as function argument. """ input_shape = x.shape if inverse is None: inverse = self._inverse else: if not isinstance(inverse, bool): raise TypeError("inverse must be bool.") # Determine seed to use if seed is not None: use_seed = seed elif self._keep_state: use_seed = self._seed else: if inverse: raise ValueError( "Inverse interleaving not possible for " "random seeds per call (keep_state=False) without " "explicitly providing the seed as inputs." ) # Generate new seed for each call use_seed = self.py_rng.randint(0, 2**31 - 1) # Normalize axis axis = self._axis if self._axis >= 0 else len(input_shape) + self._axis # Select batch size for permutation generation if self._keep_batch_constant: batch_size = 1 else: # Special case: no batch dim if len(x.shape) == 1: batch_size = 1 else: batch_size = x.shape[0] perm_seq = self._generate_perm_full( use_seed, x.shape[axis], batch_size, inverse, target_device=x.device ) if self._keep_batch_constant: # Broadcast single sequence over complete batch perm_seq = perm_seq.squeeze(0) x_int = torch.index_select(x, axis, perm_seq) elif len(x.shape) == 1: # Special case: no batch dim perm_seq = perm_seq.squeeze(0) x_int = torch.index_select(x, axis, perm_seq) elif len(x.shape) == 2: # 2D case: batch x seq_len - use gather directly # perm_seq is [batch_size, seq_len] x_int = torch.gather(x, axis, perm_seq) else: # Per-batch permutation using gather for higher dims # Move axis to position 1 for gather x_t = x.movedim(axis, 1) # perm_seq is [batch_size, seq_len], expand to match x_t shape perm_expanded = perm_seq for _ in range(len(x_t.shape) - 2): perm_expanded = perm_expanded.unsqueeze(-1) perm_expanded = perm_expanded.expand(-1, -1, *x_t.shape[2:]) x_int = torch.gather(x_t, 1, perm_expanded) x_int = x_int.movedim(1, axis) return x_int
[docs] class Turbo3GPPInterleaver(Block): """Interleaver for 3GPP Turbo codes. Interleaver as used in the 3GPP Turbo codes :cite:p:`3GPPTS36212` and, thus, the maximum length is given as 6144 elements (only for the dimension as specified by ``axis``). :param inverse: If `True`, the inverse permutation is performed. :param axis: The dimension that should be interleaved. First dimension (``axis=0``) is not allowed. :param precision: Precision used for internal calculations and outputs. If `None`, :attr:`~sionna.phy.config.Config.precision` is used. :param device: Device for computation (e.g., 'cpu', 'cuda:0'). If `None`, :attr:`~sionna.phy.config.Config.device` is used. :input x: torch.Tensor. 2+D tensor of arbitrary shape and dtype. :output x_int: torch.Tensor. 2+D tensor of same shape and dtype as the input ``x``. .. rubric:: Notes Note that this implementation slightly deviates from the 3GPP standard :cite:p:`3GPPTS36212` in a sense that zero-padding is introduced for cases when the exact interleaver length is not supported by the standard. .. rubric:: Examples .. code-block:: python import torch from sionna.phy.fec.interleaving import Turbo3GPPInterleaver interleaver = Turbo3GPPInterleaver() x = torch.arange(40).reshape(1, 40).float() y = interleaver(x) print(y.shape) # torch.Size([1, 40]) """ def __init__( self, inverse: bool = False, axis: int = -1, precision: Optional[str] = None, device: Optional[str] = None, **kwargs, ): super().__init__(precision=precision, device=device, **kwargs) if not isinstance(axis, int): raise TypeError("axis must be int.") self._axis = axis self._keep_state = True # Required for deinterleaver if not isinstance(inverse, bool): raise TypeError("inverse must be boolean.") self._inverse = inverse # Load interleaver patterns as defined in the 3GPP standard self.coeffs_dict: dict[int, tuple[int, int]] = {} # Import coeffs from turbo module from sionna.phy.fec.turbo import coeffs source = files(coeffs).joinpath("turbo_coeffs.csv") with as_file(source) as coeffs_file: csv_reader = np.genfromtxt(coeffs_file, delimiter=",") for line_count, row in enumerate(csv_reader): if line_count > 0: # Ignore header line self.coeffs_dict[int(row[1])] = (int(row[2]), int(row[3])) @property def axis(self) -> int: """Axis to be permuted.""" return self._axis @property def keep_state(self) -> bool: """Always `True` for the Turbo3GPP interleaver.""" return self._keep_state def _generate_perm_full( self, frame_size: int, inverse: bool = False, target_device: Optional[torch.device] = None, ) -> torch.Tensor: """Generates a permutation for the 3GPP Turbo interleaver. :param frame_size: Length of the sequence to be permuted. :param inverse: If `True`, the inverse permutation is generated. :param target_device: Device for the output tensor. :output perm_seq: Permutation tensor of shape ``[frame_size]``. """ device = target_device if target_device is not None else self.device k = frame_size if k not in self.coeffs_dict: geqk_sizes = sorted([x for x in self.coeffs_dict if x >= k]) if len(geqk_sizes) == 0: raise ValueError( "Input frame size too large for 3GPP Turbo Interleaver." ) else: k = geqk_sizes[0] f1, f2 = self.coeffs_dict[k] perm_seq = [(f1 * i + f2 * (i**2)) % k for i in range(k)] if frame_size < k: perm_seq = [x for x in perm_seq if x < frame_size] perm_seq = torch.tensor(perm_seq, dtype=torch.int64, device=device) if inverse: perm_seq = torch.argsort(perm_seq) return perm_seq
[docs] def build(self, input_shape: tuple) -> None: """Build block and check consistency of dimensions. :param input_shape: Shape of input tensor. """ if self._axis >= len(input_shape) or self._axis < -len(input_shape): raise ValueError("Axis does not match input shape.") # Normalize axis axis = self._axis if self._axis >= 0 else len(input_shape) + self._axis frame_size = input_shape[axis] if frame_size >= 6145: raise ValueError( "3GPP Turbo Interleaver is defined for block lengths up to 6144." )
def call( self, x: torch.Tensor, /, *, inverse: Optional[bool] = None, **kwargs, ) -> torch.Tensor: """Interleaving function. This function returns the permuted version of ``x``. :param x: Tensor of arbitrary shape. :param inverse: If provided, overrides the init parameter. :output x_int: Interleaved tensor of same shape as ``x``. """ input_shape = x.shape # Normalize axis axis = self._axis if self._axis >= 0 else len(input_shape) + self._axis frame_size = input_shape[axis] if inverse is None: inverse = self._inverse perm_seq = self._generate_perm_full(frame_size, inverse, target_device=x.device) x_int = torch.index_select(x, axis, perm_seq) return x_int
[docs] class Deinterleaver(Block): """Deinterleaver that reverts the interleaver for a given input sequence. :param interleaver: Associated interleaver which shall be deinterleaved by this block. Can be either :class:`~sionna.phy.fec.interleaving.RandomInterleaver`, :class:`~sionna.phy.fec.interleaving.RowColumnInterleaver`, or :class:`~sionna.phy.fec.interleaving.Turbo3GPPInterleaver`. :param precision: Precision used for internal calculations and outputs. If `None`, inherits from ``interleaver``. :param device: Device for computation (e.g., 'cpu', 'cuda:0'). If `None`, inherits from ``interleaver``. :input x: torch.Tensor. 2+D tensor of arbitrary shape. :input seed: `int`. An integer defining the state of the random number generator. If explicitly given, the global internal seed is replaced by this seed. Can be used to realize random interleaver/deinterleaver pairs (call with same random seed). :output x_out: torch.Tensor. 2+D tensor of same shape and dtype as the input ``x``. .. rubric:: Notes This block provides a wrapper of the inverse interleaver function. .. rubric:: Examples .. code-block:: python import torch from sionna.phy.fec.interleaving import RandomInterleaver, Deinterleaver interleaver = RandomInterleaver(seed=42, keep_state=True) deinterleaver = Deinterleaver(interleaver) x = torch.arange(10).reshape(1, 10).float() y = interleaver(x) z = deinterleaver(y) print(torch.allclose(x, z)) # True """ def __init__( self, interleaver: Union[ RandomInterleaver, RowColumnInterleaver, Turbo3GPPInterleaver ], precision: Optional[str] = None, device: Optional[str] = None, **kwargs, ): if not isinstance( interleaver, (RandomInterleaver, RowColumnInterleaver, Turbo3GPPInterleaver), ): raise ValueError( "interleaver is not a valid interleaver instance." ) # If precision/device is None, use same as associated interleaver if precision is None: precision = interleaver.precision if device is None: device = interleaver.device super().__init__(precision=precision, device=device, **kwargs) # Assign interleaver after super().__init__() for nn.Module compatibility self._interleaver = interleaver if self._interleaver.keep_state is False: warnings.warn( "Deinterleaver requires interleaver to have " "keep_state=True or to explicitly provide the seed as inputs." ) @property def interleaver( self, ) -> Union[RandomInterleaver, RowColumnInterleaver, Turbo3GPPInterleaver]: """Associated interleaver instance.""" return self._interleaver
[docs] def build(self, input_shape: tuple) -> None: """Build block.""" pass
def call( self, x: torch.Tensor, /, *, seed: Optional[int] = None, ) -> torch.Tensor: """Deinterleaving function. This function returns the permuted version of ``x``. :param x: Tensor of arbitrary shape. :param seed: An integer defining the state of the random number generator. If explicitly given, the global internal seed is replaced by this seed. Can be used to realize random interleaver/deinterleaver pairs (call with same random seed). :output x_out: Deinterleaved tensor of same shape as ``x``. """ input_dtype = x.dtype if isinstance(self._interleaver, RandomInterleaver): x_out = self._interleaver(x, seed=seed, inverse=True) else: x_out = self._interleaver(x, inverse=True) # Cast to original dtype to avoid different dtypes return x_out.to(input_dtype)