#
# SPDX-FileCopyrightText: Copyright (c) 2021-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
"""Blocks for interleaving and utility functions"""
import numpy as np
import tensorflow as tf
from importlib_resources import files, as_file
from sionna.phy import config, Block
from sionna.phy.fec.turbo import coeffs
class RowColumnInterleaver(Block):
# pylint: disable=line-too-long
r"""Interleaves a sequence of inputs via row/column swapping.
Parameters
----------
row_depth: int
The row depth, i.e., how many values per row can be stored.
axis: int
The dimension that should be interleaved.
inverse: `bool`, (default `False`)
If `True`, the inverse permutation is performed.
precision : `None` (default) | 'single' | 'double'
Precision used for internal calculations and outputs.
If set to `None`, :py:attr:`~sionna.phy.config.precision` is used.
Input
-----
x: `tf.DType`
Tensor of arbitrary shape and arbitrary dtype.
Output
------
: `tf.DType`
Tensor of same shape and dtype as the input ``x``.
Note
----
If the sequence length is not a multiple of ``row_depth``, additional
filler bits are used for the last row and removed internally. However,
for the last positions, the interleaving distance may be slightly
degraded.
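Example
-------
A minimal usage sketch (illustrative only; batch size and sequence
length are arbitrary):

>>> interleaver = RowColumnInterleaver(row_depth=10)
>>> x = tf.random.uniform([32, 100])
>>> x_int = interleaver(x)
>>> x_hat = interleaver(x_int, inverse=True)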
"""
def __init__(self,
row_depth,
axis=-1,
inverse=False,
precision=None,
**kwargs):
super().__init__(precision=precision, **kwargs)
# store perm_seq
self._perm_seq = None # initialized during build
self._perm_seq_inv = None # initialized during build
if not isinstance(axis, int):
raise TypeError("axis must be int.")
self._axis = axis
if not isinstance(row_depth, int):
raise TypeError("row_depth must be int.")
self._row_depth = row_depth
if not isinstance(inverse, bool):
raise TypeError("inverse must be bool.")
self._inverse = inverse
# should not be changed, only required for associated deinterleaver
self._keep_state = True
###############################
# Public methods and properties
###############################
@property
def axis(self):
"""Axis to be permuted"""
return self._axis
@property
def row_depth(self):
"""Row depth of the row-column interleaver"""
return self._row_depth
@property
def perm_seq(self):
"""Permutation sequence"""
return self._perm_seq
@property
def perm_seq_inv(self):
"""Inverse permutation sequence"""
return self._perm_seq_inv
@property
def keep_state(self):
"""Row-column interleaver always uses same internal state."""
return True
#################
# Utility methods
#################
def _generate_perm_rc(self, n_seq, r_depth):
"""Generates a row/column permutation to initialize an RC-interleaver.
If required, filler positions are appended to the last row and
removed again after the permutation.
Args:
n_seq (int): An integer defining the sequence length to interleave.
r_depth (int): An integer defining the depth of the interleaver.
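Example:
For ``n_seq=10`` and ``r_depth=3``, the resulting permutation is
``[0, 3, 6, 9, 1, 4, 7, 2, 5, 8]`` (filler positions 10 and 11 are
removed after the row/column swap).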
"""
# round to next multiple of r_depth
n = tf.cast((tf.math.ceil(n_seq/r_depth)*r_depth), tf.int32)
nb_rows = tf.cast(n/r_depth, tf.int64)
ind = tf.range(n, dtype=tf.int32)
# rearrange in row/column format
ind_rc = tf.reshape(ind, [nb_rows,-1])
# and interleave via row/column swapping
ind_cr = tf.transpose(ind_rc, (1,0))
# read out indices in column/row ordering
perm_seq_filler= tf.reshape(ind_cr, [-1])
# remove filler positions
mask = tf.math.less(perm_seq_filler, n_seq)
perm_seq = tf.boolean_mask(perm_seq_filler, mask)
perm_seq_inv= tf.argsort(perm_seq)
return perm_seq, perm_seq_inv
########################
# Sionna Block functions
########################
def build(self, input_shape):
"""check shapes for consistency"""
if self._axis >= len(input_shape):
raise ValueError("Axis does match input shape")
# Interleaver can't build pattern for dynamic shapes
if input_shape[self._axis] is None:
raise ValueError("Permutation axis cannot be None (dynamic).")
# and generate permutation patterns
p, pi = self._generate_perm_rc(input_shape[self._axis], self._row_depth)
self._perm_seq = p
self._perm_seq_inv = pi
def call(self, x, /, *, inverse=None, **kwargs):
"""interleaving function
This function returns the permuted version of inputs.
Note that the deinterleaver will provide a seed per default, this will
be ignored by the **kwargs argument.
Args:
inputs (tf.float32): Tensor of arbitrary shape. Must have at least
rank two.
Returns:
`tf.float32`: Tensor of same shape as the input.
"""
input_shape = x.shape
# re-init if shape has changed, update perm_seq
if x.shape[self._axis] != self._perm_seq.shape[0]:
self.build(x.shape)
# if not explicitly provided use internal value
if inverse is None:
inverse = self._inverse
if inverse:
x_int = tf.gather(x, self._perm_seq_inv, axis=self._axis)
else:
x_int = tf.gather(x, self._perm_seq, axis=self._axis)
return tf.ensure_shape(x_int, input_shape)
class RandomInterleaver(Block):
# pylint: disable=line-too-long
r"""Random interleaver permuting a sequence of input symbols.
Parameters
----------
seed: int
Integer defining the random seed used if option ``keep_state`` is
True.
keep_batch_constant: `bool`, (default `True`)
If set to `True`, each sample in the batch uses the same permutation.
Otherwise, a unique permutation per batch sample is generated (slower).
inverse: `bool`, (default `False`)
If `True`, the inverse permutation is performed.
keep_state: `bool`, (default `True`)
If `True`, the permutation is fixed for multiple calls (defined by
``seed`` attribute).
axis: int, (default -1)
The dimension that should be interleaved.
First dimension (`axis=0`) is not allowed.
precision : `None` (default) | 'single' | 'double'
Precision used for internal calculations and outputs.
If set to `None`, :py:attr:`~sionna.phy.config.precision` is used.
Input
-----
x: `tf.DType`
Tensor of arbitrary shape and dtype.
seed: `int`
An integer defining the state of the random number
generator. If explicitly given, the global internal seed is
replaced by this seed. Can be used to realize random
interleaver/deinterleaver pairs (call with same random seed).
Output
------
: tf.DType
Tensor of same shape and dtype as the input ``x``.
Note
----
The interleaver block is stateless, i.e., the seed is either random
during each call or must be explicitly provided during init/call.
This simplifies XLA/graph execution.
This is NOT the 5G interleaver sequence.
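Example
-------
A minimal usage sketch (illustrative only; shapes are arbitrary):

>>> interleaver = RandomInterleaver(seed=1234, keep_state=True)
>>> x = tf.random.uniform([32, 100])
>>> x_int = interleaver(x)
>>> x_hat = interleaver(x_int, inverse=True)  # same seed, original order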
"""
def __init__(self,
seed=None,
keep_batch_constant=True,
inverse=False,
keep_state=True,
axis=-1,
precision=None,
**kwargs):
super().__init__(precision=precision, **kwargs)
# verify and store attributes
if not isinstance(keep_batch_constant, bool):
raise TypeError("keep_batch_constant must be bool.")
self._keep_batch_constant = keep_batch_constant
if not isinstance(axis, int):
raise TypeError("axis must be int.")
self._axis=axis
# a global seed is stored and used if called with keep_state=True
if seed is not None:
if not isinstance(seed, int):
raise TypeError("seed must be int.")
else:
# generate random seed if no value is provided
seed = int(np.random.uniform(0, 2**31-1))
# if keep_state==True, this seed is used to generate the permutation sequences
self._seed = (1337, seed)
if not isinstance(inverse, bool):
raise TypeError("inverse must be boolean")
self._inverse = inverse
if not isinstance(keep_state, bool):
raise TypeError("keep_state must be boolean")
self._keep_state = keep_state
if self._keep_state is False and self._inverse is True:
print("Note: keep_state=False and, thus, a new realization of " \
"the interleaver is generated during each call. Thus, " \
"the inverse interleaver does not correspond to a previous " \
"interleaver call.")
###############################
# Public methods and properties
###############################
@property
def seed(self):
"""Seed to generate random sequence"""
return self._seed[1] # only return the non-fixed seed
@property
def axis(self):
"""Axis to be permuted"""
return self._axis
@property
def keep_state(self):
"""Generate new random seed per call"""
return self._keep_state
def find_s_min(self, seed, seq_length, s_min_stop=0):
r"""Find :math:`S` parameter such that :math:`\pi(i)-\pi(j)>S` for all
:math:`i-j<S`. This can be used to find optimized interleaver patterns.
``s_min_stop`` is an additional stopping condition, i.e., stop if
current :math:`S` is already smaller than ``s_min_stop``.
Please note that this is a Numpy utility function and usually not part
of the graph.
Input
-----
seed: int
Seed to draw the random permutation to be analyzed.
seq_length: int
Length of the permutation sequence to be analyzed.
s_min_stop: int, (default 0)
Enables early stopping if the current :math:`S` is already smaller
than ``s_min_stop``.
Output
------
: int
The S-parameter for the given ``seed``.
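Example
-------
A short sketch (illustrative only; seed and length are arbitrary):

>>> interleaver = RandomInterleaver()
>>> s = interleaver.find_s_min(seed=1234, seq_length=100)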
"""
if not isinstance(seed, int):
raise TypeError("seed must be int.")
if not isinstance(seq_length, int):
raise TypeError("seq_length must be int.")
if not isinstance(s_min_stop, int):
raise TypeError("s_min_stop must be int.")
seed = (1337, seed)
perm_seq = self._generate_perm_full(seed, seq_length, batch_size=1)
perm_seq = tf.squeeze(perm_seq, axis=0).numpy()
s_min = seq_length
for i in range(len(perm_seq)): # search for all positions in perm_seq
for j in range(-s_min,s_min,1): # search dist
if j==0: # ignore identity
continue
if i+j>=0 and i+j<seq_length:
d = np.abs(perm_seq[i] - perm_seq[i+j])
if d<=np.abs(j):
s_min = np.min([s_min, np.abs(j)])
if d<s_min and np.abs(j)<s_min:
s_min = np.min([s_min, d])
# early stop
if s_min<=s_min_stop:
break
return int(s_min)
#################
# Utility methods
#################
def _generate_perm_full(self, seed, seq_length, batch_size, inverse=False):
"""Generates a random permutation for the interleaver.
Args:
seed (tf.Tensor): A shape-[2] tensor defining the seed of the random
number generator.
seq_length (int): The length of the sequence to be permuted.
batch_size (int): The batch size (=number of independent
permutations).
inverse (bool): Defaults to False. If `True`, the inverse permutation
for the given seed is generated.
"""
rand_seq = tf.random.stateless_uniform([batch_size, seq_length],
seed,
minval=0,
maxval=1,
dtype=tf.float32)
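# sorting i.i.d. uniform samples yields a random permutation per row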
perm_seq = tf.argsort(rand_seq, axis=-1)
if inverse:
# cast to tf.float32 due to improved performance
perm_seq = tf.cast(perm_seq, tf.float32)
perm_seq = tf.argsort(perm_seq, axis=-1)
return perm_seq
#########################
# Sionna Block functions
#########################
# pylint: disable=(unused-argument)
def build(self, input_shape, **kwargs):
"""Build block and check consistency of dimensions."""
if self._axis >= len(input_shape):
raise ValueError("Axis does not match input shape.")
def call(self, x, /, *, seed=None, inverse=None):
"""Interleaving function.
This function returns the permuted version of ``x``.
Args:
x (tf.DType): Tensor of arbitrary shape. Must have at
least rank two.
seed (int): An integer defining the state of the random number
generator. If explicitly given, the global internal seed is
replaced by this seed. Can be used to realize random
interleaver/deinterleaver pairs (call with same random seed).
inverse (bool): If `True`, the inverse permutation is performed.
Returns:
`tf.DType`: Tensor of same shape and dtype as the input.
Raises:
InvalidArgumentError
When rank(``x``)<2.
AssertionError
If ``seed`` is not None or int.
Note:
In case of inverse interleaving (e.g., at the receiver),
``keep_state`` should be `True` as otherwise a new permutation is
generated and the output is not equal to the original sequence.
Alternatively, an explicit seed must be provided as function
argument.
"""
input_shape = x.shape
if inverse is None:
inverse = self._inverse
else:
if not isinstance(inverse, bool):
raise TypeError("inverse must be bool")
# use seed if explicit seed is provided
if seed is not None:
seed = (tf.constant(1337), tf.cast(seed, tf.int32))
# only generate a new random sequence if keep_state==False
elif self._keep_state:
# use sequence as defined by seed
seed = self._seed
else:
if inverse:
raise ValueError(
"Inverse interleaving not possible for " \
"random seeds per call (keep_state=False) without " \
"explicitly providing the seed as inputs.")
# generate new seed for each call
# Note: not necessarily random if XLA is active
seed = config.tf_rng.uniform([2],
minval=0,
maxval=2**31-1,
dtype=tf.int32)
# select if each sample in batch needs its own permutation (computationally complex!)
if self._keep_batch_constant:
batch_size = 1
else:
# special case: no batch dim
if len(tf.shape(x))==1:
batch_size = 1
else:
batch_size = tf.shape(x)[0]
perm_seq = self._generate_perm_full(seed,
tf.shape(x)[self._axis],
batch_size,
inverse)
if self._keep_batch_constant:
# broadcast single sequence over complete batch
perm_seq = tf.squeeze(perm_seq, axis=0) # remove batch_dim
x = tf.gather(x, perm_seq, batch_dims=0, axis=self._axis)
elif len(tf.shape(x))==1: # special case: no batch dim
perm_seq = tf.squeeze(perm_seq, axis=0) # remove batch_dim
x = tf.gather(x, perm_seq, batch_dims=0, axis=self._axis)
else:
x = tf.gather(x, perm_seq, batch_dims=1, axis=self._axis)
return tf.ensure_shape(x, input_shape)
class Deinterleaver(Block):
"""Deinterleaver that reverts the interleaver for a given input sequence.
Parameters
----------
interleaver: Interleaver
Associated Interleaver which shall be deinterleaved by this block.
Can be either
:class:`~sionna.phy.fec.interleaving.RandomInterleaver` or
:class:`~sionna.phy.fec.interleaving.RowColumnInterleaver`.
precision : `None` (default) | 'single' | 'double'
Precision used for internal calculations and outputs.
If set to `None`, :py:attr:`~sionna.phy.config.precision` is used.
Input
-----
x: tf.DType
2+D tensor of arbitrary shape.
seed: int
An integer defining the state of the random number
generator. If explicitly given, the global internal seed is
replaced by this seed. Can be used to realize random
interleaver/deinterleaver pairs (call with same random seed).
Output
------
: tf.DType
2+D tensor of same shape and dtype as the input ``x``.
Note
----
This block provides a wrapper of the inverse interleaver function.
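Example
-------
A minimal sketch pairing an interleaver with its deinterleaver
(illustrative only; shapes are arbitrary):

>>> interleaver = RandomInterleaver(seed=1234)
>>> deinterleaver = Deinterleaver(interleaver)
>>> x = tf.random.uniform([32, 100])
>>> x_hat = deinterleaver(interleaver(x))  # recovers the original order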
"""
def __init__(self,
interleaver,
precision=None,
**kwargs):
if not isinstance(interleaver,
(RandomInterleaver,
RowColumnInterleaver,
Turbo3GPPInterleaver)):
raise ValueError("interleaver is not a valid interleaver instance.")
self._interleaver = interleaver
# if precision is None, use the same precision as the associated interleaver
if precision is None:
precision = self._interleaver.precision
super().__init__(precision=precision, **kwargs)
if self._interleaver._keep_state is False:
print("Warning: deinterleaver requires interleaver to have " \
"keep_state=True or to explicitly provide the seed as inputs.")
#########################################
# Public methods and properties
#########################################
@property
def interleaver(self):
"""Associated interleaver instance"""
return self._interleaver
########################
# Sionna block functions
########################
def build(self, input_shape):
"""build block"""
pass
def call(self, x, seed=None):
"""deinterleaving function.
This function returns the permuted version of inputs.
Args:
x (tf.float32): Tensor of arbitrary shape. Must have at least
rank two.
seed (int): An integer defining the state of the random number
generator. If explicitly given, the global internal seed is
replaced by this seed. Can be used to realize random
interleaver/deinterleaver pairs (call with same random seed).
Returns:
`tf.float32`: Tensor of same shape as the input.
"""
input_dt = x.dtype
x = self._interleaver(x, seed=seed, inverse=True)
# cast to original dtype to avoid different dtypes
# due to call of interleaver block which casts to its internal precision
return tf.cast(x, input_dt)
class Turbo3GPPInterleaver(Block):
# pylint: disable=line-too-long
"""Interleaver for 3GPP Turbo codes
Interleaver as used in the 3GPP Turbo codes [3GPPTS36212_I]_ and, thus,
the maximum length is given as 6144 elements (only for the dimension
specified by ``axis``).
Parameters
----------
inverse: `bool`, (default `False`)
If `True`, the inverse permutation is performed.
axis: int, (default -1)
The dimension that should be interleaved.
First dimension (`axis=0`) is not allowed.
precision : `None` (default) | 'single' | 'double'
Precision used for internal calculations and outputs.
If set to `None`, :py:attr:`~sionna.phy.config.precision` is used.
Input
-----
x: tf.DType
2+D tensor of arbitrary shape and dtype.
Output
------
: tf.DType
2+D tensor of same shape and dtype as the input ``x``.
Note
----
Note that this implementation slightly deviates from the 3GPP
standard [3GPPTS36212_I]_ in the sense that zero-padding is introduced
for cases when the exact interleaver length is not supported by the
standard.
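Example
-------
A minimal usage sketch (illustrative only; the interleaved dimension
must not exceed 6144 elements):

>>> interleaver = Turbo3GPPInterleaver()
>>> x = tf.random.uniform([32, 40])
>>> x_int = interleaver(x)
>>> x_hat = interleaver(x_int, inverse=True)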
"""
def __init__(self,
inverse=False,
axis=-1,
precision=None,
**kwargs):
super().__init__(precision=precision, **kwargs)
if not isinstance(axis, int):
raise TypeError("axis must be int.")
self._axis=axis
self._keep_state = True # only required for deinterleaver
self.frame_size = None
if not isinstance(inverse, bool):
raise TypeError("inverse must be boolean")
self._inverse = inverse
# load interleaver patterns as defined in the 3GPP standard
self.coeffs_dict = {}
source = files(coeffs).joinpath("turbo_coeffs.csv")
with as_file(source) as coeffs_csv:
csv_reader = np.genfromtxt(coeffs_csv, delimiter=",")
for (line_count, row) in enumerate(csv_reader):
if line_count > 0: # ignore first line (=header)
self.coeffs_dict[int(row[1])] = (int(row[2]), int(row[3]))
###############################
# Public methods and properties
###############################
@property
def axis(self):
"""Axis to be permuted"""
return self._axis
#################
# Utility methods
#################
def _generate_perm_full(self, frame_size, inverse=False):
"""Generates a random permutation for the interleaver.
Args:
frame_size (int): The length of the sequence to be permuted.
batch_size (int): The batch size (=number of independent
permutations).
inverse (bool): Defaults to False. If `True`, the inverse
permutation for the given seed is generated.
"""
k = frame_size
if k not in self.coeffs_dict:
geqk_sizes = sorted([x for x in self.coeffs_dict if x >= k])
if len(geqk_sizes)==0:
raise ValueError(
"Input frame size too large for 3GPP Turbo Interleaver.")
else:
k = geqk_sizes[0]
f1, f2 = self.coeffs_dict[k]
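# QPP permutation from TS 36.212: pi(i) = (f1*i + f2*i^2) mod k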
perm_seq = [(f1 * i + f2* (i**2))%k for i in range(k)]
if frame_size < k:
perm_seq = [x for x in perm_seq if x < frame_size]
perm_seq = tf.convert_to_tensor(perm_seq)
if inverse:
# cast to tf.float32 due to improved sorting performance
perm_seq = tf.cast(perm_seq, tf.float32)
perm_seq = tf.argsort(perm_seq, axis=-1)
return perm_seq
#########################
# Sionna block functions
#########################
def build(self, input_shape):
"""Build Block and check consistency of dimensions."""
if not self.axis < len(input_shape):
raise ValueError("Axis does not match input shape.")
frame_size = input_shape[self._axis]
if frame_size >= 6145:
msg = "3GPP Turbo Interleaver is defined for block lengths up "\
"to 6144."
raise ValueError(msg)
# **kwargs to avoid issues if seed is provided during deinterleaving
# pylint: disable=unused-argument
def call(self, x, /, *, inverse=None, **kwargs):
"""Interleaving function.
This function returns the permuted version of ``x``.
"""
input_shape = x.shape
frame_size = input_shape[self._axis]
if inverse is None:
inverse = self._inverse
perm_seq = self._generate_perm_full(frame_size, inverse)
x = tf.gather(x, perm_seq, axis=self._axis)
# set explicitly for xla mode
return tf.ensure_shape(x, input_shape)