Source code for sionna.sys.scheduling

#
# SPDX-FileCopyrightText: Copyright (c) 2021-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0#
"""
Scheduling algorithms for Sionna SYS
"""


import tensorflow as tf
from sionna.phy import Block
from sionna.phy.utils import insert_dims


[docs] class PFSchedulerSUMIMO(Block): # pylint: disable=line-too-long r""" Schedules users according to a proportional fairness (PF) metric in a single-user (SU) multiple-input multiple-output (MIMO) system, i.e., at most one user is scheduled per time-frequency resource. Fixing the time slot :math:`t`, :math:`\tilde{R}_t(u,i)` is the :emphasis:`achievable` rate for user :math:`u` on the time-frequency resource :math:`i` during the current slot. Let :math:`T_{t-1}(u)` denote the throughput :emphasis:`achieved` by user :math:`u` up to and including slot :math:`t-1`. Resource :math:`i` is assigned to the user with the highest PF metric, as defined in [Jalali00]_: .. math:: \operatorname{argmax}_{u} \frac{\tilde{R}_{t}(u,i)}{T_{t-1}(u)}. All streams within a scheduled resource element are assigned to the selected user. Let :math:`R_t(u)` be the rate achieved by user :math:`u` in slot :math:`t`. The throughput :math:`T` by each user :math:`u` is updated via geometric discounting: .. math:: T_t(u) = \beta \, T_{t-1}(u) + (1-\beta) \, R_t(u) where :math:`\beta\in(0,1)` is the discount factor. Parameters ---------- num_ut : `int` Number of user terminals num_freq_res : `int` Number of available frequency resources. A frequency resource is the smallest frequency unit that can be allocated to a user, typically a physical resource block (PRB). num_ofdm_sym : `int` Number of OFDM symbols in a slot batch_size : `list` | `int` | `None` (default) Batch size or shape. It can account for multiple sectors in which scheduling is performed simultaneously. If `None`, the batch size is set to []. num_streams_per_ut : `int` (default: 1) Number of streams per user beta : `float` (default: 0.98) Discount factor for computing the time-averaged achieved rate. Must be within (0,1). precision : `None` (default) | "single" | "double" Precision used for internal calculations and outputs. If set to `None`, :attr:`~sionna.phy.config.Config.precision` is used. Input ----- rate_last_slot : [batch_size, num_ut] Rate achieved by each user in the last slot rate_achievable_curr_slot : [batch_size, num_ofdm_sym, num_freq_res, num_ut], `tf.float` Achievable rate for each user across the OFDM grid in the current slot Output ------ is_scheduled: [batch_size, num_ofdm_sym, num_freq_res, num_ut, num_streams_per_ut], `tf.bool` Whether a user is scheduled for transmission for each available resource """ def __init__(self, num_ut, num_freq_res, num_ofdm_sym, batch_size=None, num_streams_per_ut=1, beta=.98, precision=None): super().__init__(precision=precision) if batch_size is None: batch_size = [] elif (not isinstance(batch_size, list)) and \ (isinstance(batch_size, int) or (len(batch_size) == 0)): batch_size = [batch_size] self._batch_size = batch_size self._num_ut = int(num_ut) self._num_freq_res = int(num_freq_res) self._num_ofdm_sym = int(num_ofdm_sym) self._num_streams_per_ut = int(num_streams_per_ut) self.beta = beta self._rate_achieved_past = tf.Variable( tf.cast(tf.fill(list(batch_size) + [num_ut], 1), self.rdtype)) self._pf_metric = tf.Variable( tf.zeros(list(batch_size) + [num_ofdm_sym, num_freq_res, num_ut], self.rdtype)) @property def rate_achieved_past(self): r""" [batch_size, num_ut], `tf.float` (read-only) : :math:`\beta`-discounted time-averaged achieved rate for each user """ return self._rate_achieved_past @property def pf_metric(self): r""" [batch_size, num_ofdm_sym, num_freq_res, num_ut], `tf.float` (read-only) : Proportional fairness (PF) metric in the last slot """ return self._pf_metric @property def beta(self): r""" `float`: Get/set the discount factor for computing the time-averaged achieved rate. Must be within (0,1). """ return self._beta @beta.setter def beta(self, value): tf.debugging.assert_equal( 0. < value < 1., True, message="Discount factor 'beta' must be within (0;1)") self._beta = tf.cast(value, self.rdtype) def call(self, rate_last_slot, rate_achievable_curr_slot): # ------------------------ # # Validate and cast inputs # # ------------------------ # tf.debugging.assert_equal( rate_last_slot.shape, self._batch_size + [self._num_ut], message="Inconsistent 'rate_last_slot' shape") tf.debugging.assert_equal( rate_achievable_curr_slot.shape, self._batch_size + [self._num_ofdm_sym, self._num_freq_res, self._num_ut], message="Inconsistent 'rate_achievable_curr_slot' shape") # [batch_size, num_ut] rate_last_slot = tf.cast(rate_last_slot, self.rdtype) # [batch_size, num_ofdm_sym, num_ut, num_freq_res] rate_achievable_curr_slot = tf.cast(rate_achievable_curr_slot, self.rdtype) # ---------------------------- # # Update average achieved rate # # ---------------------------- # # [batch_size, num_ut] self._rate_achieved_past.assign( self.beta * self._rate_achieved_past + (1 - self.beta) * rate_last_slot) # [batch_size, 1, 1, num_ut] rate_achieved_past = insert_dims(self._rate_achieved_past, 2, axis=-2) # ----------------- # # Compute PF metric # # ----------------- # # [batch_size, num_ofdm_sym, num_freq_res, num_ut] self._pf_metric.assign( rate_achievable_curr_slot / rate_achieved_past) # ------------ # # Schedule UTs # # ------------ # # Assign each time/frequency resource to the user with highest PF metric # [batch_size, num_ofdm_sym, num_freq_res] scheduled_ut = tf.argmax(self._pf_metric, axis=-1) # [batch_size, num_ofdm_sym, num_freq_res, num_ut] is_scheduled = tf.one_hot(scheduled_ut, depth=self._num_ut) # [batch_size, num_ofdm_sym, num_freq_res, num_ut, 1] is_scheduled = tf.expand_dims(is_scheduled, axis=-1) # [batch_size, num_ofdm_sym, num_freq_res, num_ut, num_streams] is_scheduled = tf.tile(is_scheduled, [1]*(3 + len(self._batch_size)) + [self._num_streams_per_ut]) return tf.cast(is_scheduled, tf.bool)