Source code for sionna.rt.paths

#
# SPDX-FileCopyrightText: Copyright (c) 2021-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
"""
Dataclass that stores paths
"""

import tensorflow as tf
import os
import numpy as np

from . import scene as scene_module
from sionna.utils.tensors import expand_to_rank, insert_dims
from sionna.constants import PI
from .utils import dot, r_hat

[docs]class Paths: # pylint: disable=line-too-long r""" Paths() Stores the simulated propagation paths Paths are generated for the loaded scene using :meth:`~sionna.rt.Scene.compute_paths`. Please refer to the documentation of this function for further details. These paths can then be used to compute channel impulse responses: .. code-block:: Python paths = scene.compute_paths() a, tau = paths.cir() where ``scene`` is the :class:`~sionna.rt.Scene` loaded using :func:`~sionna.rt.load_scene`. """ # Input # ------ # mask : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.bool # Set to `False` for non-existent paths. # When there are multiple transmitters or receivers, path counts may vary between links. This is used to identify non-existent paths. # For such paths, the channel coefficient is set to `0` and the delay to `-1`. # a : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, num_time_steps], tf.complex # Channel coefficients :math:`a_i` as defined in :eq:`T_tilde`. # If there are less than `max_num_path` valid paths between a # transmit and receive antenna, the irrelevant elements are # filled with zeros. # tau : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float # Propagation delay of each path [s]. # If :attr:`~sionna.rt.Scene.synthetic_array` is `True`, the shape of this tensor # is `[1, num_rx, num_tx, max_num_paths]` as the delays for the # individual antenna elements are assumed to be equal. # If there are less than `max_num_path` valid paths between a # transmit and receive antenna, the irrelevant elements are # filled with -1. # theta_t : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float # Zenith angles of departure :math:`\theta_{\text{T},i}` [rad]. # If :attr:`~sionna.rt.Scene.synthetic_array` is `True`, the shape of this tensor # is `[1, num_rx, num_tx, max_num_paths]` as the angles for the # individual antenna elements are assumed to be equal. # phi_t : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float # Azimuth angles of departure :math:`\varphi_{\text{T},i}` [rad]. # See description of ``theta_t``. # theta_r : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float # Zenith angles of arrival :math:`\theta_{\text{R},i}` [rad]. # See description of ``theta_t``. # phi_r : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float # Azimuth angles of arrival :math:`\varphi_{\text{T},i}` [rad]. # See description of ``theta_t``. # types : [batch_size, max_num_paths], tf.int # Type of path: # - 0 : LoS # - 1 : Reflected # - 2 : Diffracted # - 3 : Scattered # Types of paths LOS = 0 SPECULAR = 1 DIFFRACTED = 2 SCATTERED = 3 RIS = 4 def __init__(self, sources, targets, scene, types=None): dtype = scene.dtype num_sources = sources.shape[0] num_targets = targets.shape[0] rdtype = dtype.real_dtype self._a = tf.zeros([num_targets, num_sources, 0], dtype) self._tau = tf.zeros([num_targets, num_sources, 0], rdtype) self._theta_t = tf.zeros([num_targets, num_sources, 0], rdtype) self._theta_r = tf.zeros([num_targets, num_sources, 0], rdtype) self._phi_t = tf.zeros([num_targets, num_sources, 0], rdtype) self._phi_r = tf.zeros([num_targets, num_sources, 0], rdtype) self._mask = tf.fill([num_targets, num_sources, 0], False) self._targets_sources_mask = tf.fill([num_targets, num_sources, 0], False) self._vertices = tf.zeros([0, num_targets, num_sources, 0, 3], rdtype) self._objects = tf.fill([0, num_targets, num_sources, 0], -1) self._doppler = tf.zeros([num_targets, num_sources, 0], rdtype) if types is None: self._types = tf.fill([0], -1) else: self._types = types self._sources = sources self._targets = targets self._scene = scene # Is the direction reversed? self._reverse_direction = False # Normalize paths delays? self._normalize_delays = False
[docs] def to_dict(self): # pylint: disable=line-too-long r""" Returns the properties of the paths as a dictionary which values are tensors Output ------- : `dict` """ members_names = dir(self) members_objects = [getattr(self, attr) for attr in members_names] data = {attr_name[1:] : attr_obj for (attr_obj, attr_name) in zip(members_objects,members_names) if not callable(attr_obj) and not isinstance(attr_obj, scene_module.Scene) and not attr_name.startswith("__") and attr_name.startswith("_")} return data
[docs] def from_dict(self, data_dict): # pylint: disable=line-too-long r""" Set the paths from a dictionary which values are tensors The format of the dictionary is expected to be the same as the one returned by :meth:`~sionna.rt.Paths.to_dict()`. Input ------ data_dict : `dict` """ for attr_name in data_dict: attr_obj = data_dict[attr_name] setattr(self, '_' + attr_name, attr_obj)
[docs] def export(self, filename): r""" export(filename) Saves the paths as an OBJ file for visualisation, e.g., in Blender Input ------ filename : str Path and name of the file """ vertices = self.vertices objects = self.objects sources = self.sources targets = self.targets mask = self.targets_sources_mask # Content of the obj file r = '' offset = 0 for rx in range(vertices.shape[1]): tgt = targets[rx].numpy() for tx in range(vertices.shape[2]): src = sources[tx].numpy() for p in range(vertices.shape[3]): # If the path is masked, skip it if not mask[rx,tx,p]: continue # Add a comment to describe this path r += f'# Path {p} from tx {tx} to rx {rx}' + os.linesep # Vertices and intersected objects vs = vertices[:,rx,tx,p].numpy() objs = objects[:,rx,tx,p].numpy() depth = 0 # First vertex is the source r += f"v {src[0]:.8f} {src[1]:.8f} {src[2]:.8f}"+os.linesep # Add intersection points for v,o in zip(vs,objs): # Skip if no intersection if o == -1: continue r += f"v {v[0]:.8f} {v[1]:.8f} {v[2]:.8f}" + os.linesep depth += 1 r += f"v {tgt[0]:.8f} {tgt[1]:.8f} {tgt[2]:.8f}"+os.linesep # Add the connections for i in range(1, depth+2): v0 = i + offset v1 = i + offset + 1 r += f"l {v0} {v1}" + os.linesep # Prepare for the next path r += os.linesep offset += depth+2 # Save the file # pylint: disable=unspecified-encoding with open(filename, 'w') as f: f.write(r)
@property def mask(self): # pylint: disable=line-too-long """ [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.bool : Set to `False` for non-existent paths. When there are multiple transmitters or receivers, path counts may vary between links. This is used to identify non-existent paths. For such paths, the channel coefficient is set to `0` and the delay to `-1`. """ return self._mask @mask.setter def mask(self, v): self._mask = v @property def a(self): # pylint: disable=line-too-long """ [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, num_time_steps], tf.complex : Passband channel coefficients :math:`a_i` of each path as defined in :eq:`H_final`. """ return self._a @a.setter def a(self, v): self._a = v @property def tau(self): # pylint: disable=line-too-long """ [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float : Propagation delay :math:`\\tau_i` [s] of each path as defined in :eq:`H_final`. """ return self._tau @tau.setter def tau(self, v): self._tau = v @property def theta_t(self): # pylint: disable=line-too-long """ [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float : Zenith angles of departure [rad] """ return self._theta_t @theta_t.setter def theta_t(self, v): self._theta_t = v @property def phi_t(self): # pylint: disable=line-too-long """ [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float : Azimuth angles of departure [rad] """ return self._phi_t @phi_t.setter def phi_t(self, v): self._phi_t = v @property def theta_r(self): # pylint: disable=line-too-long """ [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float : Zenith angles of arrival [rad] """ return self._theta_r @theta_r.setter def theta_r(self, v): self._theta_r = v @property def phi_r(self): # pylint: disable=line-too-long """ [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float : Azimuth angles of arrival [rad] """ return self._phi_r @phi_r.setter def phi_r(self, v): self._phi_r = v @property def types(self): """ [batch_size, max_num_paths], tf.int : Type of the paths: - 0 : LoS - 1 : Reflected - 2 : Diffracted - 3 : Scattered - 4 : RIS """ return self._types @types.setter def types(self, v): self._types = v @property def sources(self): # pylint: disable=line-too-long """ [num_sources, 3], tf.float : Sources from which rays (paths) are emitted """ return self._sources @sources.setter def sources(self, v): self._sources = v @property def targets(self): # pylint: disable=line-too-long """ [num_targets, 3], tf.float : Targets at which rays (paths) are received """ return self._targets @targets.setter def targets(self, v): self._targets = v @property def normalize_delays(self): """ bool : Set to `True` to normalize path delays such that the first path between any pair of antennas of a transmitter and receiver arrives at ``tau = 0``. Defaults to `True`. """ return self._normalize_delays @normalize_delays.setter def normalize_delays(self, v): if v == self._normalize_delays: return if ~v and self._normalize_delays: self.tau += self._min_tau else: self.tau -= self._min_tau self.tau = tf.where(self.tau<0, tf.cast(-1, self.tau.dtype) , self.tau) self._normalize_delays = v @property def doppler(self): # pylint: disable=line-too-long """ [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float : Doppler shift for each path related to movement of objects. The Doppler shifts resulting from movements of the transmitters or receivers will be computed from the inputs to the function :func:`~sionna.rt.Paths.apply_doppler`. """ return self._doppler @doppler.setter def doppler(self, v): self._doppler = v
[docs] def apply_doppler(self, sampling_frequency, num_time_steps, tx_velocities=(0.,0.,0.), rx_velocities=(0.,0.,0.)): # pylint: disable=line-too-long r""" Apply Doppler shifts to all paths according to the velocities of objects in the scene as well as the provided transmitter and receiver velocities. This function replaces the last dimension of the tensor :attr:`~sionna.rt.Paths.a` storing the time evolution of the paths' coefficients with a dimension of size ``num_time_steps``. Time evolution of the channel coefficients is simulated by computing the Doppler shift due to movements of scene objects, transmitters, and receivers. To understand this process, let us consider a single propagation path undergoing :math:`n` scattering processes, such as reflection, diffuse scattering, or diffraction, as shown in the figure below. .. figure:: ../figures/doppler.png :align: center The object on which lies the :math:`i\text{th}` scattering point has the velocity vector :math:`\hat{\mathbf{v}}_i` and the outgoing ray direction at this point is denoted :math:`\hat{\mathbf{k}}_i`. The first and last point correspond to the transmitter and receiver, respectively. We therefore have .. math:: \hat{\mathbf{k}}_0 &= \hat{\mathbf{r}}(\theta_{\text{T}}, \varphi_{\text{T}})\\ \hat{\mathbf{k}}_{n} &= -\hat{\mathbf{r}}(\theta_{\text{R}}, \varphi_{\text{R}}) where :math:`(\theta_{\text{T}}, \varphi_{\text{T}})` are the AoDs, :math:`(\theta_{\text{R}}, \varphi_{\text{R}})` are the AoAs, and :math:`\hat{\mathbf{r}}(\theta,\varphi)` is defined in :eq:`spherical_vecs`. If the transmitter emits a signal with frequency :math:`f`, the receiver will observe the signal at frequency :math:`f'=f + f_\Delta`, where :math:`f_\Delta` is the Doppler shift, which can be computed as [Wiffen2018]_ .. math:: f' = f \prod_{i=0}^n \frac{1 - \frac{\mathbf{v}_{i+1}^\mathsf{T}\hat{\mathbf{k}}_i}{c}}{1 - \frac{\mathbf{v}_{i}^\mathsf{T}\hat{\mathbf{k}}_i}{c}}. Under the assumption that :math:`\lVert \mathbf{v}_i \rVert\ll c`, we can apply the Taylor expansion :math:`(1-x)^{-1}\approx 1+x`, for :math:`x\ll 1`, to the previous equation to obtain .. math:: f' &\approx f \prod_{i=0}^n \left(1 - \frac{\mathbf{v}_{i+1}^\mathsf{T}\hat{\mathbf{k}}_i}{c}\right)\left(1 + \frac{\mathbf{v}_{i}^\mathsf{T}\hat{\mathbf{k}}_i}{c}\right)\\ &\approx f \left(1 + \sum_{i=0}^n \frac{\mathbf{v}_{i}^\mathsf{T}\hat{\mathbf{k}}_i -\mathbf{v}_{i+1}^\mathsf{T}\hat{\mathbf{k}}_i}{c} \right) where the second line results from ignoring terms in :math:`c^{-2}`. Solving for :math:`f_\Delta`, grouping terms with the same :math:`\mathbf{v}_i` together, and using :math:`f=c/\lambda`, we obtain .. math:: f_\Delta = \frac{1}{\lambda}\left(\mathbf{v}_{0}^\mathsf{T}\hat{\mathbf{k}}_0 - \mathbf{v}_{n+1}^\mathsf{T}\hat{\mathbf{k}}_n + \sum_{i=1}^n \mathbf{v}_{i}^\mathsf{T}\left(\hat{\mathbf{k}}_i-\hat{\mathbf{k}}_{i-1} \right) \right) \qquad \text{[Hz]}. Using this Doppler shift, the time-dependent path coefficient is computed as .. math :: a(t) = a e^{j2\pi f_\Delta t}. Note that this model is only valid as long as the AoDs, AoAs, and path delays do not change significantly. This is typically the case for very short time intervals. Large-scale mobility should be simulated by moving objects within the scene and recomputing the propagation paths. When this function is called multiple times, it overwrites the previous time step dimension. Input ------ sampling_frequency : float Frequency [Hz] at which the channel impulse response is sampled num_time_steps : int Number of time steps. tx_velocities : [batch_size, num_tx, 3] or broadcastable, tf.float | `None` Velocity vectors :math:`(v_\text{x}, v_\text{y}, v_\text{z})` of all transmitters [m/s]. Defaults to `[0,0,0]`. rx_velocities : [batch_size, num_tx, 3] or broadcastable, tf.float | `None` Velocity vectors :math:`(v_\text{x}, v_\text{y}, v_\text{z})` of all receivers [m/s]. Defaults to `[0,0,0]`. """ dtype = self._scene.dtype rdtype = dtype.real_dtype zeror = tf.zeros((), rdtype) two_pi = tf.cast(2.*PI, rdtype) tx_velocities = tf.cast(tx_velocities, rdtype) tx_velocities = expand_to_rank(tx_velocities, 3, 0) if tx_velocities.shape[2] != 3: raise ValueError("Last dimension of `tx_velocities` must equal 3") if rx_velocities is None: rx_velocities = [0.,0.,0.] rx_velocities = tf.cast(rx_velocities, rdtype) rx_velocities = expand_to_rank(rx_velocities, 3, 0) if rx_velocities.shape[2] != 3: raise ValueError("Last dimension of `rx_velocities` must equal 3") sampling_frequency = tf.cast(sampling_frequency, rdtype) if sampling_frequency <= 0.0: raise ValueError("The sampling frequency must be positive") num_time_steps = tf.cast(num_time_steps, tf.int32) if num_time_steps <= 0: msg = "The number of time samples must a positive integer" raise ValueError(msg) # Drop previous time step dimension, if any if tf.rank(self.a) == 7: self.a = self.a[...,0] # [batch_size, num_rx, num_tx, max_num_paths, 3] # or # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, 3] k_t = r_hat(self.theta_t, self.phi_t) k_r = r_hat(self.theta_r, self.phi_r) if self._scene.synthetic_array: # [batch_size, num_rx, 1, num_tx, 1, max_num_paths, 3] k_t = tf.expand_dims(tf.expand_dims(k_t, axis=2), axis=4) # [batch_size, num_rx, 1, num_tx, 1, max_num_paths, 3] k_r = tf.expand_dims(tf.expand_dims(k_r, axis=2), axis=4) # Expand rank of the speed vector for broadcasting with k_r # [batch_dim, 1, 1, num_tx, 1, 1, 3] tx_velocities = insert_dims(insert_dims(tx_velocities, 2,1), 2,4) # [batch_dim, num_rx, 1, 1, 1, 1, 3] rx_velocities = insert_dims(rx_velocities, 4, 2) # Generate time steps # [num_time_steps] ts = tf.range(num_time_steps, dtype=rdtype) ts = ts / sampling_frequency # Compute the Doppler shift # [batch_dim, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] # or # [batch_dim, num_rx, 1, num_tx, 1, max_num_paths] tx_ds = two_pi*dot(tx_velocities, k_t)/self._scene.wavelength rx_ds = two_pi*dot(rx_velocities, k_r)/self._scene.wavelength ds = tx_ds + rx_ds # Add Doppler shifts due to movement of scene objects if self._scene.synthetic_array: # Create dummy dims for tx and rx antennas # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] ds += two_pi*self.doppler[..., tf.newaxis, :, tf.newaxis, :] else: ds += two_pi*self.doppler # Expand for the time sample dimension # [batch_dim, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, 1] # or # [batch_dim, num_rx, 1, num_tx, 1, max_num_paths, 1] ds = tf.expand_dims(ds, axis=-1) # Expand time steps for broadcasting # [1, 1, 1, 1, 1, 1, num_time_steps] ts = expand_to_rank(ts, tf.rank(ds), 0) # [batch_dim, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, # num_time_steps] # or # [batch_dim, num_rx, 1, num_tx, 1, max_num_paths, num_time_steps] ds = ds*ts exp_ds = tf.exp(tf.complex(zeror, ds)) # Apply Doppler shift # Expand with time dimension # [batch_dim, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, 1] a = tf.expand_dims(self.a, axis=-1) # Manual broadcast last dimension a = tf.repeat(a, exp_ds.shape[6], -1) a = a*exp_ds self.a = a
@property def reverse_direction(self): r""" bool : If set to `True`, swaps receivers and transmitters """ return self._reverse_direction @reverse_direction.setter def reverse_direction(self, v): if v == self._reverse_direction: return if tf.rank(self.a) == 6: self.a = tf.transpose(self.a, perm=[0,3,4,1,2,5]) else: self.a = tf.transpose(self.a, perm=[0,3,4,1,2,5,6]) if self._scene.synthetic_array: self.tau = tf.transpose(self.tau, perm=[0,2,1,3]) self._min_tau = tf.transpose(self._min_tau, perm=[0,2,1,3]) self.theta_t = tf.transpose(self.theta_t, perm=[0,2,1,3]) self.phi_t = tf.transpose(self.phi_t, perm=[0,2,1,3]) self.theta_r = tf.transpose(self.theta_r, perm=[0,2,1,3]) self.phi_r = tf.transpose(self.phi_r, perm=[0,2,1,3]) self.doppler = tf.transpose(self.doppler, perm=[0,2,1,3]) else: self.tau = tf.transpose(self.tau, perm=[0,3,4,1,2,5]) self._min_tau = tf.transpose(self._min_tau, perm=[0,3,4,1,2,5]) self.theta_t = tf.transpose(self.theta_t, perm=[0,3,4,1,2,5]) self.phi_t = tf.transpose(self.phi_t, perm=[0,3,4,1,2,5]) self.theta_r = tf.transpose(self.theta_r, perm=[0,3,4,1,2,5]) self.phi_r = tf.transpose(self.phi_r, perm=[0,3,4,1,2,5]) self.doppler = tf.transpose(self.doppler, perm=[0,3,4,1,2,5]) self._reverse_direction = v
[docs] def cir(self, los=True, reflection=True, diffraction=True, scattering=True, ris=True, cluster_ris_paths=True, num_paths=None): # pylint: disable=line-too-long r""" Returns the baseband equivalent channel impulse response :eq:`h_b` which can be used for link simulations by other Sionna components. The baseband equivalent channel coefficients :math:`a^{\text{b}}_{i}` are computed as : .. math:: a^{\text{b}}_{i} = a_{i} e^{-j2 \pi f \tau_{i}} where :math:`i` is the index of an arbitrary path, :math:`a_{i}` is the passband path coefficient (:attr:`~sionna.rt.Paths.a`), :math:`\tau_{i}` is the path delay (:attr:`~sionna.rt.Paths.tau`), and :math:`f` is the carrier frequency. Note: For the paths of a given type to be returned (LoS, reflection, etc.), they must have been previously computed by :meth:`~sionna.rt.Scene.compute_paths`, i.e., the corresponding flags must have been set to `True`. Input ------ los : bool If set to `False`, LoS paths are not returned. Defaults to `True`. reflection : bool If set to `False`, specular paths are not returned. Defaults to `True`. diffraction : bool If set to `False`, diffracted paths are not returned. Defaults to `True`. scattering : bool If set to `False`, scattered paths are not returned. Defaults to `True`. ris : bool If set to `False`, paths involving RIS are not returned. Defaults to `True`. cluster_ris_paths : bool If set to `True`, the paths from each RIS are coherently combined into a single path, and the delays are averaged. Note that this process is performed separately for each RIS. For large RIS, clustering the paths significantly reduces the memory required to run link-level simulations. Defaults to `True`. num_paths : int or `None` All CIRs are either zero-padded or cropped to the largest ``num_paths`` paths. Defaults to `None` which means that no padding or cropping is done. Output ------- a : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, num_time_steps], tf.complex Path coefficients tau : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float Path delays """ ris = ris and (len(self._scene.ris) > 0) # Select only the desired effects types = self.types[0] # [max_num_paths] selection_mask = tf.fill(tf.shape(types), False) if los: selection_mask = tf.logical_or(selection_mask, types == Paths.LOS) if reflection: selection_mask = tf.logical_or(selection_mask, types == Paths.SPECULAR) if diffraction: selection_mask = tf.logical_or(selection_mask, types == Paths.DIFFRACTED) if scattering: selection_mask = tf.logical_or(selection_mask, types == Paths.SCATTERED) if ris: if cluster_ris_paths: # Combine path coefficients from every RIS coherently and # average their delays. # This process is performed separately for each RIS. # # Extract paths coefficients and delays corresponding to RIS # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, # num_ris_paths, num_time_steps] a_ris = tf.gather(self.a, tf.where(types == Paths.RIS)[:,0], axis=-2) # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, # num_ris_paths] or [batch_size, num_rx, num_tx,num_ris_paths] tau_ris = tf.gather(self.tau, tf.where(types == Paths.RIS)[:,0], axis=-1) # [batch_size, num_rx, num_rx_ant/1, num_tx, num_tx_ant/1, # num_ris_paths] if self._scene.synthetic_array: tau_ris = tf.expand_dims(tau_ris, 2) tau_ris = tf.expand_dims(tau_ris, 4) # Loop over RIS to combine their path coefficients and delays index_start = 0 index_end = 0 a_combined_ris_all = [] tau_combined_ris_all = [] for ris_ in self._scene.ris.values(): index_end = index_start + ris_.num_cells # Extract the path coefficients and delays corresponding to # the paths from RIS # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, # num_this_ris_path, num_time_steps] a_this_ris = a_ris[..., index_start:index_end,:] # [batch_size, num_rx, num_rx_ant/1, num_tx, num_tx_ant/1, # num_this_ris_path] tau_this_ris = tau_ris[...,index_start:index_end] # Average the delays # [batch_size, num_rx, num_rx_ant/1, num_tx, num_tx_ant/1,1] mean_tau_this_ris = tf.reduce_mean(tau_this_ris, axis=-1, keepdims=True) # Phase shift due to propagation delay. # We subtract the average delay to ensure the propagation # delay is not applied, only the phase shift due to the # RIS geometry # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, # num_this_ris_path] tau_this_ris -= mean_tau_this_ris ps = tf.complex(tf.zeros_like(tau_this_ris), -2.*PI*self._scene.frequency*tau_this_ris) ps = ps[...,tf.newaxis] # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, # num_this_ris_path, num_time_steps] a_this_ris = a_this_ris*tf.exp(ps) # Combine the paths coefficients and delays # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, 1, # num_time_steps] a_this_ris = tf.reduce_sum(a_this_ris, axis=-2, keepdims=True) # a_combined_ris_all.append(a_this_ris) tau_combined_ris_all.append(mean_tau_this_ris) # index_start = index_end # # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, num_ris, # num_time_steps] a_combined_ris_all = tf.concat(a_combined_ris_all, axis=-2) # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, num_ris] tau_combined_ris_all = tf.concat(tau_combined_ris_all, axis=-1) else: selection_mask = tf.logical_or(selection_mask, types == Paths.RIS) # Extract selected paths # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, # num_time_steps] a = tf.gather(self.a, tf.where(selection_mask)[:,0], axis=-2) # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] # or [batch_size, num_rx, num_tx, max_num_paths] tau = tf.gather(self.tau, tf.where(selection_mask)[:,0], axis=-1) if self._scene.synthetic_array: tau_ = tf.expand_dims(tau, 2) tau_ = tf.expand_dims(tau_, 4) else: tau_ = tau # If RIS paths were combined, add the results of the clustering if ris and cluster_ris_paths: # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, # max_num_paths, num_time_steps] a = tf.concat([a, a_combined_ris_all], axis=-2) # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, # max_num_paths] tau_ = tf.concat([tau_, tau_combined_ris_all], axis=-1) # Compute base-band CIR # [batch_size, num_rx, 1/num_rx_ant, num_tx, 1/num_tx_ant, # max_num_paths, num_time_steps, 1] tau_ = tf.expand_dims(tau_, -1) phase = tf.complex(tf.zeros_like(tau_), -2*PI*self._scene.frequency*tau_) # Manual repeat along the time step dimension as high-dimensional # broadcast is not possible phase = tf.repeat(phase, a.shape[-1], axis=-1) a = a*tf.exp(phase) if num_paths is not None: a, tau = self.pad_or_crop(a, tau, num_paths) return a,tau
####################################################### # Internal methods and properties ####################################################### @ property def targets_sources_mask(self): # pylint: disable=line-too-long """ [num_targets, num_sources, max_num_paths], tf.bool : Set to `False` for non-existent paths. When there are multiple transmitters or receivers, path counts may vary between links. This is used to identify non-existent paths. For such paths, the channel coefficient is set to `0` and the delay to `-1`. Same as `mask`, but for sources and targets. """ return self._targets_sources_mask @ targets_sources_mask.setter def targets_sources_mask(self, v): self._targets_sources_mask = v @property def vertices(self): # pylint: disable=line-too-long """ [max_depth, num_targets, num_sources, max_num_paths, 3], tf.float : Positions of intersection points. """ return self._vertices @vertices.setter def vertices(self, v): self._vertices = v @property def objects(self): # pylint: disable=line-too-long """ [max_depth, num_targets, num_sources, max_num_paths], tf.int : Indices of the intersected scene objects or wedges. Paths with depth lower than ``max_depth`` are padded with `-1`. """ return self._objects @objects.setter def objects(self, v): self._objects = v def merge(self, more_paths): r""" Merge ``more_paths`` with the current paths and returns the so-obtained instance. `self` is not updated. Input ----- more_paths : :class:`~sionna.rt.Paths` First set of paths to merge """ dtype = self._scene.dtype more_vertices = more_paths.vertices more_objects = more_paths.objects more_types = more_paths.types # The paths to merge must have the same number of sources and targets assert more_paths.targets.shape[0] == self.targets.shape[0],\ "Paths to merge must have same number of targets" assert more_paths.sources.shape[0] == self.sources.shape[0],\ "Paths to merge must have same number of targets" # Pad the paths with the lowest depth padding = self.vertices.shape[0] - more_vertices.shape[0] if padding > 0: more_vertices = tf.pad(more_vertices, [[0,padding],[0,0],[0,0],[0,0],[0,0]], constant_values=tf.zeros((), dtype.real_dtype)) more_objects = tf.pad(more_objects, [[0,padding],[0,0],[0,0],[0,0]], constant_values=-1) elif padding < 0: padding = -padding self.vertices = tf.pad(self.vertices, [[0,padding],[0,0],[0,0],[0,0],[0,0]], constant_values=tf.zeros((), dtype.real_dtype)) self.objects = tf.pad(self.objects, [[0,padding],[0,0],[0,0],[0,0]], constant_values=-1) # Merge types if tf.rank(self.types) == 0: merged_types = tf.repeat(self.types, tf.shape(self.vertices)[3]) else: merged_types = self.types if tf.rank(more_types) == 0: more_types = tf.repeat(more_types, tf.shape(more_vertices)[3]) self.types = tf.concat([merged_types, more_types], axis=0) # Concatenate all self.a = tf.concat([self.a, more_paths.a], axis=2) self.tau = tf.concat([self.tau, more_paths.tau], axis=2) self.theta_t = tf.concat([self.theta_t, more_paths.theta_t], axis=2) self.phi_t = tf.concat([self.phi_t, more_paths.phi_t], axis=2) self.theta_r = tf.concat([self.theta_r, more_paths.theta_r], axis=2) self.phi_r = tf.concat([self.phi_r, more_paths.phi_r], axis=2) self.mask = tf.concat([self.mask, more_paths.mask], axis=2) self.vertices = tf.concat([self.vertices, more_vertices], axis=3) self.objects = tf.concat([self.objects, more_objects], axis=3) self.doppler = tf.concat([self.doppler, more_paths.doppler], axis=2) return self def finalize(self): """ This function must be called to finalize the creation of the paths. This function: - Flags the LoS paths - Computes the smallest delay for delay normalization """ self.set_los_path_type() # Add dummy-dimension for batch_size # [1, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] self.mask = tf.expand_dims(self.mask, axis=0) # [1, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] self.a = tf.expand_dims(self.a, axis=0) # [1, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] self.tau = tf.expand_dims(self.tau, axis=0) # [1, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] self.theta_t = tf.expand_dims(self.theta_t, axis=0) # [1, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] self.phi_t = tf.expand_dims(self.phi_t, axis=0) # [1, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] self.theta_r = tf.expand_dims(self.theta_r, axis=0) # [1, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] self.phi_r = tf.expand_dims(self.phi_r, axis=0) # [1, max_num_paths] self.types = tf.expand_dims(self.types, axis=0) # [1, max_num_paths] self.doppler = tf.expand_dims(self.doppler, axis=0) tau = self.tau if tau.shape[-1] == 0: # No paths self._min_tau = tf.zeros_like(tau) else: zero = tf.zeros((), tau.dtype) inf = tf.cast(np.inf, tau.dtype) tau = tf.where(tau < zero, inf, tau) if self._scene.synthetic_array: # [1, num_rx, num_tx, 1] min_tau = tf.reduce_min(tau, axis=3, keepdims=True) else: # [1, num_rx, 1, num_tx, 1, 1] min_tau = tf.reduce_min(tau, axis=(2, 4, 5), keepdims=True) min_tau = tf.where(tf.math.is_inf(min_tau), zero, min_tau) self._min_tau = min_tau # Add the time steps dimension # [1, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, 1] self.a = tf.expand_dims(self.a, axis=-1) # Normalize delays self.normalize_delays = True def set_los_path_type(self): """ Flags paths that do not hit any object as LoS """ # [max_depth, num_targets, num_sources, num_paths] objects = self.objects # [num_targets, num_sources, num_paths] mask = self.targets_sources_mask if objects.shape[3] > 0: # [num_targets, num_sources, num_paths] los_path = tf.reduce_all(objects == -1, axis=0) # [num_targets, num_sources, num_paths] los_path = tf.logical_and(los_path, mask) # [num_paths] los_path = tf.reduce_any(los_path, axis=(0,1)) # [[1]] los_path_index = tf.where(los_path) updates = tf.repeat(Paths.LOS, tf.shape(los_path_index)[0], 0) self.types = tf.tensor_scatter_nd_update(self.types, los_path_index, updates) def pad_or_crop(self, a, tau, k): """ Enforces that CIRs have exactly k paths by either zero-padding of cropping the weakest paths """ max_num_paths = a.shape[-2] # Crop if k<max_num_paths: # Compute indices of the k strongest paths # As is independent of the number of time steps, # Therefore, we use only the first one a[...,0]. # ind : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, k] _, ind = tf.math.top_k(tf.abs(a[...,0]), k=k, sorted=True) # Gather the strongest paths # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, k, num_time_steps] a = tf.gather(a, ind, batch_dims=5) # Gather the corresponding path delays # Synthetic array if tf.rank(tau)==4: # tau : [batch_size, num_rx, num_tx, max_num_paths] # Get relevant indices # [batch_size, num_rx, num_tx, k] ind_tau = ind[:,:,0,:,0] # [batch_size, num_rx, num_tx, k] tau = tf.gather(tau, ind_tau, batch_dims=3) # Non-synthetic array else: # tau: [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, k] tau = tf.gather(tau, ind, batch_dims=5) # Pad elif k>max_num_paths: # Pad paths with zeros pad_size = k-max_num_paths # Paddings for the paths gains paddings = tf.constant([[0, 0] if i != 5 else [0, pad_size] for i in range(7)]) a = tf.pad(a, paddings=paddings, mode='CONSTANT', constant_values=0) # Paddings for the delays (-1 by Sionna convention) paddings = tf.constant([[0, 0] if i != tf.rank(tau)-1 else [0, pad_size] for i in range(tf.rank(tau))]) tau = tf.pad(tau, paddings=paddings, mode='CONSTANT', constant_values=-1) return a, tau