#
# SPDX-FileCopyrightText: Copyright (c) 2021-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
"""Layer for Turbo Decoding."""
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Layer
from sionna.fec import interleaving
from sionna.fec.conv.decoding import BCJRDecoder
from sionna.fec.conv.utils import Trellis
from sionna.fec.turbo.utils import TurboTermination, polynomial_selector, puncture_pattern
class TurboDecoder(Layer):
# pylint: disable=line-too-long
r"""TurboDecoder(encoder=None, gen_poly=None, rate=1/3, constraint_length=None, interleaver='3GPP', terminate=False, num_iter=6, hard_out=True, algorithm='map', output_dtype=tf.float32,**kwargs)
Turbo code decoder based on BCJR component decoders [Berrou]_.
Takes LLRs as input and returns LLRs or hard-decided bits, i.e., an
estimate of the information tensor.
This decoder is based on the :class:`~sionna.fec.conv.decoding.BCJRDecoder`
and, thus, internally instantiates a
:class:`~sionna.fec.conv.decoding.BCJRDecoder` layer that is applied once
per constituent code in each iteration.
The class inherits from the Keras layer class and can be used as a layer
in a Keras model.
Parameters
----------
encoder: :class:`~sionna.fec.turbo.encoding.TurboEncoder`
If ``encoder`` is provided as input, the following input parameters
are not required and will be ignored: `gen_poly`, `rate`,
`constraint_length`, `terminate`, `interleaver`. They will be inferred
from the ``encoder`` object itself.
If ``encoder`` is `None`, the above parameters must be provided
explicitly.
gen_poly: tuple
Tuple of strings, with each string being a sequence of 0s and 1s. If
`None`, ``rate`` and ``constraint_length`` must be provided.
rate: float
Rate of the Turbo code. Valid values are 1/3 and 1/2. Note that
``gen_poly``, if provided, defines the underlying
convolutional code, which traditionally has rate 1/2.
constraint_length: int
Valid values are between 3 and 6 inclusive. Only required if
``encoder`` and ``gen_poly`` are `None`.
interleaver: str
`"3GPP"` or `"random"`. If `"3GPP"`, the internal interleaver for Turbo
codes as specified in [3GPPTS36212_Turbo]_ will be used. Only required
if ``encoder`` is `None`.
terminate: bool
If `True`, the two underlying convolutional encoders are assumed
to have been terminated to the all-zero state.
num_iter: int
Number of Turbo decoding iterations to run. Each iteration runs one
BCJR decoding pass for each of the two constituent convolutional
codes.
hard_out: boolean
Defaults to `True` and indicates whether to output hard or soft
decisions on the decoded information vector. If `True`, a hard-decided
information vector of 0s and 1s is output; if `False`, LLRs of the
information bits are output.
algorithm: str
Defaults to `map`. Indicates the implemented BCJR algorithm, where
`map` denotes the exact MAP algorithm, `log` denotes the exact MAP
implementation in the log-domain, and `maxlog` denotes the
approximated MAP implementation in the log-domain, where
:math:`\log(e^{a}+e^{b}) \approx \max(a,b)`.
output_dtype: tf.DType
Defaults to `tf.float32`. Defines the output datatype of the layer.
Input
-----
inputs: tf.float32
2+D tensor of shape `[...,n]` containing the (noisy) channel
output symbols where `n` is the codeword length
Output
------
: tf.float32
2+D tensor of shape `[...,coderate*n]` containing the estimates of the
information bit tensor
Note
----
For decoding, input `logits` defined as
:math:`\operatorname{log} \frac{p(x=1)}{p(x=0)}` are assumed for
compatibility with the rest of Sionna. Internally,
log-likelihood ratios (LLRs) with definition
:math:`\operatorname{log} \frac{p(x=0)}{p(x=1)}` are used.
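Example
-------
A minimal usage sketch; the ``TurboEncoder`` construction below follows
the matching encoder class of this package and is an illustrative
assumption, not part of this module:

.. code-block:: python

    from sionna.fec.turbo import TurboEncoder, TurboDecoder
    encoder = TurboEncoder(constraint_length=4, rate=1/3, terminate=True)
    decoder = TurboDecoder(encoder=encoder, num_iter=6, hard_out=True)
    # llr_ch: tf.float32 tensor of shape [batch_size, n] holding channel
    # logits, e.g., from a demapper
    u_hat = decoder(llr_ch)  # estimates of the information bits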
"""
def __init__(self,
encoder=None,
gen_poly=None,
rate=1/3,
constraint_length=None,
interleaver='3GPP',
terminate=False,
num_iter=6,
hard_out=True,
algorithm='map',
output_dtype=tf.float32,
**kwargs):
super().__init__(**kwargs)
if encoder is not None:
self._coderate = encoder._coderate
self._gen_poly = encoder._gen_poly
self._terminate = encoder.terminate
self._trellis = encoder.trellis
assert self._trellis.rsc is True
self.rsc = True
self.internal_interleaver = encoder.internal_interleaver
else:
if gen_poly is not None:
assert all(isinstance(poly, str) for poly in gen_poly), \
"Each polynomial must be a string."
assert all(len(poly)==len(gen_poly[0]) for poly in gen_poly), \
"Each polynomial must be of same length."
assert all(all(
char in ['0','1'] for char in poly) for poly in gen_poly),\
"Each polynomial must be a string of 0's and 1's."
self._gen_poly = gen_poly
else:
valid_constraint_length = (3, 4, 5, 6)
assert constraint_length in valid_constraint_length, \
"Constraint length must be between 3 and 6."
self._gen_poly = polynomial_selector(constraint_length)
valid_rates = (1/2, 1/3)
assert rate in valid_rates, "rate must be 1/2 or 1/3."
self._coderate = rate
tf.debugging.assert_type(terminate, tf.bool)
self._terminate = terminate
assert interleaver in ('3GPP', 'random')
if interleaver == '3GPP':
self.internal_interleaver = interleaving.Turbo3GPPInterleaver()
else:
self.internal_interleaver = interleaving.RandomInterleaver(
keep_batch_constant=True,
keep_state=True,
axis=-1)
self.rsc = True
self._trellis = Trellis(self._gen_poly, rsc=self.rsc)
assert isinstance(hard_out, bool), 'hard_out must be bool.'
self._coderate_conv = 1/len(self._gen_poly)
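# encoder memory: constraint length minus one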
self._mu = len(self._gen_poly[0])-1
self.punct_pattern = puncture_pattern(self._coderate,
self._coderate_conv)
# num. of input bit streams, only 1 in current implementation
self._conv_k = self._trellis.conv_k
self._mu = self._trellis._mu
# num. of output bits for conv_k input bits
self._conv_n = self._trellis.conv_n
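# number of distinct input symbols, output symbols and trellis states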
self._ni = 2**self._conv_k
self._no = 2**self._conv_n
self._ns = self._trellis.ns
assert self._conv_k == 1
assert self._conv_n == 2
self._k = None # Length of Info-bit vector
self._n = None # Length of Turbo codeword, including termination bits
if self._terminate:
self.turbo_term = TurboTermination(self._mu+1,
conv_n=self._conv_n)
self._num_term_bits = 3 * self.turbo_term.get_num_term_syms()
else:
self._num_term_bits = 0
self._output_dtype = output_dtype
self.num_iter = num_iter
self._hard_out = hard_out
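# a single BCJRDecoder instance is shared by both constituent decoders;
# hard_out=False keeps soft outputs for the iterative extrinsic exchange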
self.bcjrdecoder = BCJRDecoder(gen_poly=self._gen_poly,
rsc=self.rsc,
hard_out=False,
terminate=self._terminate,
algorithm=algorithm)
#########################################
# Public methods and properties
#########################################
@property
def gen_poly(self):
"""Generator polynomial used by the encoder"""
return self._gen_poly
@property
def constraint_length(self):
"""Constraint length of the encoder"""
return self._mu + 1
@property
def coderate(self):
"""Rate of the code used in the encoder"""
return self._coderate
@property
def trellis(self):
"""Trellis object used during encoding"""
return self._trellis
@property
def k(self):
"""Number of information bits per codeword"""
if self._k is None:
print("Note: The value of k cannot be computed before the first " \
"call().")
return self._k
@property
def n(self):
"""Number of codeword bits"""
if self._n is None:
print("Note: The value of n cannot be computed before the first " \
"call().")
return self._n
#########################
# Utility functions
#########################
def depuncture(self, y):
"""
Depunctures a tensor `y` of shape `[batch, n]` by scattering its
elements into a tensor of shape `[batch, 3*rate*n]`, where the
positions removed by puncturing are filled with 0.
For example, if the input is `y`, the rate is 1/2 and
`punct_pattern` is [1, 1, 0, 1, 0, 1], then the
output is [y[0], y[1], 0., y[2], 0., y[3], y[4], y[5], 0., ...].
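A minimal standalone sketch of the same scatter operation; the indices
below are the 1-positions of the example pattern above:

.. code-block:: python

    import tensorflow as tf
    y = tf.constant([[1., 2., 3., 4.]])      # [batch=1, n=4]
    idx = tf.constant([[0], [1], [3], [5]])  # 1-positions of the pattern
    y_depunct = tf.transpose(
        tf.scatter_nd(idx, tf.transpose(y), shape=(6, 1)))
    # y_depunct == [[1., 2., 0., 3., 0., 4.]]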
"""
y_depunct = tf.scatter_nd(self._punct_indices,
tf.transpose(y),
shape=(self._depunct_len, tf.shape(y)[0]))
y_depunct = tf.transpose(y_depunct)
return y_depunct
def _convenc_cws(self, y_turbo):
"""
Rearranges a Turbo codeword into the two constituent convolutional
codewords.
Given the channel output `y_turbo` of a Turbo codeword, this method
rearranges the symbols such that `y1_cw` contains the symbols
corresponding to convolutional encoder 1 and `y2_cw` contains those
corresponding to convolutional encoder 2.
"""
y_turbo = self.depuncture(y_turbo)
prepunct_n = int(self._n * 3 * self._coderate)
# Separate Pre-termination & Termination parts of Y
msg_idx = tf.range(0, prepunct_n - self._num_term_bits)
term_idx = tf.range(prepunct_n-self._num_term_bits, prepunct_n)
# Pre-termination & Termination parts of Y
y_cw = tf.gather(y_turbo, msg_idx, axis=-1)
y_term = tf.gather(y_turbo, term_idx, axis=-1)
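# Layout (rate 1/3, per info bit i): the depunctured codeword stores
# [systematic_i, parity1_i, parity2_i], so encoder 1 owns symbols 3i and
# 3i+1, and encoder 2 owns 3i (interleaved) and 3i+2.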
# Gather Encoder1 corresp. from Y (pre-termination part)
enc1_sys_idx = tf.expand_dims(tf.range(0, self._k*3, delta=3), 1)
enc1_cw_idx = tf.stack([enc1_sys_idx, enc1_sys_idx+1], axis=1)
enc1_cw_idx = tf.squeeze(tf.reshape(enc1_cw_idx, (-1, 2*self._k)))
y1_cw = tf.gather(y_cw, enc1_cw_idx, axis=-1)
# Gather systematic part of codeword from encoder1 & Inverse-interleave
y1_sys_cw = tf.gather(y_cw, enc1_sys_idx, axis=-1)
y2_sys_cw = self.internal_interleaver(
tf.squeeze(y1_sys_cw, -1))[:,:,None]
# Using above, gather Encoder2 corresp. from Y (pre-termination part)
y2_nonsys_cw = tf.gather(y_cw, enc1_sys_idx+2, axis=-1)
y2_cw = tf.squeeze(tf.stack([y2_sys_cw, y2_nonsys_cw], axis=-2))
y2_cw = tf.reshape(y2_cw, [-1, 2*self._k])
# Separate Termination bits to encoders 1 & 2
if self._terminate:
term_vec1, term_vec2 = self.turbo_term.term_bits_turbo2conv(y_term)
y1_cw = tf.concat([y1_cw, term_vec1],axis=1)
y2_cw = tf.concat([y2_cw, term_vec2],axis=-1)
return y1_cw, y2_cw
#########################
# Keras layer functions
#########################
def build(self, input_shape):
"""Build layer and check dimensions."""
# rank must be at least two
tf.debugging.assert_greater_equal(len(input_shape), 2)
self._n = input_shape[-1]
if self.coderate == 1/2:
assert self._n%2 == 0, "Codeword length should be a multiple of 2"
codefactor = self.coderate * 3
turbo_n = int(self._n * codefactor)
turbo_n_preterm = turbo_n - self._num_term_bits
assert turbo_n_preterm%3 == 0, "Invalid codeword length for a terminated Turbo code"
self._k = int(turbo_n_preterm/3)
# num of symbols for the convolutional codes.
self._convenc_numsyms = self._k
if self._terminate:
self._convenc_numsyms += self._mu
# generate puncturing mask
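# the periodic pattern is tiled up to the depunctured length 3*rate*n;
# any remainder is covered by a prefix of whole pattern periods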
rate_factor = 3. * self._coderate
self._depunct_len = int(rate_factor * self._n)
punct_size = np.prod(self.punct_pattern.get_shape().as_list())
rep_times = int(self._depunct_len//punct_size)
mask_ = tf.tile(self.punct_pattern, [rep_times, 1])
extra_bits = int(self._depunct_len - rep_times*punct_size)
if extra_bits > 0:
extra_periods = int(extra_bits/3)
mask_ = tf.concat([mask_, self.punct_pattern[:extra_periods,:]],
axis=0)
mask_ = tf.squeeze(tf.reshape(mask_, (-1, )))
self._punct_indices = tf.cast(tf.where(mask_), tf.int32)
def call(self, inputs):
"""
Decoder for Turbo code.
Runs BCJR decoder on both the constituent convolutional codes
iteratively `num_iter` times. At the end, the resultant LLRs are
computed and the decoded message vector (termination bits are
excluded) is output.
"""
llr_max = 20.
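# llr_max clips extrinsic LLRs to keep the iterations numerically stable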
tf.debugging.assert_type(inputs, tf.float32,
message="input must be tf.float32.")
output_shape = inputs.get_shape().as_list()
# allow different codeword lengths in eager mode
if output_shape[-1] != self._n:
self.build(output_shape)
llr_ch = tf.reshape(inputs, [-1, self._n])
output_shape[0] = -1
output_shape[-1] = self._k # assign k to the last dimension
# LLRs inside TurboDecoder are not sign-inverted at the input, unlike in
# the BCJR and LDPC decoders; they represent log P(x=1)/P(x=0), the logit
# convention used throughout Sionna.
y1_cw, y2_cw = self._convenc_cws(llr_ch)
sys_idx = tf.expand_dims(tf.range(0, self._k*2, delta=2), 1)
llr_ch = tf.gather(y1_cw, sys_idx, axis=-1)
llr_ch = tf.squeeze(llr_ch, -1)
llr_ch2 = tf.gather(y2_cw, sys_idx, axis=-1)
llr_ch2 = tf.squeeze(llr_ch2, -1)
llr_1e = tf.zeros((tf.shape(llr_ch)[0], self._convenc_numsyms),
dtype=tf.float32)
# define zero LLR's for termination info bits
term_info_bits = self._mu if self._terminate else 0
llr_terminfo = tf.zeros(
(tf.shape(llr_ch)[0], term_info_bits), tf.float32)
# needs to be initialized for XLA before entering the loop
llr_2i = tf.zeros_like(llr_ch2)
# run decoding loop
for _ in tf.range(self.num_iter):
# run 1st component decoder
llr_1i = self.bcjrdecoder((y1_cw, llr_1e))
llr_1i = llr_1i[...,:self._k]
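# extrinsic information: a posteriori minus channel and a priori parts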
llr_extr = llr_1i - llr_ch - llr_1e[...,:self._k]
#llr_extr = llr_1i - llr_1e[...,:self._k]
llr_2e = self.internal_interleaver(llr_extr)
llr_2e = tf.concat([llr_2e, llr_terminfo], axis=-1)
llr_2e = tf.clip_by_value(llr_2e,
clip_value_min=-llr_max,
clip_value_max=llr_max)
# run 2nd component decoder
llr_2i = self.bcjrdecoder((y2_cw, llr_2e))
llr_2i = llr_2i[...,:self._k]
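# extrinsic information for decoder 2, analogous to decoder 1 above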
llr_extr = llr_2i - llr_2e[...,:self._k] - llr_ch2
#llr_extr = llr_2i - llr_2e[...,:self._k]
llr_1e = self.internal_interleaver.call_inverse(llr_extr)
llr_1e = tf.clip_by_value(llr_1e,
clip_value_min=-llr_max,
clip_value_max=llr_max)
llr_1e = tf.concat([llr_1e, llr_terminfo], axis=-1)
# use latest output of 2nd decoder
output = self.internal_interleaver.call_inverse(llr_2i)
if self._hard_out: # hard decide decoder output if required
output = tf.less(0.0, output)
output = tf.cast(output, self._output_dtype)
output_reshaped = tf.reshape(output, output_shape)
return output_reshaped