# SPDX-FileCopyrightText: Copyright (c) 2021-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Layers for interleaving and utility functions"""
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Layer
from importlib_resources import files, as_file
from sionna.fec.turbo import coeffs
[docs]class RowColumnInterleaver(Layer):
# pylint: disable=line-too-long
r"""RowColumnInterleaver(row_depth, axis=-1, inverse=False, dtype=tf.float32, **kwargs)
Interleaves a sequence of inputs via row/column swapping.
The class inherits from the Keras layer class and can be used as layer in a
Keras model.
row_depth: int
The row depth, i.e., how many values per row can be stored.
axis: int
The dimension that should be interleaved. First dimension
(`axis=0`) is not allowed.
inverse: bool
A boolean defaults to False. If True, the inverse permutation is
dtype: tf.DType
Defaults to `tf.float32`. Defines the datatype for internal
calculations and the output dtype.
inputs: tf.DType
2+D tensor of arbitrary shape and arbitrary dtype. Must have at
least rank two.
: tf.DType
2+D tensor of same shape and dtype as ``inputs``.
If ``axis`` is not an integer.
If ``row_depth`` is not an integer.
If ``axis`` > number of input dimensions.
If the sequence length is not a multiple of ``row_depth``, additional
filler bits are used for the last row that will be removed internally.
However, for the last positions the interleaving distance may be
slightly degraded.
To permute the batch dimension, expand_dims at `axis=0`, interleave and
remove new dimension.
def __init__(self,
super().__init__(dtype=dtype, **kwargs)
# store perm_seq
self._perm_seq = None # initalized during build
self._perm_seq_inv = None # initalized during build
assert isinstance(axis, int), "axis must be int."
self._axis = axis
assert isinstance(row_depth, int), "row_depth must be int."
self._row_depth = row_depth
assert isinstance(inverse, bool), "inverse must be bool."
self._inverse = inverse
# cannot be changed, only required for associated interleaver
self._keep_state = True
# Public methods and properties
def axis(self):
"""Axis to be permuted."""
return self._axis
def row_depth(self):
"""Row depth of the row-column interleaver."""
return self._row_depth
def perm_seq(self):
"""Permutation sequence."""
return self._perm_seq
def perm_seq_inv(self):
"""Inverse permutation sequence."""
return self._perm_seq_inv
def keep_state(self):
"""Row-column interleaver always uses same internal state."""
return True
[docs] def call_inverse(self, inputs):
"""Implements deinterleaver function corresponding to call().
inputs: tf.DType
2+D tensor of arbitrary shape and arbitrary dtype. Must have at
least rank two.
: tf.DType
2+D tensor of same shape and dtype as ``inputs``.
input_shape = inputs.shape
x = tf.gather(inputs, self._perm_seq_inv, axis=self._axis)
x = tf.ensure_shape(x, input_shape)
return x
# Utility methods
def _generate_perm_rc(self, n_seq, r_depth):
"""Generates a row/column permutation to initialize an rc-interleaver.
If required last positions use "filler" positions.
N_seq (int): An integer defining the sequence length to interleave.
r_depth (int): An integer defining the depth of the interleaver.
# round to next multiple of r_depth
n = tf.cast((tf.math.ceil(n_seq/r_depth)*r_depth), tf.int32)
nb_rows = tf.cast(n/r_depth, tf.int64)
ind = tf.range(n, dtype=tf.int32)
# rearange in row/colum format
ind_rc = tf.reshape(ind, [nb_rows,-1])
# and interleave via row/column swapping
ind_cr = tf.transpose(ind_rc, (1,0))
# read out indices in column/row ordering
perm_seq_filler= tf.reshape(ind_cr, [-1])
# remove filler positions
mask = tf.math.less(perm_seq_filler, n_seq)
perm_seq = tf.boolean_mask(perm_seq_filler, mask)
perm_seq_inv= tf.argsort(perm_seq)
return perm_seq, perm_seq_inv
# Keras layer functions
def build(self, input_shape):
assert self._axis < len(input_shape), "Axis does match input shape"
# init rand sequences during build
assert input_shape[self._axis] is not None, "Unknown shape at req. dim"
p, pi = self._generate_perm_rc(input_shape[self._axis], self._row_depth)
self._perm_seq = p
self._perm_seq_inv = pi
def call(self, inputs):
"""interleaving function
This function returns the permuted version of inputs.
inputs (tf.float32): Tensor of arbitrary shape. Must have at least
rank two.
`tf.float32`: Tensor of same shape as the input.
input_shape = inputs.shape
# re-init if shape has changed, update perm_seq
if inputs.shape[self._axis] != self._perm_seq.shape[0]:
if self._inverse:
x = tf.gather(inputs, self._perm_seq_inv, axis=self._axis)
x = tf.gather(inputs, self._perm_seq, axis=self._axis)
x = tf.ensure_shape(x, input_shape)
return x
[docs]class RandomInterleaver(Layer):
# pylint: disable=line-too-long
"""RandomInterleaver(seed=None, keep_batch_constant=True, inverse=False, keep_state=True, axis=-1, dtype=tf.float32, **kwargs)
Random interleaver permuting a sequence of input symbols.
The class inherits from the Keras layer class and can be used as layer in a
Keras model.
seed: int
Integer defining the random seed used if option ``keep_state`` is
keep_batch_constant: bool
Defaults to True. If set to True each sample in the batch uses the
same permutation. Otherwise, unique permutations per batch sample
are generate (slower).
inverse: bool
A boolean defaults to False. If True, the inverse permutation is
keep_state: bool
A boolean defaults to True. If True, the permutation is fixed for
multiple calls (defined by ``seed`` attribute).
axis: int
Defaults to `-1`. The dimension that should be interleaved.
First dimension (`axis=0`) is not allowed.
dtype: tf.DType
Defaults to `tf.float32`. Defines the datatype for internal
calculations and the output dtype.
(x, seed):
Either Tuple ``(x, seed)`` or ``x`` only (no tuple) if the internal
seed should be used:
x: tf.DType
2+D tensor of arbitrary shape and dtype.
seed: int
An integer defining the state of the random number
generator. If explicitly given, the global internal seed is
replaced by this seed. Can be used to realize random
interleaver/deinterleaver pairs (call with same random seed).
: tf.DType
2+D tensor of same shape and dtype as the input ``x``.
If ``axis`` is not `int`.
If ``seed`` is not `None` or `int`.
If ``axis`` > number of input dimensions.
If ``inverse`` is not bool.
If ``keep_state`` is not bool.
If ``keep_batch_constant`` is not bool.
When rank(``x``)<2.
To permute the batch dimension, expand_dims at ``axis=0``, interleave
and remove new dimension.
The interleaver layer is stateless, i.e., the seed is either random
during each call or must be explicitly provided during init/call.
This simplifies XLA/graph execution.
This is NOT the 5G interleaver sequence.
def __init__(self,
super().__init__(dtype=dtype, **kwargs)
# verify and store attributes
assert isinstance(keep_batch_constant, bool), \
"keep_batch_constant must be bool."
self._keep_batch_constant = keep_batch_constant
assert isinstance(axis, int), "axis must be int."
assert axis!=0, "Cannot permute batch_dim."
# a global seed is stored and used if called with keep_state=True
if seed is not None:
assert isinstance(seed, int), "seed must be int."
# generate random seed if no value is provided
seed = int(np.random.uniform(0, 2**31-1))
# if keep_state==True this seed is used to generate scrambling sequences
self._seed = (1337, seed)
assert isinstance(inverse, bool), "inverse must be boolean"
self._inverse = inverse
assert isinstance(keep_state, bool), "keep_state must be boolean"
self._keep_state = keep_state
if self._keep_state is False and self._inverse is True:
print("Note: keep_state=False and, thus, a new realization of " \
"the interleaver is generated during each call. Thus, " \
"the inverse interleaver does not correspond to a previous " \
"interleaver call.")
# Public methods and properties
def seed(self):
"""Seed to generate random sequence."""
return self._seed[1] # only return the non-fixed seed
def axis(self):
"""Axis to be permuted."""
return self._axis
def keep_state(self):
"""Generate new random seed per call."""
return self._keep_state
[docs] def find_s_min(self, seed, seq_length, s_min_stop=0):
r"""Find :math:`S` parameter such that :math:`\pi(i)-\pi(j)>S` for all
:math:`i-j<S`. This can be used to find optimized interleaver patterns.
``s_min_stop`` is an additional stopping condition, i.e., stop if
current :math:`S` is already smaller than ``s_min_stop``.
Please note that this is a Numpy utility function and usually not part
of the graph.
seed: int
seed to draw random permutation that shall be analyzed.
seq_length: int
length of permutation sequence to be analyzed.
s_min_stop: int
Defaults to 0. Enables early stop if already current s_min< ``s_min_stop`` .
: float
The S-parameter for the given ``seed``.
assert isinstance(seed, int), "seed must be int."
assert isinstance(seq_length, int), "seq_length must be int."
assert isinstance(s_min_stop, int), "s_min_stop must be int."
seed = (1337, seed)
perm_seq = self._generate_perm_full(seed, seq_length, batch_size=1)
perm_seq = tf.squeeze(perm_seq, axis=0).numpy()
s_min = seq_length
for i in range(len(perm_seq)): # search for all positions in perm_seq
for j in range(-s_min,s_min,1): # search dist
if j==0: # ignore identity
if i+j>=0 and i+j<seq_length:
d = np.abs(perm_seq[i] - perm_seq[i+j])
if d<=np.abs(j):
s_min = np.min([s_min, np.abs(j)])
if d<s_min and np.abs(j)<s_min:
s_min = np.min([s_min, d])
# early stop
if s_min<=s_min_stop:
return int(s_min)
[docs] def call_inverse(self, inputs):
"""Implements deinterleaver function corresponding to call().
(x, seed):
Either Tuple ``(x, seed)`` or ``x`` only (no tuple) if the internal
seed should be used:
x: tf.DType
2+D tensor of arbitrary shape and dtype.
seed: int
An integer defining the state of the random number
generator. If explicitly given, the global internal seed is
replaced by this seed. Can be used to realize random
interleaver/deinterleaver pairs (call with same random seed).
: tf.DType
2+D tensor of same shape and dtype as the input ``x``.
When rank(``x``)<2.
If ``keep_state`` is False and no explicit seed is provided.
In case of inverse interleaving (e.g., at the receiver),
``keep_state`` should be True as otherwise a new permutation is
generated and the output is not equal to the original sequence.
Alternatively, an explicit seed must be provided as function
if isinstance(inputs, (tuple, list)):
if len(inputs)==1: # if user wants to call with call([x])
seed = None
x = inputs
elif len(inputs)==2:
x, seed = inputs
raise TypeError("inputs cannot have more than 2 entries.")
seed = None
x = inputs
input_shape = x.shape
tf.debugging.assert_greater(tf.rank(x), 1)
# use seed if explicit seed is provided
if seed is not None:
seed = (tf.constant(1337), tf.cast(seed, tf.int32))
elif self._keep_state:
# use sequence as defined by seed
seed = self._seed
# This mode is not supported for
raise ValueError("Deinterleaving not possible for random " \
"seeds per call (keep_state=False) without explicitly " \
"providing the seed as inputs.")
# select if each sample in batch needs own perm (computational complex!)
if self._keep_batch_constant:
batch_size = 1
batch_size = tf.shape(x)[0]
perm_seq = self._generate_perm_full(seed,
inverse=True) # activate inverse
if self._keep_batch_constant:
# broadcast single sequence over complete batch
perm_seq = tf.squeeze(perm_seq, axis=0) # remove batch_dim
x = tf.gather(x, perm_seq, batch_dims=0, axis=self._axis)
x = tf.gather(x, perm_seq, batch_dims=1, axis=self._axis)
# set explicitly for keras models
x = tf.ensure_shape(x, input_shape)
return x
# Utility methods
def _generate_perm_full(self, seed, seq_length, batch_size, inverse=False):
"""Generates a random permutation for the interleaver.
seed (int): A shape [2] Tensor, the seed to the random number
seq_length (int): The length of the sequence to be permuted.
batch_size (int): The batch size (=number of independent
inverse (bool): Defaults to False. If True, the inverse permutation
for the given seed is generated.
rand_seq = tf.random.stateless_uniform([batch_size, seq_length],
perm_seq = tf.argsort(rand_seq, axis=-1)
if inverse:
# cast to tf.float32 due to improved performance
perm_seq = tf.cast(perm_seq, tf.float32)
perm_seq = tf.argsort(perm_seq, axis=-1)
return perm_seq
# Keras layer functions
def build(self, input_shape):
"""Build Keras layer and check consistency of dimensions."""
if isinstance(input_shape, list):
assert self._axis < len(input_shape), "Axis does not match input shape."
assert len(input_shape) > 1, "At least two dims are required."
def call(self, inputs):
"""Interleaving function.
This function returns the permuted version of ``inputs``.
inputs (List): ``[x, seed]``, where
``x`` (tf.float32): Tensor of arbitrary shape. Must have at
least rank two.
``seed`` (int): An integer defining the state of the random number
generator. If explicitly given, the global internal seed is
replaced by this seed. Can be used the realize random
interleaver/deinterleaver pairs (call with same random seed).
`tf.float32`: Tensor of same shape as the input.
When rank(``x``)<2.
If ``seed`` is not None or int.
In case of inverse interleaving (e.g., at the receiver),
``keep_state`` should be True as otherwise a new permutation is
generated and the output is not equal to the original sequence.
Alternatively, an explicit seed must be provided as function
if isinstance(inputs, (tuple, list)):
if len(inputs)==1: # if user wants to call with call([x])
seed = None
x = inputs
elif len(inputs)==2:
x, seed = inputs
raise TypeError("inputs cannot have more than 2 entries.")
seed = None
x = inputs
input_shape = x.shape
tf.debugging.assert_greater(tf.rank(x), 1)
# use seed if explicit seed is provided
if seed is not None:
seed = (tf.constant(1337), tf.cast(seed, tf.int32))
# only generate a new random sequence if keep_state==False
elif self._keep_state:
# use sequence as defined by seed
seed = self._seed
# generate new seed for each call
# Note: not necessarily random if XLA is active
seed = tf.random.uniform([2],
# select if each sample in batch needs own perm (computational complex!)
if self._keep_batch_constant:
batch_size = 1
batch_size = tf.shape(x)[0]
perm_seq = self._generate_perm_full(seed,
if self._keep_batch_constant:
# broadcast single sequence over complete batch
perm_seq = tf.squeeze(perm_seq, axis=0) # remove batch_dim
x = tf.gather(x, perm_seq, batch_dims=0, axis=self._axis)
x = tf.gather(x, perm_seq, batch_dims=1, axis=self._axis)
# set explicitly for keras models
x = tf.ensure_shape(x, input_shape)
return x
[docs]class Deinterleaver(Layer):
"""Deinterleaver(interleaver, dtype=None, **kwargs)
Deinterleaver that reverts the interleaver for a given input sequence.
The class inherits from the Keras layer class and can be used as layer in a
Keras model.
interleaver: Interleaver
Associated Interleaver which shall be deinterleaved by this layer.
Can be either
:class:`~sionna.fec.interleaving.RandomInterleaver` or
dtype: None or tf.DType
Defaults to `None`. Defines the datatype for internal calculations
and the output dtype. If no explicit dtype is provided the dtype
from the associated interleaver is used.
(x, seed):
Either Tuple ``(x, seed)`` or ``x`` only (no tuple) if the internal
seed should be used:
x: tf.DType
2+D tensor of arbitrary shape.
seed: int
An integer defining the state of the random number
generator. If explicitly given, the global internal seed is
replaced by this seed. Can be used to realize random
interleaver/deinterleaver pairs (call with same random seed).
: tf.DType
2+D tensor of same shape and dtype as the input ``x``.
If ``interleaver`` is not a valid instance of Interleaver.
This layer provides a wrapper of the inverse interleaver function.
def __init__(self,
if not isinstance(interleaver,
raise ValueError("interleaver is not a valid interleaver instance.")
self._interleaver = interleaver
# if dtype is None, use same dtype as associated interleaver
if dtype is None:
dtype = self._interleaver.dtype
super().__init__(dtype=dtype, **kwargs)
if self._interleaver._keep_state is False:
print("Warning: deinterleaver requires interleaver to have " \
"keep_state=True or to explicitly provide the seed as inputs.")
# Public methods and properties
def interleaver(self):
"""Associated interleaver instance."""
return self._interleaver
# Utility methods
# Keras layer functions
def build(self, input_shape):
"""build layer"""
def call(self, inputs):
"""deinterleaving function.
This function returns the permuted version of inputs.
inputs (tf.float32): Tensor of arbitrary shape. Must have at least
rank two.
`tf.float32`: Tensor of same shape as the input.
x = self._interleaver.call_inverse(inputs)
x = tf.cast(x, super().dtype) # cast output to correct dtype
return x
[docs]class Turbo3GPPInterleaver(Layer):
# pylint: disable=line-too-long
"""Turbo3GPPInterleaver(inverse=False, axis=-1, dtype=tf.float32, **kwargs)
Interleaver as used in the 3GPP Turbo codes [3GPPTS36212_I]_ and, thus,
the maximum length is given as 6144 elements (only for the dimension as
specific by ``axis``).
The class inherits from the Keras layer class and can be used as layer in a
Keras model.
inverse: bool
A boolean defaults to False. If True, the inverse permutation is
axis: int
Defaults to `-1`. The dimension that should be interleaved.
First dimension (`axis=0`) is not allowed.
dtype: tf.DType
Defaults to `tf.float32`. Defines the datatype for internal
calculations and the output dtype.
x: tf.DType
2+D tensor of arbitrary shape and dtype.
: tf.DType
2+D tensor of same shape and dtype as the input ``x``.
If ``axis`` is not `int`.
If ``axis`` > number of input dimensions.
If ``inverse`` is not bool.
When rank(``x``)<2.
Note that this implementation slightly deviates from the 3GPP
standard [3GPPTS36212_I]_ in a sense that zero-padding is introduced
for cases when the exact interleaver length is not supported by the
def __init__(self,
super().__init__(dtype=dtype, **kwargs)
assert isinstance(axis, int), "axis must be int."
assert axis!=0, "Cannot permute batch dimension."
self._keep_state = True # only required for deinterleaver
self.frame_size = None
assert isinstance(inverse, bool), "inverse must be boolean"
self._inverse = inverse
# load interleaver patterns as defined in the 3GPP standard
self.coeffs_dict = {}
source = files(coeffs).joinpath("turbo_coeffs.csv")
with as_file(source) as coeffs.csv:
csv_reader = np.genfromtxt(coeffs.csv, delimiter=",")
for (line_count, row) in enumerate(csv_reader):
if line_count >0: #igonore first line (=header)
self.coeffs_dict[int(row[1])] = (int(row[2]), int(row[3]))
# Public methods and properties
def axis(self):
"""Axis to be permuted."""
return self._axis
[docs] def find_s_min(self, frame_size, s_min_stop=0):
r"""Find :math:`S` parameter such that :math:`\pi(i)-\pi(j)>S` for all
:math:`i-j<S`. This can be used to find optimized interleaver patterns.
``s_min_stop`` is an additional stopping condition, i.e., stop if
current :math:`S` is already smaller than ``s_min_stop``.
Please note that this is a Numpy utility function and usually not part
of the graph.
frame_size: int
length of interleaver.
s_min_stop: int
Defaults to 0. Enables early stop if already current
: float
The S-parameter for the given ``frame_size``.
assert isinstance(s_min_stop, int), "s_min_stop must be int."
assert isinstance(frame_size, int), "frame_size must be int."
assert(frame_size<6145), "Interleaver not defined for this frame_size."
perm_seq = self._generate_perm_full(frame_size)
perm_seq = perm_seq.numpy()
s_min = frame_size
for i in range(len(perm_seq)): # search for all positions in perm_seq
for j in range(-s_min,s_min,1): # search dist
if j==0: # ignore identity
if i+j>=0 and i+j<frame_size:
d = np.abs(perm_seq[i] - perm_seq[i+j])
if d<=np.abs(j):
s_min = np.min([s_min, np.abs(j)])
if d<s_min and np.abs(j)<s_min:
s_min = np.min([s_min, d])
# early stop
if s_min<=s_min_stop:
return int(s_min)
[docs] def call_inverse(self, inputs):
"""Implements deinterleaver function corresponding to call().
x: tf.DType
2+D tensor of arbitrary shape and dtype.
: tf.DType
2+D tensor of same shape and dtype as the input ``x``.
When rank(``x``)<2.
if isinstance(inputs, (tuple, list)):
if len(inputs)==1: # if user wants to call with call([x])
x = inputs
raise TypeError("inputs cannot have more than 1 entry.")
x = inputs
input_shape = x.shape
frame_size = input_shape[self._axis]
# activate inverse
perm_seq = self._generate_perm_full(frame_size, inverse=True)
x = tf.gather(x, perm_seq, batch_dims=0, axis=self._axis)
# set explicitly for keras models
x = tf.ensure_shape(x, input_shape)
return x
# Utility methods
def _generate_perm_full(self, frame_size, inverse=False):
"""Generates a random permutation for the interleaver.
frame_size (int): The length of the sequence to be permuted.
batch_size (int): The batch size (=number of independent
inverse (bool): Defaults to False. If True, the inverse permutation
for the given seed is generated.
k = frame_size
if k not in self.coeffs_dict:
geqk_sizes = sorted([x for x in self.coeffs_dict if x >= k])
if len(geqk_sizes)==0:
print("Input frame size too large for 3GPP Turbo Interleaver.")
k = geqk_sizes[0]
f1, f2 = self.coeffs_dict[k]
perm_seq = [(f1 * i + f2* (i**2))%k for i in range(k)]
if frame_size < k:
perm_seq = [x for x in perm_seq if x < frame_size]
perm_seq = tf.convert_to_tensor(perm_seq)
if inverse:
# cast to tf.float32 due to improved sorting performance
perm_seq = tf.cast(perm_seq, tf.float32)
perm_seq = tf.argsort(perm_seq, axis=-1)
return perm_seq
# Keras layer functions
def build(self, input_shape):
"""Build Keras layer and check consistency of dimensions."""
if isinstance(input_shape, list):
assert self.axis < len(input_shape), "Axis does not match input shape."
assert len(input_shape) > 1, "At least two dims are required."
frame_size = input_shape[self._axis]
assert(frame_size< 6145), \
"3GPP Turbo Interleaver is defined for block lengths up to 6144."
def call(self, inputs):
"""Interleaving function.
This function returns the permuted version of ``inputs``.
if isinstance(inputs, (tuple, list)):
if len(inputs)==1: # if user wants to call with call([x])
x = inputs
raise TypeError("inputs cannot have more than 1 entry.")
x = inputs
input_shape = x.shape
frame_size = input_shape[self._axis]
perm_seq = self._generate_perm_full(frame_size, self._inverse)
x = tf.gather(x, perm_seq, batch_dims=0, axis=self._axis)
# set explicitly for keras models
x = tf.ensure_shape(x, input_shape)
return x