# pylint: disable=line-too-long
#
# SPDX-FileCopyrightText: Copyright (c) 2021-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0#
"""
Link adaptation for Sionna SYS
"""
import tensorflow as tf
from sionna.phy import Block
from sionna.phy.utils import find_true_position, insert_dims, \
scalar_to_shaped_tensor, tensor_values_are_in_set, \
lin_to_db, db_to_lin
from sionna.sys.utils import is_scheduled_in_slot
[docs]
class InnerLoopLinkAdaptation(Block):
# pylint: disable=line-too-long
r"""
Class for inner loop link adaptation (ILLA). It computes the highest
available modulation and coding scheme (MCS) whose
associated transport block error rate (TBLER) does not exceed the specified
``bler_target``:
.. math::
\max \left\{ \text{MCS}: \ \text{TBLER}(\text{MCS}, \text{SINR}_{\text{eff}}) \le \text{BLER}_{\text{target}} \right\}
where :math:`\text{SINR}_{\text{eff}}` is the effective SINR value provided
as input.
If no such MCS exists, the lowest available MCS index is returned. If a user
is not scheduled, ``fill_mcs_value`` is returned.
Parameters
----------
phy_abstraction : :class:`~sionna.sys.PHYAbstraction`
An instance of :class:`~sionna.sys.PHYAbstraction`
bler_target : `float` (default: 0.1)
BLER target
fill_mcs_value : `int` (default: 0)
MCS value assigned to non-scheduled users
Input
-----
sinr : [..., num_ofdm_symbols, num_subcarriers, num_ut, num_streams_per_ut], `tf.float` | `None` (default)
SINR for each OFDM symbol, subcarrier, user and stream.
If `None`, then ``sinr_eff`` and ``num_allocated_re`` are both
required.
sinr_eff : [..., num_ut], `tf.float` | `None` (default)
Estimated effective SINR for each user.
If `None`, then ``sinr`` is required.
num_allocated_re : [..., num_ut], `tf.int32` | `None` (default)
Number of allocated resources in a slot, computed across OFDM symbols,
subcarriers and streams, for each user.
If `None`, then ``sinr`` is required.
mcs_table_index : [..., num_ut], `tf.int32` | `int` (default: 1)
MCS table index for each user. For further details, refer to the
:ref:`mcs_table_cat_note`.
mcs_category : [..., num_ut], `tf.int32` | `int` (default: 0)
MCS table category for each user. For further details, refer to the
:ref:`mcs_table_cat_note`.
return_lowest_available_mcs : `bool` (default: `False`)
If `True`, the lowest MCS available in ``phy_abstraction`` BLER tables
is returned for each user. Only used for internal purposes.
Output
------
mcs_index : [..., num_ut]
Highest available MCS whose BLER does not exceed the target, or the
lowest available MCS if no such MCS exists, for each user
Example
-------
.. code-block:: python
from sionna.sys import PHYAbstraction, InnerLoopLinkAdaptation
bler_target = 0.1
# Initialize the PHY abstraction object
phy_abs = PHYAbstraction()
# Initialize the ILLA object
illa = InnerLoopLinkAdaptation(phy_abs,
bler_target=0.1)
# Effective SINR for each user
sinr_eff = tf.Variable([0.1, 10, 100])
# N. allocated resource elements for each user
num_allocated_re = tf.Variable([20, 30, 30])
# Compute the MCS index for each user
mcs_index = illa(sinr_eff=sinr_eff,
num_allocated_re=num_allocated_re,
mcs_table_index=1,
mcs_category=0)
print('Selected MCS index =', mcs_index.numpy())
% Selected MCS index = [ 3 16 27]
"""
def __init__(self,
phy_abstraction,
bler_target=0.1,
fill_mcs_value=0):
super().__init__(precision=phy_abstraction.precision)
self._phy_abstraction = phy_abstraction
self._fill_mcs_value = tf.cast(fill_mcs_value, tf.int32)
self._bler_target = tf.Variable(tf.cast(bler_target, self.rdtype))
@property
def bler_target(self):
r"""
`tf.float` : Get/set the BLER target for each user
"""
return self._bler_target
@bler_target.setter
def bler_target(self, value):
self._bler_target.assign(tf.cast(value, self.rdtype))
def call(self,
sinr=None,
sinr_eff=None,
num_allocated_re=None,
mcs_table_index=1,
mcs_category=0,
return_lowest_available_mcs=False,
**kwargs):
tf.debugging.assert_equal(
(sinr is not None) ^
((sinr_eff is not None) and (num_allocated_re is not None)),
True,
message="Either 'sinr' or "
"('sinr_eff','num_allocated_re') is required as input")
# N. available MCS indices
num_mcs = self._phy_abstraction.bler_table_interp.shape[2]
# Check which UTs are scheduled
ut_is_scheduled = is_scheduled_in_slot(
sinr=sinr,
num_allocated_re=num_allocated_re)
# Cast and reshape inputs
if sinr is not None:
sinr = tf.cast(sinr, self.rdtype)
batch_dims = sinr.shape[:-4]
num_ut = sinr.shape[-2]
else:
sinr_eff = tf.cast(sinr_eff, self.rdtype)
batch_dims = sinr_eff.shape[:-1]
num_ut = sinr_eff.shape[-1]
# ----------------------- #
# Tile across MCS indices #
# ----------------------- #
# [..., num_mcs, num_ut]
mcs_index_all = tf.range(num_mcs, dtype=tf.int32)
mcs_index_all = insert_dims(mcs_index_all, len(batch_dims),
axis=0)[..., tf.newaxis]
mcs_index_all = tf.tile(mcs_index_all, batch_dims + [1, num_ut])
# [..., num_mcs, num_ut]
mcs_table_index = scalar_to_shaped_tensor(
mcs_table_index, tf.int32, batch_dims + [num_ut])
mcs_table_index = tf.tile(
tf.expand_dims(mcs_table_index, axis=-2),
[1]*(len(mcs_table_index.shape)-1) + [num_mcs, 1])
# [..., num_mcs, num_ut]
mcs_category = scalar_to_shaped_tensor(
mcs_category, tf.int32, batch_dims + [num_ut])
mcs_category = tf.tile(
tf.expand_dims(mcs_category, axis=-2),
[1]*(len(mcs_category.shape)-1) + [num_mcs, 1])
if num_allocated_re is not None:
# [..., num_ut]
num_allocated_re = tf.cast(num_allocated_re, tf.int32)
num_allocated_re = tf.expand_dims(
num_allocated_re, axis=-2)
num_allocated_re = tf.tile(num_allocated_re,
[1]*len(batch_dims) + [num_mcs, 1])
# -------------- #
# Effective SINR #
# -------------- #
# Expand across all possible MCS indices
if sinr is not None:
# [..., num_mcs, num_ofdm_symbols, num_subcarriers, num_ut, num_streams_per_ut]
sinr = tf.expand_dims(sinr, axis=-5)
sinr = tf.tile(
sinr, [1]*len(batch_dims) + [num_mcs] + [1]*4)
else:
# [..., num_mcs, num_ut]
sinr_eff = tf.expand_dims(sinr_eff, axis=-2)
sinr_eff = tf.tile(sinr_eff,
[1]*len(batch_dims) + [num_mcs, 1])
# ----- #
# TBLER #
# ----- #
# [..., num_mcs, num_ut]
*_, tbler_per_mcs, _ = self._phy_abstraction(
mcs_index_all,
sinr=sinr,
sinr_eff=sinr_eff,
num_allocated_re=num_allocated_re,
mcs_table_index=mcs_table_index,
mcs_category=mcs_category,
check_mcs_index_validity=False)
# ---------- #
# Select MCS #
# ---------- #
# Check that each user has at least one available MCS
num_unavailable_mcs = tf.reduce_sum(
tf.cast(tbler_per_mcs > 1, tf.int32), axis=-2)
tf.debugging.assert_equal(
tf.reduce_any(num_unavailable_mcs == num_mcs),
False,
message='No MCS index available for some users')
# Find the highest MCS with BLER <= bler_target
# If no such MCS is found, then returns -1
# [..., num_ut]
mcs_index = find_true_position(
tbler_per_mcs <= self.bler_target,
side='last',
axis=-2)
# Lowest available MCS
# [..., num_ut]
lowest_available_mcs = find_true_position(
(tbler_per_mcs >= 0) & (tbler_per_mcs <= 1),
side='first',
axis=-2)
# If all MCS have BLER > bler_target, select lowest available MCS
mcs_index = tf.where(mcs_index != -1,
mcs_index,
lowest_available_mcs)
# A non-scheduled user receives MCS=_fill_mcs_value
mcs_index = tf.where(ut_is_scheduled,
mcs_index,
self._fill_mcs_value)
if return_lowest_available_mcs:
return mcs_index, lowest_available_mcs
return mcs_index
[docs]
class OuterLoopLinkAdaptation(Block):
# pylint: disable=line-too-long
r"""
Class for outer-loop link adaptation (OLLA).
The modulation and coding scheme (MCS) index for a user is determined as the
highest index whose corresponding transport block error rate (TBLER) remains
below the specified ``bler_target``.
The SINR value used for TBLER computation is given by the last effective
SINR feedback, :math:`\text{SINR}_{\text{eff}}` [dB], reduced by an offset
value, :math:`\Delta_{\mathrm{offset}}`:
.. math::
\max \left\{ \text{MCS}: \ \text{TBLER}(\text{MCS}, \text{SINR}_{\text{eff}}-\Delta_{\text{offset}}) \le \text{BLER}_{\text{target}} \right\}
The value of :math:`\Delta_{\text{offset}}` is adjusted depending on the HARQ feedback [Pedersen05]_:
.. math::
\Delta_{\mathrm{offset}} = \left\{
\begin{array}{l}
\Delta_{\mathrm{offset}} - \Delta_{\mathrm{down}} \quad \text{if HARQ=ACK} \\
\Delta_{\mathrm{offset}} + \Delta_{\mathrm{up}} \quad \text{if HARQ=NACK}
\end{array}
\right.
where the relationship between
:math:`\Delta_{\mathrm{up}}` and :math:`\Delta_{\mathrm{down}}` is given by
[Sampath97]_:
.. math::
\frac{\Delta_{\mathrm{up}}}{\Delta_{\mathrm{down}}} = \frac{1 - \mathrm{BLER}_{\mathrm{target}}}{\mathrm{BLER}_{\mathrm{target}}}.
Parameters
----------
phy_abstraction : :class:`~sionna.sys.PHYAbstraction`
An instance of :class:`~sionna.sys.PHYAbstraction`
num_ut : `int`
Number of user terminals
bler_target : `float` (default: 0.1)
BLER target value, within 0 and 1
delta_up : `float` (default: 1.)
Increment applied to the SINR offset [dB] when a NACK is received for a
user
batch_size : `list` | `int` | `None` (default)
Batch size or shape. It accounts for multiple users for which link
adaptation is performed simultaneously. If `None`, the batch size is
set to [].
sinr_eff_init : [..., num_ut], `tf.float` | `float` (default: 1.)
Initial value of effective SINR for each user. Non-positive values are
treated as missing and replaced by ``sinr_eff_init_fill``.
If `float`, the same value is assigned to all users.
sinr_eff_init_fill : `float` (default: 1.)
Value replacing non-positive ``sinr_eff_init`` values
offset_min : `float` (default: -20.)
Minimum SINR [dB] offset value
offset_max : `float` (default: 20.)
Maximum SINR [dB] offset value
Input
-----
num_allocated_re : [..., num_ut], `tf.int32`
Number of allocated resources in the upcoming slot, computed across OFDM
symbols, subcarriers and streams, for each user
harq_feedback : [..., num_ut], -1 | 0 | 1
If 0 (1, resp.), then a NACK (ACK, resp.) is received. If -1, feedback
is missing.
sinr_eff : [..., num_ut], `tf.float` | `None` (default)
Estimated effective SINR for each user. Non-positive values are treated as
missing.
mcs_table_index : [..., num_ut], `tf.int32` | `int` (default: 1)
MCS table index for each user. For further details, refer to the
:ref:`mcs_table_cat_note`.
mcs_category : [..., num_ut], `tf.int32` | `int` (default: 0)
MCS table category for each user. For further details, refer to the
:ref:`mcs_table_cat_note`.
Output
------
mcs_index : [..., num_ut]
Selected MCS index for each user
"""
def __init__(self,
phy_abstraction,
num_ut,
bler_target=0.1,
delta_up=1.,
batch_size=None,
sinr_eff_init=1.,
sinr_eff_init_fill=1.,
offset_min=-20.,
offset_max=20.):
super().__init__(precision=phy_abstraction.precision)
tf.debugging.assert_non_negative(
sinr_eff_init_fill,
message="'sinr_eff_init_fill' must be positive")
if batch_size is None:
batch_size = []
elif (not isinstance(batch_size, list)) and \
(isinstance(batch_size, int) or (len(batch_size) == 0)):
batch_size = [batch_size]
self._batch_size = batch_size
self._num_ut = num_ut
self._phy_abstraction = phy_abstraction
self._illa = InnerLoopLinkAdaptation(phy_abstraction,
bler_target=bler_target)
self._bler_target = tf.Variable(tf.cast(bler_target, self.rdtype))
self._delta_up = tf.Variable(tf.cast(delta_up, self.rdtype))
self._delta_down = tf.Variable(self._get_delta_down())
self._sinr_eff_db_last = None
self._offset = None
self._offset_min = tf.Variable(tf.cast(offset_min, self.rdtype))
self._offset_max = tf.Variable(tf.cast(offset_max, self.rdtype))
sinr_eff_init = scalar_to_shaped_tensor(
sinr_eff_init,
self.rdtype,
self._batch_size + [self._num_ut])
sinr_eff_init_fill = tf.cast(sinr_eff_init_fill, self.rdtype)
# Convert effective SINR to dB and fill N/A values (<=0)
self._sinr_eff_db_last = tf.Variable(
tf.where(
sinr_eff_init > 0,
lin_to_db(sinr_eff_init, precision=self.precision),
lin_to_db(sinr_eff_init_fill, precision=self.precision)))
# Reset SINR offset to 0
self._offset = tf.Variable(
tf.zeros(sinr_eff_init.shape, dtype=self.rdtype))
def _get_delta_down(self):
return tf.cast(
self.delta_up * self.bler_target / (1 - self.bler_target),
self.rdtype)
[docs]
def reset(self,
sinr_eff_init=1.,
sinr_eff_init_fill=.1):
"""
Resets the values of ``sinr_eff_db_last`` and ``offset``
"""
sinr_eff_init = scalar_to_shaped_tensor(
sinr_eff_init,
self.rdtype,
self._batch_size + [self._num_ut])
sinr_eff_init_fill = tf.cast(sinr_eff_init_fill, self.rdtype)
# Convert effective SINR to dB and fill N/A values (<=0)
self._sinr_eff_db_last.assign(
tf.where(
sinr_eff_init > 0,
lin_to_db(sinr_eff_init, precision=self.precision),
lin_to_db(sinr_eff_init_fill, precision=self.precision)))
# Reset SINR offset to 0
self._offset.assign(
tf.zeros(sinr_eff_init.shape, dtype=self.rdtype))
@property
def offset(self):
r"""
[..., num_ut], `tf.float` (read-only) : Effective SINR [dB] offset for each user
"""
return self._offset
@property
def offset_max(self):
"""
`tf.float` : Get/set the maximum ``offset`` value
"""
return self._offset_max
@offset_max.setter
def offset_max(self, value):
self._offset_max.assign(tf.cast(value, self.rdtype))
@property
def offset_min(self):
"""
`tf.float` : Get/set the minimum ``offset`` value
"""
return self._offset_min
@offset_min.setter
def offset_min(self, value):
self._offset_min.assign(tf.cast(value, self.rdtype))
@property
def bler_target(self):
r"""
`tf.float` : Get/set the BLER target for each user
"""
return self._bler_target
@bler_target.setter
def bler_target(self, value):
self._bler_target.assign(tf.cast(value, self.rdtype))
self._delta_down.assign(self._get_delta_down())
@property
def sinr_eff_db_last(self):
r"""
[..., num_ut], `tf.float` : Get/set the last observed effective SINR
[dB] value for each user
"""
return self._sinr_eff_db_last
@sinr_eff_db_last.setter
def sinr_eff_db_last(self, value):
self._sinr_eff_db_last.assign(tf.cast(value, self.rdtype))
@property
def delta_down(self):
r"""
`float` (read-only) : Decrement applied to the SINR offset when an
ACK is received for a user. Computed as ``delta_up * bler_target
/ (1 - bler_target)``.
"""
return self._delta_down
@property
def delta_up(self):
r"""
`float` : Get/set the increment applied to the SINR offset when a
NACK is received for a user
"""
return self._delta_up
@delta_up.setter
def delta_up(self, value):
tf.debugging.assert_positive(
value,
message="'delta_up' must be positive")
self._delta_up.assign(tf.cast(value, self.rdtype))
self._delta_down.assign(self._get_delta_down())
def call(self,
num_allocated_re,
harq_feedback=None,
sinr_eff=None,
mcs_table_index=1,
mcs_category=0):
tf.debugging.assert_equal(
tensor_values_are_in_set(harq_feedback, [-1, 0, 1]),
True,
message="'harq_feedback' must contain values in "
"[-1 (N/A), 0 (NACK), 1 (ACK)]")
# Cast and reshape inputs
shape = num_allocated_re.shape
if harq_feedback is None:
harq_feedback = tf.cast(tf.fill(shape, -1), tf.int32)
else:
harq_feedback = tf.cast(harq_feedback, tf.int32)
if sinr_eff is None:
sinr_eff = tf.zeros(shape, dtype=self.rdtype)
else:
sinr_eff = tf.cast(sinr_eff, self.rdtype)
num_allocated_re = tf.cast(num_allocated_re, tf.int32)
mcs_table_index = scalar_to_shaped_tensor(mcs_table_index,
tf.int32,
shape)
mcs_category = scalar_to_shaped_tensor(mcs_category,
tf.int32,
shape)
# ---------------------------- #
# Update effective SINR offset #
# ---------------------------- #
self._offset.assign(
tf.where(harq_feedback == 1,
self._offset - self._delta_down,
tf.where(harq_feedback == 0,
self._offset + self._delta_up,
self._offset)))
# Project offset to [offset_min; offset_max]
self._offset.assign(tf.maximum(self._offset, self._offset_min))
self._offset.assign(tf.minimum(self._offset, self._offset_max))
# ----------------------------------- #
# Update last observed effective SINR #
# ----------------------------------- #
self.sinr_eff_db_last = tf.where(sinr_eff > 0,
lin_to_db(sinr_eff,
precision=self.precision),
self._sinr_eff_db_last)
# -------------------------- #
# Offset SINR and apply ILLA #
# -------------------------- #
sinr_eff_offset = db_to_lin(self._sinr_eff_db_last - self._offset,
precision=self.precision)
mcs_index = self._illa(
sinr_eff=sinr_eff_offset,
num_allocated_re=num_allocated_re,
mcs_table_index=mcs_table_index,
mcs_category=mcs_category,
return_lowest_available_mcs=False)
return mcs_index