#
# SPDX-FileCopyrightText: Copyright (c) 2021-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
"""Stores the computed propagation paths"""
import mitsuba as mi
import drjit as dr
from typing import Literal
from .paths_buffer import PathsBuffer
from sionna.rt.constants import InteractionType, INVALID_SHAPE,\
INVALID_PRIMITIVE
from sionna.rt import Scene
from sionna.rt.utils import cpx_mul, cpx_abs_square, r_hat, sinc, cpx_convert,\
map_angle_to_canonical_range
[docs]
class Paths:
# pylint: disable=line-too-long
r"""
Paths()
Stores the simulated propagation paths
Paths are generated for the loaded scene using a path solver, such as
:class:`~sionna.rt.PathSolver`. Please refer to the documentation of this
class for further details.
:param scene: Scene for which paths are computed
:param src_positions: Positions of the sources
:param tgt_positions: Positions of the targets
:param tx_velocities: Velocities of the transmitters
:param rx_velocities: Velocities of the receivers
:param synthetic_array: If set to `True`, then the antenna arrays are applied synthetically
:param paths_buffer: Paths buffer storing the computed paths
:param rel_ant_positions_tx: Positions of the array elements with respect to the center of the transmitters. Only required if synthetic arrays are used.
:param rel_ant_positions_rx: Positions of the array elements with respect to the center of the receivers. Only required if synthetic arrays are used.
"""
def __init__(self,
scene : Scene,
src_positions : mi.Point3f,
tgt_positions : mi.Point3f,
tx_velocities : mi.Vector3f,
rx_velocities : mi.Vector3f,
synthetic_array : bool,
paths_buffer : PathsBuffer,
rel_ant_positions_tx : mi.Point3f | None,
rel_ant_positions_rx : mi.Point3f | None):
self._paths_buffer = paths_buffer
# Sources and targets, i.e., end-points of the paths
self._src_positions = src_positions
self._tgt_positions = tgt_positions
# Velocities of the radio devices
self._tx_velocities = tx_velocities
self._rx_velocities = rx_velocities
# Referencess to the transmitter and receiver arrays
self._tx_array = scene.tx_array
self._rx_array = scene.rx_array
# Numbers of transmitters and receivers
self._num_tx = len(scene.transmitters)
self._num_rx = len(scene.receivers)
# Flag indicating if synthetic arrays were used
self._synthetic_array = synthetic_array
# Wavelength for which the paths were computed
self._wavelength = scene.wavelength
# Frequency for which the paths were computed
self._frequency = scene.frequency
# Flag indicating if the tensors storing the paths components were built
# For efficiency, these are only built if requested.
self._paths_components_built = False
# To avoid errors, we handle the corner case in which no paths were
# found separately
if paths_buffer.buffer_size == 0:
self._build_empty_paths()
return # Stop init
# The following quantities are used in a few internal functions.
# They are pre-computed here to avoid code duplication
# Effective array size
# If synthetic array is assumed, only a single source (target) was
# used to model for the transmitter (receiver) array.
# The synthetic phases shifts are applied afterwards.
if self._synthetic_array:
self._eff_tx_array_size = 1
self._eff_rx_array_size = 1
else:
self._eff_tx_array_size = self._tx_array.array_size
self._eff_rx_array_size = self._rx_array.array_size
src_ind = paths_buffer.source_indices
tgt_ind = paths_buffer.target_indices
# Indices of transmitters, receivers, and antennas.
# An antenna-first ordering is assumed.
self._tx_ind = src_ind // self._eff_tx_array_size
self._rx_ind = tgt_ind // self._eff_rx_array_size
#
self._tx_ant_ind = src_ind % self._eff_tx_array_size
self._rx_ant_ind = tgt_ind % self._eff_rx_array_size
# The following uses `dr.scatter_inc` to jointly:
# - Compute the maximum number of paths over all (source, target)
# couples, which is then used to allocate the memory for the tensors.
# - Compute a path index such that paths sharing the same
# (source, target) couple do not share the same index. This is later
# used to scatter data in the allocated tensors.
#
# We must ensure that the path index used for scattering the path data
# in the allocated tensors are unique for every (source, target) couple.
# Map (src_ind, tgt_ind) to a unique integer identifying the path source
# and target
num_src = self._num_tx*self._eff_tx_array_size
num_tgt = self._num_rx*self._eff_rx_array_size
src_tgt_id = tgt_ind*num_src + src_ind
#
num_paths = dr.zeros(mi.UInt, num_src*num_tgt)
self._path_ind = dr.scatter_inc(num_paths, src_tgt_id)
dr.eval(self._path_ind)
self._max_num_paths = dr.max(num_paths)[0]
# Build the tensors
# Builds tensors from the paths buffer
self._build_from_buffer()
# Apply synthetic arrat if required
if self._synthetic_array:
self._apply_synthetic_array(rel_ant_positions_tx,
rel_ant_positions_rx)
# Fuse the pattern and array dimensions
self._fuse_pattern_array_dims()
@property
def sources(self):
# pylint: disable=line-too-long
r"""Positions of the paths sources. If synthetic
arrays are not used (:attr:`~sionna.rt.Paths.synthetic_array` is `False`),
then every transmit antenna is modeled as a source of paths.
Otherwise, transmitters are modelled as if they had a single antenna
located at their :attr:`~sionna.rt.RadioDevice.position`. The channel
responses for each individual antenna of the arrays are then computed
"synthetically" by applying appropriate phase shifts.
:type: :py:class:`mi.Point3f`"""
return self._src_positions
@property
def targets(self):
# pylint: disable=line-too-long
r"""Positions of the paths targets. If synthetic
arrays are not used (:attr:`~sionna.rt.Paths.synthetic_array` is `False`),
then every receiver antenna is modeled as a source of paths.
Otherwise, receivers are modelled as if they had a single antenna
located at their :attr:`~sionna.rt.RadioDevice.position`. The channel
responses for each individual antenna of the arrays are then computed
"synthetically" by applying appropriate phase shifts.
:type: :py:class:`mi.Point3f`"""
return self._tgt_positions
@property
def tx_array(self):
"""Antenna array used by transmitters
:type: :class:`~sionna.rt.AntennaArray`
"""
return self._tx_array
@property
def rx_array(self):
"""Antenna array used by receivers
:type: :class:`~sionna.rt.AntennaArray`"""
return self._rx_array
@property
def num_tx(self):
"""Number of transmitters
:type: :py:class:`int`"""
return self._num_tx
@property
def num_rx(self):
"""Number of receivers
:type: :py:class:`int`
"""
return self._num_rx
@property
def synthetic_array(self):
"""Flag indicating if synthetic arrays were used to trace the paths
:type: :py:class:`bool`
"""
return self._synthetic_array
@property
def valid(self):
# pylint: disable=line-too-long
"""
Flags indicating valid paths
:type: :py:class:`mi.TensorXb [num_rx, num_rx_ant, num_tx, num_tx_ant, num_paths]`
"""
return self._valid
@property
def a(self):
# pylint: disable=line-too-long
r"""
Real and imaginary components of the channel coefficients
:type: :py:class:`Tuple[mi.TensorXf [num_rx, num_rx_ant, num_tx, num_tx_ant, num_paths], mi.TensorXf [num_rx, num_rx_ant, num_tx, num_tx_ant, num_paths]]`
"""
return self._a_real, self._a_imag
@property
def tau(self):
# pylint: disable=line-too-long
"""
Paths delays [s]
:type: :py:class:`mi.TensorXf [num_rx, num_rx_ant, num_tx, num_tx_ant, num_paths] or [num_rx, num_tx, num_paths]`
"""
return self._tau
@property
def theta_t(self):
# pylint: disable=line-too-long
"""
Zenith angles of departure [rad]
:type: :py:class:`mi.TensorXf [num_rx, num_rx_ant, num_tx, num_tx_ant, num_paths] or [num_rx, num_tx, num_paths]`
"""
return self._theta_t
@property
def phi_t(self):
# pylint: disable=line-too-long
"""
Azimuth angles of departure [rad]
:type: :py:class:`mi.TensorXf [num_rx, num_rx_ant, num_tx, num_tx_ant, num_paths] or [num_rx, num_tx, num_paths]`
"""
return self._phi_t
@property
def theta_r(self):
# pylint: disable=line-too-long
"""
Zenith angles of arrival [rad]
:type: :py:class:`mi.TensorXf [num_rx, num_rx_ant, num_tx, num_tx_ant, num_paths] or [num_rx, num_tx, num_paths]`
"""
return self._theta_r
@property
def phi_r(self):
# pylint: disable=line-too-long
"""
Azimuth angles of arrival [rad]
:type: :py:class:`mi.TensorXf [num_rx, num_rx_ant, num_tx, num_tx_ant, num_paths] or [num_rx, num_tx, num_paths]`
"""
return self._phi_r
@property
def interactions(self):
# pylint: disable=line-too-long
"""
Interaction type represented using
:class:`~sionna.rt.constants.InteractionType`
:type: :py:class:`mi.TensorXu [max_depth, num_rx, num_rx_ant, num_tx, num_tx_ant, num_paths] or [max_depth, num_rx, num_tx, num_paths]`
"""
if not self._paths_components_built:
self._build_paths_components()
return self._interactions
@property
def objects(self):
# pylint: disable=line-too-long
"""
IDs of the intersected objects. Invalid objects are represented by
:data:`~sionna.rt.constants.INVALID_SHAPE`.
:type: :py:class:`mi.TensorXu [max_depth, num_rx, num_rx_ant, num_tx, num_tx_ant, num_paths] or [max_depth, num_rx, num_tx, num_paths]`
"""
if not self._paths_components_built:
self._build_paths_components()
return self._shapes
@property
def primitives(self):
# pylint: disable=line-too-long
"""
Indices of the intersected primitives. Invalid primitives are
represented by :data:`~sionna.rt.constants.INVALID_PRIMITIVE`.
:type: :py:class:`mi.TensorXu [max_depth, num_rx, num_rx_ant, num_tx, num_tx_ant, num_paths] or [max_depth, num_rx, num_tx, num_paths]`
"""
if not self._paths_components_built:
self._build_paths_components()
return self._primitives
@property
def vertices(self):
# pylint: disable=line-too-long
"""
Paths' vertices, i.e., the interaction points of the paths with the
scene
:type: :py:class:`mi.TensorXf [max_depth, num_rx, num_rx_ant, num_tx, num_tx_ant, num_paths, 3] or [max_depth, num_rx, num_tx, num_paths, 3]`
"""
if not self._paths_components_built:
self._build_paths_components()
return self._vertices
@property
def doppler(self):
# pylint: disable=line-too-long
r"""
Doppler shift for each path
To understand how Doppler shifts are computed, let us consider a single propagation path undergoing
:math:`n` scattering processes, e.g., reflection, diffuse scattering,
refraction, as shown in the figure below.
.. figure:: ../figures/doppler.png
:align: center
The object on which lies the :math:`i\text{th}` scattering point has the velocity vector
:math:`\hat{\mathbf{v}}_i` and the outgoing ray direction at this point is
denoted :math:`\hat{\mathbf{k}}_i`. The first and last point correspond to the transmitter
and receiver, respectively. We therefore have
.. math::
\hat{\mathbf{k}}_0 &= \hat{\mathbf{r}}(\theta_{\text{T}}, \varphi_{\text{T}})\\
\hat{\mathbf{k}}_{n} &= -\hat{\mathbf{r}}(\theta_{\text{R}}, \varphi_{\text{R}})
where :math:`(\theta_{\text{T}}, \varphi_{\text{T}})` are the AoDs,
:math:`(\theta_{\text{R}}, \varphi_{\text{R}})` are the AoAs, and :math:`\hat{\mathbf{r}}(\theta,\varphi)` is defined in :eq:`spherical_vecs`.
If the transmitter emits a signal with frequency :math:`f`, the receiver
will observe the signal at frequency :math:`f'=f + f_\Delta`, where :math:`f_\Delta` is the Doppler
shift, which can be computed as [Wiffen2018]_
.. math::
f' = f \prod_{i=0}^n \frac{1 - \frac{\mathbf{v}_{i+1}^\mathsf{T}\hat{\mathbf{k}}_i}{c}}{1 - \frac{\mathbf{v}_{i}^\mathsf{T}\hat{\mathbf{k}}_i}{c}}.
Under the assumption that :math:`\lVert \mathbf{v}_i \rVert\ll c`, we can apply the Taylor expansion :math:`(1-x)^{-1}\approx 1+x`, for :math:`x\ll 1`, to the previous equation
to obtain
.. math::
f' &\approx f \prod_{i=0}^n \left(1 - \frac{\mathbf{v}_{i+1}^\mathsf{T}\hat{\mathbf{k}}_i}{c}\right)\left(1 + \frac{\mathbf{v}_{i}^\mathsf{T}\hat{\mathbf{k}}_i}{c}\right)\\
&\approx f \left(1 + \sum_{i=0}^n \frac{\mathbf{v}_{i}^\mathsf{T}\hat{\mathbf{k}}_i -\mathbf{v}_{i+1}^\mathsf{T}\hat{\mathbf{k}}_i}{c} \right)
where the second line results from ignoring terms in :math:`c^{-2}`. Solving for :math:`f_\Delta`, grouping terms with the same :math:`\mathbf{v}_i` together, and using :math:`f=c/\lambda`, we obtain
.. math::
f_\Delta = \frac{1}{\lambda}\left(\mathbf{v}_{0}^\mathsf{T}\hat{\mathbf{k}}_0 - \mathbf{v}_{n+1}^\mathsf{T}\hat{\mathbf{k}}_n + \sum_{i=1}^n \mathbf{v}_{i}^\mathsf{T}\left(\hat{\mathbf{k}}_i-\hat{\mathbf{k}}_{i-1} \right) \right) \qquad \text{[Hz]}.
:type: :py:class:`mi.TensorXf [num_rx, num_rx_ant, num_tx, num_tx_ant, num_paths] or [num_rx, num_tx, num_paths]`:
"""
return self._doppler
[docs]
def cir(self, *,
sampling_frequency: float = 1.,
num_time_steps: int = 1,
normalize_delays: bool = True,
reverse_direction : bool = False,
out_type: Literal["drjit", "jax", "numpy", "tf", "torch"] = "drjit"
):
# pylint: disable=line-too-long
r"""
Returns the baseband-equivalent channel impulse response :eq:`h_b`
which can be used for link simulations by other Sionna components.
Optionally, time evolution of the channel can be simulated based
on the Doppler shifts of all paths.
The baseband equivalent channel coefficient :math:`a^{\text{b}}_{i}(t)`
at time :math:`t` is computed as :
.. math::
a^{\text{b}}_{i}(t) = \underbrace{a_{i} e^{-j2 \pi f \tau_{i}}}_{a^{\text{b}}_{i} } \underbrace{e^{j 2\pi f_{\Delta, i} t}}_{\text{Doppler phase shift}}
where :math:`i` is the index of an arbitrary path, :math:`a_{i}`
is the passband path coefficient (:attr:`~sionna.rt.Paths.a`),
:math:`\tau_{i}` is the path delay (:attr:`~sionna.rt.Paths.tau`),
:math:`f` is the carrier frequency, and :math:`f_{\Delta, i}` is the
Doppler shift of the :math:`i\text{th}` path.
:param sampling_frequency: Frequency [Hz] at which the channel impulse
response is sampled
:param num_time_steps: Number of time steps
:param normalize_delays: If set to `True`, path delays are normalized
such that the first path between any pair of antennas of a
transmitter and receiver arrives at :math:`\tau = 0`
:param reverse_direction: If set to True, swaps receivers and
transmitters
:param out_type: Name of the desired output type.
Currently supported are
`Dr.Jit <https://drjit.readthedocs.io/en/latest/reference.html>`_
("drjit), `Numpy <https://numpy.org>`_ ("numpy"),
`Jax <https://jax.readthedocs.io/en/latest/index.html>`_ ("jax"),
`TensorFlow <https://www.tensorflow.org>`_ ("tf"),
and `PyTorch <https://pytorch.org>`_ ("torch").
:return:
Real and imaginary components of the baseband equivalent channel
coefficients :math:`a^{\text{b}}_{i}`
:return type: Shape : [num_rx, num_rx_ant, num_tx,
num_tx_ant, num_paths, num_time_steps],
Type: :py:class:`Tuple[mi.TensorXf, mi.TensorXf]`
| :py:class:`np.array` | :py:class:`jax.array`
| :py:class:`tf.Tensor` | :py:class:`torch.tensor`
:return: Paths delays :math:`\tau_{i}` [s]
:return type: Shape : [num_rx, num_rx_ant, num_tx,
num_tx_ant, num_paths] or [num_rx, num_tx, num_paths],
Type : :py:class:`mi.TensorXf`
| :py:class:`np.array` | :py:class:`jax.array`
| :py:class:`tf.Tensor` | :py:class:`torch.tensor`
"""
# Reverse direction if requited
if reverse_direction:
a = self._reverse_direction(self.a)
tau, = self._reverse_direction((self.tau,))
else:
a = dr.copy(self.a)
tau = dr.copy(self.tau)
# If no paths, then return immediately
if tau.shape[-1] == 0:
a = (dr.zeros(mi.TensorXf, list(a[0].shape)+[num_time_steps]),
dr.zeros(mi.TensorXf, list(a[1].shape)+[num_time_steps]))
tau = dr.zeros(mi.TensorXf, tau.shape)
if out_type == "drjit":
return a, tau
return cpx_convert(a, out_type), getattr(tau, out_type)()
# Normalize delays if required
if normalize_delays:
tau = dr.select(tau<0, dr.inf, tau)
if self.synthetic_array:
min_tau = dr.min(tau, axis=-1)
min_tau = dr.reshape(mi.TensorXf, min_tau,
shape=list(min_tau.shape) + [1])
else:
min_tau = dr.min(tau, axis=(1,3,4))
num_rx, num_tx = min_tau.shape
min_tau = dr.reshape(mi.TensorXf, min_tau,
shape=[num_rx, 1, num_tx, 1, 1])
# Apply delay normalizaztion
tau -= min_tau
# Set delays of invalid paths to -1
tau = dr.select(dr.isnan(tau) | dr.isinf(tau), -1, tau)
# Compute baseband-equivalent CIR
if self.synthetic_array:
num_rx, num_tx, num_paths = tau.shape
reshape_to = [num_rx, 1, num_tx, 1, num_paths]
tau_ = dr.reshape(mi.TensorXf, tau, reshape_to)
else:
tau_ = tau
# Compute phase shifts and apply to paths coefficients
tau_ = dr.select(tau_==-1, 0, tau_)
phase = -dr.two_pi*self._frequency*tau_
phase = map_angle_to_canonical_range(phase)
sin_phase, cos_phase = dr.sincos(phase)
exp = (cos_phase, sin_phase)
a = cpx_mul(a, exp)
# Add dummy time dimensions
a = [dr.reshape(mi.TensorXf, a_, list(a_.shape) + [1]) for a_ in a]
# Apply Doppler phase shifts
if num_time_steps > 1:
doppler = self.doppler
# Reshape the Doppler shift tensor to fit `a`
if self.synthetic_array:
doppler = dr.reshape(mi.TensorXf, doppler, reshape_to + [1])
else:
doppler = dr.reshape(mi.TensorXf, doppler,
list(doppler.shape) + [1])
time_steps = dr.arange(mi.Float, num_time_steps)/sampling_frequency
time_steps = dr.reshape(mi.TensorXf, time_steps, [1, 1, 1, 1, 1,
num_time_steps])
phase = dr.two_pi*doppler*time_steps
phase = map_angle_to_canonical_range(phase)
sin_phase, cos_phase = dr.sincos(phase)
exp = (cos_phase, sin_phase)
a = cpx_mul(a, exp)
if out_type == "drjit":
return a, tau
return cpx_convert(a, out_type), getattr(tau, out_type)()
[docs]
def taps(self,
bandwidth: float,
l_min: int,
l_max: int,
sampling_frequency: float | None = None,
num_time_steps: int = 1,
normalize: bool = False,
normalize_delays: bool = True,
reverse_direction : bool = False,
out_type: Literal["drjit", "jax", "numpy", "tf", "torch"] = "drjit"
):
r"""
Returns the channel taps forming the discrete complex
baseband-equivalent channel impulse response
This function assumes that a sinc filter is used for pulse shaping and
receive filtering. Therefore, given a channel impulse response
:math:`(a_{i}^\text{b}(t), \tau_{i}), 0 \leq i \leq M-1` (which can be
computed by :meth:`~sionna.rt.Paths.cir`), the
:math:`\ell\text{th}` channel tap at sample instance :math:`n`
is computed as follows (see (Eq. 2.34) [Tse]_):
.. math::
\bar{h}_{n, \ell}
= \sum_{i=0}^{M-1} a_{i}^\text{b}\left(\frac{n}{W}\right)
\text{sinc}\left( \ell - W\tau_{m} \right)
for :math:`\ell` ranging from ``l_min`` to ``l_max``, and where :math:`W` is
the ``bandwidth``.
This function allows for an arbitrary ``sampling_frequency`` at which
the channel taps are sampled. By default, it is equal to the
``bandwidth``.
:param bandwidth: Bandwidth [Hz] to which the channel impulse response
will be limited
:param l_min: Smallest time-lag for the discrete complex
baseband-equivalent channel (:math:`L_{\text{min}}`)
:param l_max: Largest time-lag for the discrete complex
baseband-equivalent channel (:math:`L_{\text{max}}`)
:param sampling_frequency: Frequency [Hz] at which the channel impulse
response is sampled. If set to `None`, the ``bandwidth`` is used
instead.
:param num_time_steps: Number of time steps
:param normalize: If set to `True`, the channel is normalized such that
the average total energy of the channel taps is equal to one.
:param normalize_delays: If set to `True`, path delays are normalized
such that the first path between any pair of antennas of a
transmitter and receiver arrives at :math:`\tau = 0`.
:param reverse_direction: If set to True, swaps receivers and
transmitters
:param out_type: Name of the desired output type.
Currently supported are
`Dr.Jit <https://drjit.readthedocs.io/en/latest/reference.html>`_
("drjit), `Numpy <https://numpy.org>`_ ("numpy"),
`Jax <https://jax.readthedocs.io/en/latest/index.html>`_ ("jax"),
`TensorFlow <https://www.tensorflow.org>`_ ("tf"),
and `PyTorch <https://pytorch.org>`_ ("torch").
:return: Channel tap coefficients
:return type: Shape: [num_rx, num_rx_ant, num_tx, num_tx_ant,
num_time_steps, l_max - l_min + 1],
Type : :py:class:`Tuple[mi.TensorXf, mi.TensorXf]`
| :py:class:`np.array` | :py:class:`jax.array`
| :py:class:`tf.Tensor` | :py:class:`torch.tensor`
"""
# Get complex baseband equivalent CIR
if sampling_frequency is None:
sampling_frequency = bandwidth
a, tau = self.cir(sampling_frequency=sampling_frequency,
num_time_steps=num_time_steps,
normalize_delays=normalize_delays,
reverse_direction=reverse_direction)
# If no paths, then return immediately
if tau.shape[-1] == 0:
out_shape = list(a[0].shape)[:-2]+[num_time_steps,
l_max - l_min + 1]
h = (dr.zeros(mi.TensorXf, out_shape),
dr.zeros(mi.TensorXf, out_shape))
if out_type == "drjit":
return h
return cpx_convert(h, out_type)
# Tap indices
l = dr.arange(mi.Float, l_min, l_max+1)
# Reshape for matching dimensions
l = dr.reshape(mi.TensorXf, l, shape=[1]*6 + [-1])
a = [dr.reshape(mi.TensorXf, a_, shape=list(a_.shape)+[1]) for a_ in a]
if self.synthetic_array:
num_rx, num_tx, num_paths = tau.shape
reshape_to = [num_rx, 1, num_tx, 1, num_paths, 1, 1]
tau = dr.reshape(mi.TensorXf, tau, reshape_to)
else:
tau = dr.reshape(mi.TensorXf, tau, shape=list(tau.shape) + [1,1])
# Compute taps by low-pass filtering
g = sinc(l - tau*bandwidth)
h = [dr.sum(a_*g, axis=4) for a_ in a]
# Normalize energy
if normalize:
# Normalization is performed such that for every link the energy
# per block is one.
# The total energy of a channel response is the sum of the squared
# norm over the channel taps.
# Total energy in all taps
c = dr.sum(cpx_abs_square(h), axis=-1)
# Average over time steps, RX antennas, and TX antennas
c = dr.mean(c, axis=(1, 3, 4))
# Reshaping before normalization
num_rx, num_tx = c.shape
c = dr.reshape(mi.TensorXf, c, [num_rx, 1, num_tx, 1, 1, 1])
# Normalization
h = [dr.select(c==0, 0, h_*dr.rsqrt(c)) for h_ in h]
if out_type == "drjit":
return h
return cpx_convert(h, out_type)
[docs]
def cfr(self,
frequencies: mi.Float,
sampling_frequency: float = 1.,
num_time_steps: int = 1,
normalize_delays: bool = True,
normalize: bool = False,
reverse_direction : bool = False,
out_type: Literal["drjit", "jax", "numpy", "tf", "torch"] = "drjit"
):
r"""
Compute the frequency response of the channel at ``frequencies``.
Optionally, time evolution of the channel can be simulated based
on the Doppler shifts of all paths.
Given a channel impulse response
:math:`(a_i^\text{b}(t), \tau_{i}), 0 \leq i \leq M-1`, as computed
by :meth:`~sionna.rt.Paths.cir`,
the channel frequency response for the frequency :math:`f`
is computed as follows:
.. math::
\widehat{h}(f, t) = \sum_{i=0}^{M-1}a_i^\text{b}(t) e^{-j2\pi f \tau_{i}}
The time evolution of the channel is simulated as described in
the documentation of :meth:`~sionna.rt.Paths.cir`.
:param frequencies: Frequencies [Hz] at which to compute the
channel response
:param sampling_frequency: Frequency [Hz] at which the channel impulse
response is sampled
:param num_time_steps: Number of time steps
:param normalize_delays: If set to `True`, path delays are normalized
such that the first path between any pair of antennas of a
transmitter and receiver arrives at :math:`\tau = 0`.
:param normalize: If set to `True`, the channel is normalized across
time and frequencies to ensure unit average energy.
:param reverse_direction: If set to True, swaps receivers and
transmitters
:param out_type: Name of the desired output type.
Currently supported are
`Dr.Jit <https://drjit.readthedocs.io/en/latest/reference.html>`_
("drjit), `Numpy <https://numpy.org>`_ ("numpy"),
`Jax <https://jax.readthedocs.io/en/latest/index.html>`_ ("jax"),
`TensorFlow <https://www.tensorflow.org>`_ ("tf"),
and `PyTorch <https://pytorch.org>`_ ("torch").
:return: Real and imaginary components of the baseband equivalent channel
coefficients :math:`a^{\text{b}}_{i}`
:return type: Shape : [num_rx, num_rx_ant, num_tx, num_tx_ant,
num_time_steps, num_frequencies],
Type : :py:class:`Tuple[mi.TensorXf`
| :py:class:`np.array` | :py:class:`jax.array`
| :py:class:`tf.Tensor` | :py:class:`torch.tensor`
"""
frequencies = mi.Float(frequencies)
# Get complex baseband equivalent CIR
a, tau_ = self.cir(sampling_frequency=sampling_frequency,
num_time_steps=num_time_steps,
normalize_delays=normalize_delays,
reverse_direction=reverse_direction)
# If no paths, then return immediately
if tau_.shape[-1] == 0:
out_shape = list(a[0].shape)[:-2]+[num_time_steps,
frequencies.shape[0]]
h = (dr.zeros(mi.TensorXf, out_shape),
dr.zeros(mi.TensorXf, out_shape))
if out_type == "drjit":
return h
return cpx_convert(h, out_type)
# Add dummy dimensions to account for synthetic arrays
if self.synthetic_array:
num_rx, num_tx, num_paths = tau_.shape
tau_ = dr.reshape(mi.TensorXf, tau_,
[num_rx, 1, num_tx, 1, num_paths])
# Add dimensions for frequencies and time
tau_ = dr.reshape(mi.TensorXf, tau_, list(tau_.shape) + [1,1])
a = [dr.reshape(mi.TensorXf, a_, list(a[0].shape) + [1]) for a_ in a]
# Compute phase shifts
phase = -dr.two_pi*frequencies*tau_
phase = map_angle_to_canonical_range(phase)
sin_phase, cos_phase = dr.sincos(phase)
exp = (cos_phase, sin_phase)
# Compute resulting Fourier coefficients
a_f = cpx_mul(a, exp)
# Compute CFR
h_f = (dr.sum(a_f[0], axis=-3), dr.sum(a_f[1], axis=-3))
# Normalize
if normalize:
c = dr.rcp(dr.sqrt(dr.mean(cpx_abs_square(h_f), axis=(1,3,4,5))))
num_rx, num_tx = c.shape
c = dr.reshape(mi.TensorXf, c, [num_rx, 1, num_tx, 1, 1, 1])
h_f = [h*c for h in h_f]
if out_type == "drjit":
return h_f
return cpx_convert(h_f, out_type)
###########################################
# Internal methods
###########################################
def _build_from_buffer(self) -> None:
r"""
Builds the multi-dimensional tensors storing the paths data from the
path buffer object
This function reorganizes the path data (coefficients, delays, angles
of arrival and departure) into tensors.
The vertices, intersected objects and primitives and other data only
required for rendering or for more exhaustive study of the paths are not
processed by this function and only computed on demand.
"""
paths_buffer = self._paths_buffer
num_tx = self._num_tx
num_rx = self._num_rx
tx_array_size = self._eff_tx_array_size
rx_array_size = self._eff_rx_array_size
tx_ind = self._tx_ind
rx_ind = self._rx_ind
tx_ant_ind = self._tx_ant_ind
rx_ant_ind = self._rx_ant_ind
num_rx_patterns = len(self._rx_array.antenna_pattern.patterns)
num_tx_patterns = len(self._tx_array.antenna_pattern.patterns)
# Path index such that paths sharing the same (source, target) couple
# do not share the same index.
path_ind = self._path_ind
# Maximum number of paths over all (source, target) couples
max_num_paths = self._max_num_paths
# Allocate the tensors and fill them
# `a`
a_real = dr.zeros(mi.TensorXf, [num_rx, num_rx_patterns, rx_array_size,
num_tx, num_tx_patterns, tx_array_size,
max_num_paths])
a_imag = dr.zeros(mi.TensorXf, [num_rx, num_rx_patterns, rx_array_size,
num_tx, num_tx_patterns, tx_array_size,
max_num_paths])
# Scatter indices
f1 = max_num_paths
f2 = tx_array_size*f1
f3 = num_tx_patterns*f2
f4 = num_tx*f3
f5 = rx_array_size*f4
f6 = num_rx_patterns*f5
a_scat_ind = path_ind + tx_ant_ind*f1 + tx_ind*f3 + rx_ant_ind*f4\
+ rx_ind*f6
# Fill the `a` tensor
for n in range(num_rx_patterns):
a_scat_ind_ = dr.copy(a_scat_ind) + n*f5
for m in range(num_tx_patterns):
a_scat_ind__ = a_scat_ind_ + m*f2
dr.scatter(a_real.array, dr.real(paths_buffer.a[n][m]),
a_scat_ind__)
dr.scatter(a_imag.array, dr.imag(paths_buffer.a[n][m]),
a_scat_ind__)
# For the other tensors, if a synthetic arrays are used, then the
# pattern and antenna dimension do not need to be created
# Shapes and indices for scattering
if self._synthetic_array:
tensor_shape = [num_rx, num_tx, max_num_paths]
num_rx_patterns = num_tx_patterns = 1
else:
tensor_shape = [num_rx, num_rx_patterns, rx_array_size, num_tx,
num_tx_patterns, tx_array_size, max_num_paths]
# We need to recompute these factors as `num_rx_patterns`
# or `num_rx_patterns` might have changed
f3 = num_tx_patterns*f2
f4 = num_tx*f3
f5 = rx_array_size*f4
f6 = num_rx_patterns*f5
scat_ind = path_ind + tx_ant_ind*f1 + tx_ind*f3 + rx_ant_ind*f4\
+ rx_ind*f6
# Instantiate the tensors
valid = dr.full(mi.TensorXb, False, tensor_shape)
tau = dr.full(mi.TensorXf, -1., tensor_shape)
theta_t = dr.zeros(mi.TensorXf, tensor_shape)
phi_t = dr.zeros(mi.TensorXf, tensor_shape)
theta_r = dr.zeros(mi.TensorXf, tensor_shape)
phi_r = dr.zeros(mi.TensorXf, tensor_shape)
doppler = dr.zeros(mi.TensorXf, tensor_shape)
# Finalize Doppler shift computation by applying the shift due to
# transmitter and receiver mobility
doppler_ = self._finalize_doppler_shift_compute()
# Fill the tensor
# Note that we cannot fuse this loop with the one used to fill `a` as
# `num_rx_patterns` or `num_tx_patterns` might be different
for n in range(num_rx_patterns):
scat_ind_ = scat_ind + n*f5
for m in range(num_tx_patterns):
scat_ind__ = scat_ind_ + m*f2
#
dr.scatter(valid.array, True, scat_ind__)
#
dr.scatter(tau.array, paths_buffer.tau, scat_ind__)
#
dr.scatter(theta_t.array, paths_buffer.theta_t, scat_ind__)
dr.scatter(phi_t.array, paths_buffer.phi_t, scat_ind__)
#
dr.scatter(theta_r.array, paths_buffer.theta_r, scat_ind__)
dr.scatter(phi_r.array, paths_buffer.phi_r, scat_ind__)
#
dr.scatter(doppler.array, doppler_, scat_ind__)
self._valid = valid
self._a_real = a_real
self._a_imag = a_imag
self._tau = tau
self._theta_t = theta_t
self._phi_t = phi_t
self._theta_r = theta_r
self._phi_r = phi_r
self._doppler = doppler
def _apply_synthetic_array(self,
rel_ant_positions_tx : mi.Point3f | None,
rel_ant_positions_rx : mi.Point3f | None
) -> None:
r"""
Applies the phase shifts to simulate the effect of a synthetic array
on a planar wave
:param rel_ant_positions_tx: Positions of the array elements with
respect to the center of the transmitters. Only required if
synthetic arrays are used.
:param rel_ant_positions_rx: Positions of the array elements with
respect to the center of the receivers. Only required if synthetic
arrays are used.
"""
num_tx = self._num_tx
num_rx = self._num_rx
tx_array_size = self._tx_array.array_size
rx_array_size = self._rx_array.array_size
max_num_paths = dr.shape(self._a_real)[-1]
# [num_rx, num_rx_patterns, 1, num_tx, num_tx_patterns, 1,
# max_num_paths]
a_real = self._a_real
a_imag = self._a_imag
# Directions of arrival and departures
# [num_rx, num_tx, max_num_paths]
theta_t, phi_t = self._theta_t, self._phi_t
# [num_tx, num_tx, max_num_paths, 3]
sin_phi_t, cos_phi_t = dr.sincos(phi_t)
sin_theta_t, cos_theta_t = dr.sincos(theta_t)
k_tx_x = sin_theta_t*cos_phi_t
k_tx_y = sin_theta_t*sin_phi_t
k_tx_z = cos_theta_t
# Expand for broadcasting
k_tx_x = dr.reshape(mi.TensorXf, k_tx_x,
[num_rx, 1, 1, num_tx, 1, 1, max_num_paths])
k_tx_y = dr.reshape(mi.TensorXf, k_tx_y,
[num_rx, 1, 1, num_tx, 1, 1, max_num_paths])
k_tx_z = dr.reshape(mi.TensorXf, k_tx_z,
[num_rx, 1, 1, num_tx, 1, 1, max_num_paths])
# [num_rx, num_tx, max_num_paths]
theta_r, phi_r = self._theta_r, self._phi_r
# [num_tx, num_tx, max_num_paths, 3]
sin_phi_r, cos_phi_r = dr.sincos(phi_r)
sin_theta_r, cos_theta_r = dr.sincos(theta_r)
k_rx_x = sin_theta_r*cos_phi_r
k_rx_y = sin_theta_r*sin_phi_r
k_rx_z = cos_theta_r
# Expand for broadcasting
k_rx_x = dr.reshape(mi.TensorXf, k_rx_x,
[num_rx, 1, 1, num_tx, 1, 1, max_num_paths])
k_rx_y = dr.reshape(mi.TensorXf, k_rx_y,
[num_rx, 1, 1, num_tx, 1, 1, max_num_paths])
k_rx_z = dr.reshape(mi.TensorXf, k_rx_z,
[num_rx, 1, 1, num_tx, 1, 1, max_num_paths])
# Relative positions of the antennas of the transmitters and receivers
# [1, 1, 1, num_tx, 1, tx_array_size, 1]
rel_ant_pos_tx_x = dr.reshape(mi.TensorXf, rel_ant_positions_tx.x,
[1, 1, 1, num_tx, 1, tx_array_size, 1])
rel_ant_pos_tx_y = dr.reshape(mi.TensorXf, rel_ant_positions_tx.y,
[1, 1, 1, num_tx, 1, tx_array_size, 1])
rel_ant_pos_tx_z = dr.reshape(mi.TensorXf, rel_ant_positions_tx.z,
[1, 1, 1, num_tx, 1, tx_array_size, 1])
# Relative positions of the antennas of the transmitters and receivers
# [num_rx, 1, rx_array_size, 1, 1, 1, 1]
rel_ant_pos_rx_x = dr.reshape(mi.TensorXf, rel_ant_positions_rx.x,
[num_rx, 1, rx_array_size, 1, 1, 1, 1])
rel_ant_pos_rx_y = dr.reshape(mi.TensorXf, rel_ant_positions_rx.y,
[num_rx, 1, rx_array_size, 1, 1, 1, 1])
rel_ant_pos_rx_z = dr.reshape(mi.TensorXf, rel_ant_positions_rx.z,
[num_rx, 1, rx_array_size, 1, 1, 1, 1])
# Compute the phase shifts by taking the dot products between directions
# off departure (arrival) and the antenna array relative positions
# TX
# [num_rx, 1, 1, num_tx, 1, tx_array_size, max_num_paths]
tx_phase_shifts = rel_ant_pos_tx_z*k_tx_z
tx_phase_shifts = dr.fma(rel_ant_pos_tx_y, k_tx_y, tx_phase_shifts)
tx_phase_shifts = dr.fma(rel_ant_pos_tx_x, k_tx_x, tx_phase_shifts)
# RX
# [num_rx, 1, rx_array_size, num_tx, 1, 1, max_num_paths]
rx_phase_shifts = rel_ant_pos_rx_z*k_rx_z
rx_phase_shifts = dr.fma(rel_ant_pos_rx_y, k_rx_y, rx_phase_shifts)
rx_phase_shifts = dr.fma(rel_ant_pos_rx_x, k_rx_x, rx_phase_shifts)
# Total phase shift
# [num_rx, 1, rx_array_size, num_tx, 1, tx_array_size, max_num_paths]
phase_shifts = rx_phase_shifts + tx_phase_shifts
phase_shifts = dr.two_pi*phase_shifts/self._wavelength
# Apply the phase shifts
# [num_rx, num_rx_patterns, rx_array_size, num_tx, num_tx_patterns,
# tx_array_size, max_num_paths]
sin_phi, cos_phi = dr.sincos(phase_shifts)
a_real_ = dr.fma(a_real, cos_phi, -a_imag*sin_phi)
a_imag_ = dr.fma(a_real, sin_phi, a_imag*cos_phi)
self._a_real = a_real_
self._a_imag = a_imag_
def _fuse_pattern_array_dims(self) -> None:
r"""
Merges the pattern and array dimentions of the tensors storing the
channel coefficients
"""
num_rx, num_rx_patterns, num_rx_ant, num_tx, num_tx_patterns,\
num_tx_ant, max_num_paths = dr.shape(self._a_real)
# [num_rx, num_rx_patterns*rx_array_size, num_tx,
# num_tx_patterns*tx_array_size, max_num_paths]
self._a_real = dr.reshape(mi.TensorXf, self._a_real,
[num_rx, num_rx_patterns*num_rx_ant,
num_tx, num_tx_patterns*num_tx_ant,
max_num_paths])
self._a_imag = dr.reshape(mi.TensorXf, self._a_imag,
[num_rx, num_rx_patterns*num_rx_ant,
num_tx, num_tx_patterns*num_tx_ant,
max_num_paths])
if not self._synthetic_array:
self._valid = dr.reshape(mi.TensorXb, self._valid,
[num_rx, num_rx_patterns*num_rx_ant,
num_tx, num_tx_patterns*num_tx_ant,
max_num_paths])
self._tau = dr.reshape(mi.TensorXf, self._tau,
[num_rx, num_rx_patterns*num_rx_ant,
num_tx, num_tx_patterns*num_tx_ant,
max_num_paths])
self._theta_t = dr.reshape(mi.TensorXf, self._theta_t,
[num_rx, num_rx_patterns*num_rx_ant,
num_tx, num_tx_patterns*num_tx_ant,
max_num_paths])
self._phi_t = dr.reshape(mi.TensorXf, self._phi_t,
[num_rx, num_rx_patterns*num_rx_ant,
num_tx, num_tx_patterns*num_tx_ant,
max_num_paths])
self._theta_r = dr.reshape(mi.TensorXf, self._theta_r,
[num_rx, num_rx_patterns*num_rx_ant,
num_tx, num_tx_patterns*num_tx_ant,
max_num_paths])
self._phi_r = dr.reshape(mi.TensorXf, self._phi_r,
[num_rx, num_rx_patterns*num_rx_ant,
num_tx, num_tx_patterns*num_tx_ant,
max_num_paths])
self._doppler = dr.reshape(mi.TensorXf, self._doppler,
[num_rx, num_rx_patterns*num_rx_ant,
num_tx, num_tx_patterns*num_tx_ant,
max_num_paths])
def _build_empty_paths(self) -> None:
r"""
Builds empty tensors with dimensions fitting the setting of the paths
buffer, i.e., same number of radio devices, antennas, etc
"""
max_depth = self._paths_buffer.max_depth
num_tx = self._num_tx
num_rx = self._num_rx
num_tx_ant = self._tx_array.num_ant
tx_array_size = self._tx_array.array_size
num_rx_ant = self._rx_array.num_ant
rx_array_size = self._rx_array.array_size
a_tensor_base_shape = [num_rx, num_rx_ant, num_tx, num_tx_ant, 0]
if self._synthetic_array:
other_tensor_base_shape = [num_rx, num_tx, 0]
else:
other_tensor_base_shape = [num_rx, rx_array_size, num_tx,
tx_array_size, 0]
self._valid = dr.full(mi.TensorXb, False, other_tensor_base_shape)
self._a_real = dr.zeros(mi.TensorXf, a_tensor_base_shape)
self._a_imag = dr.zeros(mi.TensorXf, a_tensor_base_shape)
self._tau = dr.zeros(mi.TensorXf, other_tensor_base_shape)
self._theta_t = dr.zeros(mi.TensorXf, other_tensor_base_shape)
self._phi_t = dr.zeros(mi.TensorXf, other_tensor_base_shape)
self._theta_r = dr.zeros(mi.TensorXf, other_tensor_base_shape)
self._phi_r = dr.zeros(mi.TensorXf, other_tensor_base_shape)
self._doppler = dr.zeros(mi.TensorXf, other_tensor_base_shape)
#
self._interactions = dr.full(mi.TensorXu, InteractionType.NONE,
[max_depth] + other_tensor_base_shape)
self._shapes = dr.full(mi.TensorXu, INVALID_SHAPE,
[max_depth] + other_tensor_base_shape)
self._primitives = dr.full(mi.TensorXu, INVALID_PRIMITIVE,
[max_depth] + other_tensor_base_shape)
self._vertices = dr.zeros(mi.TensorXf,
[max_depth] + other_tensor_base_shape + [3])
self._paths_components_built = True
def _build_paths_components(self) -> None:
r"""
Builds and fills tensors for storing the additional paths data required
for paths visualization or more exhaustive studies of the paths
(vertices, interaction types, etc)
"""
paths_buffer = self._paths_buffer
depth_dim_size = paths_buffer.depth_dim_size
total_paths_count = paths_buffer.buffer_size
num_tx = self._num_tx
num_rx = self._num_rx
tx_array_size = self._eff_tx_array_size
rx_array_size = self._eff_rx_array_size
tx_ind = self._tx_ind
rx_ind = self._rx_ind
tx_ant_ind = self._tx_ant_ind
rx_ant_ind = self._rx_ant_ind
num_rx_patterns = len(self._rx_array.antenna_pattern.patterns)
num_tx_patterns = len(self._tx_array.antenna_pattern.patterns)
# Path index such that paths sharing the same (source, target) couple
# do not share the same index.
path_ind = self._path_ind
# Maximum number of paths over all (source, target) couples
max_num_paths = self._max_num_paths
# Shape of the tensors
if self._synthetic_array:
tensor_shape = [depth_dim_size, num_rx, num_tx, max_num_paths]
num_rx_patterns = num_tx_patterns = 1
else:
tensor_shape = [depth_dim_size, num_rx, num_rx_patterns,
rx_array_size, num_tx, num_tx_patterns,
tx_array_size, max_num_paths]
# To build the indices that are used for scattering in the tensors, we
# reshape the path, device, and antenna indices to tensors to leverage
# the broadcasting feature of dr.jit
# [depth_dim_size]
depth_ind = dr.arange(mi.UInt, depth_dim_size)
# [1, depth_dim_size]
depth_ind = dr.reshape(mi.TensorXu, depth_ind, [1, depth_dim_size])
# [total_paths_count, 1]
path_ind = dr.reshape(mi.TensorXu, path_ind, [total_paths_count, 1])
tx_ant_ind = dr.reshape(mi.TensorXu, tx_ant_ind, [total_paths_count, 1])
tx_ind = dr.reshape(mi.TensorXu, tx_ind, [total_paths_count, 1])
rx_ant_ind = dr.reshape(mi.TensorXu, rx_ant_ind, [total_paths_count, 1])
rx_ind = dr.reshape(mi.TensorXu, rx_ind, [total_paths_count, 1])
# Scatter indices
f1 = max_num_paths
f2 = tx_array_size*f1
f3 = num_tx_patterns*f2
f4 = num_tx*f3
f5 = rx_array_size*f4
f6 = num_rx_patterns*f5
f7 = num_rx*f6
# [total_paths_count, depth_dim_size]
scat_ind = path_ind + tx_ant_ind*f1 + tx_ind*f3 + rx_ant_ind*f4\
+ rx_ind*f6 + depth_ind*f7
# [total_paths_count*depth_dim_size]
scat_ind = scat_ind.array
# Allocate and fill the tensors
interactions = dr.full(mi.TensorXu, InteractionType.NONE, tensor_shape)
shapes = dr.full(mi.TensorXu, INVALID_SHAPE, tensor_shape)
primitives = dr.full(mi.TensorXu, INVALID_PRIMITIVE, tensor_shape)
# `vertices` requires an extra dimension for storing the (x,y,z)
# coordinates
vertices = dr.zeros(mi.TensorXf, tensor_shape + [3])
for n in range(num_rx_patterns):
scat_ind_ = scat_ind + n*f5
for m in range(num_tx_patterns):
scat_ind__ = scat_ind_ + m*f2
#
dr.scatter(interactions.array,
paths_buffer.interaction_types.array,
scat_ind__)
#
dr.scatter(shapes.array, paths_buffer.shapes.array, scat_ind__)
#
dr.scatter(primitives.array, paths_buffer.primitives.array,
scat_ind__)
#
dr.scatter(vertices.array, paths_buffer.vertices_x.array,
scat_ind__*3)
dr.scatter(vertices.array, paths_buffer.vertices_y.array,
scat_ind__*3 + 1)
dr.scatter(vertices.array, paths_buffer.vertices_z.array,
scat_ind__*3 + 2)
if not self._synthetic_array:
interactions = dr.reshape(mi.TensorXu, interactions,
[depth_dim_size,
num_rx, num_rx_patterns*rx_array_size,
num_tx, num_tx_patterns*tx_array_size,
max_num_paths])
shapes = dr.reshape(mi.TensorXu, shapes,
[depth_dim_size,
num_rx, num_rx_patterns*rx_array_size,
num_tx, num_tx_patterns*tx_array_size,
max_num_paths])
primitives = dr.reshape(mi.TensorXu, primitives,
[depth_dim_size,
num_rx, num_rx_patterns*rx_array_size,
num_tx, num_tx_patterns*tx_array_size,
max_num_paths])
vertices = dr.reshape(mi.TensorXf, vertices,
[depth_dim_size,
num_rx, num_rx_patterns*rx_array_size,
num_tx, num_tx_patterns*tx_array_size,
max_num_paths, 3])
self._interactions = interactions
self._shapes = shapes
self._primitives = primitives
self._vertices = vertices
self._paths_components_built = True
def _finalize_doppler_shift_compute(self) -> mi.Float:
r"""
Finalizes the computation of the Doppler shift of paths by applying the
shift due to mobility of radio devices
:return: Finalized Doppler shift [Hz]
"""
paths_buffer = self._paths_buffer
tx_ind = self._tx_ind
rx_ind = self._rx_ind
# Doppler shift due to transmitters mobility
# Paths direction of departure
k_tx = r_hat(paths_buffer.theta_t, paths_buffer.phi_t)
# Transmitters velocities
v_tx = dr.gather(mi.Vector3f, self._tx_velocities, tx_ind)
# Doppler shift [Hz]
tx_doppler = dr.dot(k_tx, v_tx)/self._wavelength
# Doppler shift due to receivers mobility
# Paths direction of departure
k_rx = -r_hat(paths_buffer.theta_r, paths_buffer.phi_r)
# Transmitters velocities
v_rx = dr.gather(mi.Vector3f, self._rx_velocities, rx_ind)
# Doppler shift [Hz]
rx_doppler = dr.dot(k_rx, v_rx)/self._wavelength
doppler = paths_buffer.doppler + tx_doppler - rx_doppler
return doppler
def _reverse_direction(self,
t_list : list[mi.TensorXf]) -> list[mi.TensorXf]:
r"""
Reverses the direction of the paths
Assumes that the tensors in `t_list` have the same shape.
:param t_list: List of tensors to reverse the direction of the paths
:return: List of tensors with the direction of the paths reversed
"""
item = t_list[0]
# Transpose the tensor using gather and scatter without for loops
original_shape = item.shape
num_items = len(original_shape)
if num_items == 5:
num_rx, num_rx_ant, num_tx, num_tx_ant, num_paths = original_shape
target_shape = [num_tx, num_tx_ant, num_rx, num_rx_ant, num_paths]
elif num_items == 3:
num_rx, num_tx, num_paths = original_shape
target_shape = [num_tx, num_rx, num_paths]
num_rx_ant = num_tx_ant = 1
# Stop immediately if there are no paths
if num_paths == 0:
output_list = [dr.zeros(mi.TensorXf, target_shape) for _ in t_list]
return output_list
# Compute the indices for the gathering operation that implements
# the transpose operation
# Indices as we want them in the transposed tensor
tx, txa, rx, rxa, p = dr.meshgrid(
dr.arange(mi.UInt, num_tx),
dr.arange(mi.UInt, num_tx_ant),
dr.arange(mi.UInt, num_rx),
dr.arange(mi.UInt, num_rx_ant),
dr.arange(mi.UInt, num_paths),
indexing='ij'
)
# Indices are computed to fit the tensor from which we are gathering
gather_ind = rx*num_rx_ant*num_tx*num_tx_ant*num_paths + \
rxa*num_tx*num_tx_ant*num_paths + \
tx*num_tx_ant*num_paths + \
txa*num_paths + \
p
# Gather the values from the original tensor using the new indices
output_list = []
for item in t_list:
item_ = dr.gather(mi.Float, item.array, gather_ind)
item_ = dr.reshape(mi.TensorXf, item_, target_shape)
output_list.append(item_)
return output_list