Source code for sionna.rt.radio_map_solvers.radio_map_solver

#
# SPDX-FileCopyrightText: Copyright (c) 2021-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
"""Radio map solver"""

import mitsuba as mi
import drjit as dr
from typing import Tuple, Callable, List

from sionna.rt.utils import spawn_ray_from_sources, fibonacci_lattice,\
    rotation_matrix, spectrum_to_matrix_4f, WedgeGeometry,\
    sample_wedge_diffraction_point, spawn_ray_towards, sample_keller_cone,\
    spawn_ray_to
from sionna.rt import Scene
from sionna.rt.antenna_pattern import antenna_pattern_to_world_implicit
from sionna.rt.constants import InteractionType, MIN_SEGMENT_LENGTH
from sionna.rt.scene_object import SceneObject
from sionna.rt.scene_utils import extend_scene_with_mesh
from .radio_map import RadioMap
from .planar_radio_map import PlanarRadioMap
from .mesh_radio_map import MeshRadioMap
from sionna.rt.utils.hashing import EdgeHasher


[docs] class RadioMapSolver: # pylint: disable=line-too-long r""" Class that implements the radio map solver This solver generates a radio map for each transmitter within the scene. For any given transmitter, the radio map is calculated over a measurement surface that is segmented into flat cells. Each cell :math:`i` in the radio map is associated with the quantity: .. math:: :label: cm_def g_i = \frac{1}{\lvert C_i \rvert} \int_{C_{i}} \lvert h(s) \rvert^2 ds. Here, :math:`\lvert h(s) \rvert^2` represents the sum of the squared amplitudes of the path coefficients :math:`a_i` at the position :math:`s=(x,y)`, assuming an ideal isotropic receiver. The integral is evaluated over the cell :math:`C_i`, with :math:`ds` being the infinitesimally small surface element, defined as :math:`ds=dx \cdot dy`. The value :math:`g_i` can be interpreted as the average :attr:`~sionna.rt.RadioMap.path_gain` across a cell. This solver approximates :math:`g_i` using Monte Carlo integration. For further details, refer to the `section on the radio map solver of the Sionna RT Technical Report <https://nvlabs.github.io/sionna/rt/tech-report/S4.html>`_. The path gain can be transformed into the received signal strength (:attr:`~sionna.rt.RadioMap.rss`) by multiplying it with the transmit :attr:`~sionna.rt.Transmitter.power`: .. math:: \mathrm{RSS}_{i} = P_{tx} g_{i}. In scenarios with multiple transmitters, the signal-to-interference-plus-noise ratio (SINR) for a specific transmitter :math:`k` is calculated as follows: .. math:: \mathrm{SINR}^k_{i}=\frac{\mathrm{RSS}^k_{i}}{N_0+\sum_{k'\ne k} \mathrm{RSS}^{k'}_{i}} where :math:`N_0` [W] represents the thermal noise power of the scene (:attr:`~sionna.rt.Scene.thermal_noise_power`), which is determined by: .. math:: N_0 = B \times T \times k. In this equation, :math:`B` [Hz] is the bandwidth of the transmission (:attr:`~sionna.rt.Scene.bandwidth`), :math:`T` [K] is the temperature of the scene (:attr:`~sionna.rt.Scene.temperature`), and :math:`k = 1.380649 \times 10^{-23}` [J/K] is the Boltzmann constant. This solver supports arbitrary meshes as measurement surfaces, where each triangle in the mesh acts as a cell in the radio map. If a mesh is not provided, the solver computes the radio map over a rectangular measurement grid, which is defined by the parameters: ``center``, ``orientation``, ``size``, and ``cell_size``. The resulting output is then a real-valued matrix of dimensions ``[num_cells_y, num_cells_x]`` for each transmitter, where: .. math:: \texttt{num\_cells\_x} = \bigg\lceil\frac{\texttt{size[0]}}{\texttt{cell\_size[0]}} \bigg\rceil\\ \texttt{num\_cells\_y} = \bigg\lceil \frac{\texttt{size[1]}}{\texttt{cell\_size[1]}} \bigg\rceil. An orientation of (0,0,0) aligns the radio map parallel to the XY plane, with the surface normal directed towards the :math:`+z` axis. By default, the radio map is set parallel to the XY plane, spans the entire scene, and is positioned at an elevation of :math:`z = 1.5\text{m}`. For transmitters equipped with multiple antennas, transmit precoding is applied as specified by the ``precoding_vec``. For each path :math:`n` that intersects a radio map cell :math:`i`, the channel coefficients :math:`a_n` and the angles of departure (AoDs) :math:`(\theta_{\text{T},n}, \varphi_{\text{T},n})` and arrival (AoAs) :math:`(\theta_{\text{R},n}, \varphi_{\text{R},n})` are determined. For further information, refer to the `Primer on Electromagnetics <../em_primer.html>`_. A "synthetic" array is simulated by introducing additional phase shifts, which are dependent on the antenna's position relative to the transmitter and the AoDs. Let :math:`\mathbf{d}_{\text{T},k}\in\mathbb{R}^3` represent the relative position of the :math:`k^\text{th}` antenna (relative to the transmitter's position) for which the channel impulse response is calculated. This can be accessed via the antenna array's property :attr:`~sionna.rt.AntennaArray.positions`. Assuming a plane-wave model, the phase shift for this antenna is computed as: .. math:: p_{\text{T},n,k} = \frac{2\pi}{\lambda}\hat{\mathbf{r}}(\theta_{\text{T},n}, \varphi_{\text{T},n})^\mathsf{T} \mathbf{d}_{\text{T},k}. The path coefficient for the :math:`k\text{th}` antenna is then expressed as: .. math:: h_{n,k} = a_n e^{j p_{\text{T}, n,k}}. These coefficients form the complex-valued channel vector :math:`\mathbf{h}_n` with a size of :math:`\texttt{num\_tx\_ant}`. Finally, the coefficient for the equivalent SISO channel is given by: .. math:: h_n = \mathbf{h}_n^{\textsf{H}} \mathbf{p} where :math:`\mathbf{p}` denotes the precoding vector ``precoding_vec``. Note ----- This solver supports Russian roulette, which can significantly improve the efficiency of ray tracing by terminating rays that contribute little to the final result. The implementation of Russian roulette in this solver consists in terminating a ray with probability equal to the complement of its path gain (without the distance-based path loss). Formally, after the :math:`d^{\text{th}}` bounce, the ray path loss is set to: .. math:: a_d \leftarrow \begin{cases} \frac{a_d}{\sqrt{\min \{ p_{c},|a_d|^2 \}}}, & \text{with probability } \min \{ p_{c},|a_d|^2 \}\\ 0, & \text{with probability } 1 - \min \{ p_{c},|a_d|^2 \} \end{cases} where :math:`a_d` is the path coefficient corresponding to the ray (without the distance-based pathloss) and :math:`p_c` the maximum probability with which to continue a path (``rr_prob``). The first case consists in continuing the ray, whereas the second case consists in terminating the ray. When the ray is continued, the scaling by :math:`\frac{1}{\sqrt{\min \{ p_{c},|a_d|^2 \}}}` ensures an unbiased map by accounting for the rays that were terminated. When a ray is terminated, it is no longer traced, leading to a reduction of the required computations. Russian roulette is by default disabled. It can be enabled by setting the ``rr_depth`` parameter to a positive value. ``rr_depth`` corresponds to the path depth, i.e., the number of bounces, from which on Russian roulette is enabled. Note ----- The parameter ``stop_threshold`` can be set to deactivate (i.e., stop tracing) paths whose gain has dropped below this threshold (in dB). Example ------- .. code-block:: python import sionna from sionna.rt import load_scene, PlanarArray, Transmitter, RadioMapSolver scene = load_scene(sionna.rt.scene.munich) scene.radio_materials["marble"].thickness = 0.5 # Configure antenna array for all transmitters scene.tx_array = PlanarArray(num_rows=8, num_cols=2, vertical_spacing=0.7, horizontal_spacing=0.5, pattern="tr38901", polarization="VH") # Add a transmitters tx = Transmitter(name="tx", position=[8.5,21,30], orientation=[0,0,0]) scene.add(tx) tx.look_at(mi.Point3f(40,80,1.5)) solver = RadioMapSolver() rm = solver(scene, cell_size=(1., 1.), samples_per_tx=100000000) scene.preview(radio_map=rm, clip_at=15., rm_vmin=-100.) .. figure:: ../figures/radio_map_preview.png :align: center """ # Size of the hash table for the edge intersections _WEDGE_INTERSECTIONS_SIZE = 10000 def __init__(self): # Sampler self._sampler = mi.load_dict({'type': 'independent'}) # Dr.Jit mode for running the loop that implement the solver. # Symbolic mode is the fastest mode but does not currently support # automatic differentiation. self._loop_mode = "symbolic" self.edge_hash_functions = [EdgeHasher(op='round'), EdgeHasher(op='floor')] @property def loop_mode(self): # pylint: disable=line-too-long r"""Get/set the Dr.Jit mode used to evaluate the loop that implements the solver. Should be one of "evaluated" or "symbolic". Symbolic mode (default) is the fastest one but does not support automatic differentiation. For more details, see the `corresponding Dr.Jit documentation <https://drjit.readthedocs.io/en/latest/cflow.html#sym-eval>`_. :type: "evaluated" | "symbolic" """ return self._loop_mode @loop_mode.setter def loop_mode(self, mode): if mode not in ("evaluated", "symbolic"): raise ValueError("Invalid loop mode. Must be either 'evaluated'" " or 'symbolic'") self._loop_mode = mode
[docs] def __call__( self, scene: Scene, center: mi.Point3f | None = None, orientation: mi.Point3f | None = None, size: mi.Point2f | None = None, cell_size: mi.Point2f = mi.Point2f(10, 10), measurement_surface: mi.Shape | SceneObject | None = None, precoding_vec: Tuple[mi.TensorXf, mi.TensorXf] | None = None, samples_per_tx: int = 1000000, max_depth: int = 3, los: bool = True, specular_reflection: bool = True, diffuse_reflection: bool = False, refraction: bool = True, diffraction: bool = False, edge_diffraction: bool = False, diffraction_lit_region: bool = True, seed: int = 42, rr_depth: int = -1, rr_prob: float = 0.95, stop_threshold: float | None = None, modified_scene: mi.Scene | None = None ) -> RadioMap: # pylint: disable=line-too-long r""" Executes the solver :param scene: Scene for which to compute the radio map :param center: Center of the radio map measurement plane :math:`(x,y,z)` [m] as a three-dimensional vector. Ignored if ``measurement_surface`` is provided. If set to `None`, the radio map is centered on the center of the scene, except for the elevation :math:`z` that is set to 1.5m. Otherwise, ``orientation`` and ``size`` must be provided. :param orientation: Orientation of the radio map measurement plane :math:`(\alpha, \beta, \gamma)` specified through three angles corresponding to a 3D rotation as defined in :eq:`rotation`. Ignored if ``measurement_surface`` is provided. An orientation of :math:`(0,0,0)` or `None` corresponds to a radio map that is parallel to the XY plane. If not set to `None`, then ``center`` and ``size`` must be provided. :param size: Size of the radio map measurement plane [m]. Ignored if ``measurement_surface`` is provided. If set to `None`, then the size of the radio map is set such that it covers the entire scene. Otherwise, ``center`` and ``orientation`` must be provided. :param cell_size: Size of a cell of the radio map measurement plane [m]. Ignored if ``measurement_surface`` is provided. :param measurement_surface: Measurement surface. If set, the radio map is computed for this surface, where every triangle in the mesh is a cell in the radio map. If set to `None`, then the radio map is computed for a measurement grid defined by ``center``, ``orientation``, ``size``, and ``cell_size``. :param precoding_vec: Real and imaginary components of the complex-valued precoding vector. If set to `None`, then defaults to :math:`\frac{1}{\sqrt{\text{num\_tx\_ant}}} [1,\dots,1]^{\mathsf{T}}`. :param samples_per_tx: Number of samples per source :param max_depth: Maximum depth :param los: Enable line-of-sight paths :param specular_reflection: Enable specular reflections :param diffuse_reflection: Enable diffuse reflectios :param refraction: Enable refraction :param diffraction: Enable diffraction :param edge_diffraction: Enable diffraction on free floating edges :param diffraction_lit_region: Enable diffraction in the lit region :param seed: Seed :param rr_depth: Depth from which on to start Russian roulette :param rr_prob: Maximum probability with which to keep a path when Russian roulette is enabled :param stop_threshold: Gain threshold [dB] below which a path is deactivated :param modified_scene: Advanced parameter. Can be used to provide a Mitsuba scene instance that was already extended to contain the measurement surface. This can be useful when calling the solver in a loop with the same scene and measurement surface. Not applicable to planar radio maps. :return: Computed radio map """ # Check that the scene is all set for simulations scene.all_set(radio_map=True) # Check and initialize the precoding vector num_tx = len(scene.transmitters) num_tx_ant = scene.tx_array.num_ant if precoding_vec is None: precoding_vec_real = dr.ones(mi.TensorXf, [num_tx, num_tx_ant]) precoding_vec_real /= dr.sqrt(scene.tx_array.num_ant) precoding_vec_imag = dr.zeros(mi.TensorXf, [num_tx, num_tx_ant]) precoding_vec = (precoding_vec_real, precoding_vec_imag) else: precoding_vec_real, precoding_vec_imag = precoding_vec if not isinstance(precoding_vec_real, type(precoding_vec_imag)): raise TypeError("The real and imaginary components of "\ "`precoding_vec` must be of the same type") # If a single precoding vector is provided, then it is used by # all transmitters if ( isinstance(precoding_vec_real, mi.Float) or (isinstance(precoding_vec_real, mi.TensorXf) and len(dr.shape(precoding_vec_real)) == 1) ): precoding_vec_real = mi.Float(precoding_vec_real) precoding_vec_imag = mi.Float(precoding_vec_imag) precoding_vec_real = dr.tile(precoding_vec_real, num_tx) precoding_vec_imag = dr.tile(precoding_vec_imag, num_tx) # precoding_vec_real = dr.reshape(mi.TensorXf, precoding_vec_real, [num_tx, num_tx_ant]) precoding_vec_imag = dr.reshape(mi.TensorXf, precoding_vec_imag, [num_tx, num_tx_ant]) precoding_vec = (precoding_vec_real, precoding_vec_imag) # Transmitter configurations # Generates sources positions and orientations tx_positions, tx_orientations, rel_ant_positions_tx, _ = \ scene.sources(True, False) dr.make_opaque(tx_positions, tx_orientations) # Trace paths and compute channel impulse responses tx_antenna_patterns = scene.tx_array.antenna_pattern.patterns num_tx = dr.shape(tx_positions)[1] num_samples = samples_per_tx * num_tx # If the Russian roulette threshold depth is set to -1, disable Russian # roulette by setting the threshold depth to a value higher than # `max_depth` if rr_depth == -1: rr_depth = max_depth + 1 # If a threshold for the path gain is set below which paths are # deactivated, then convert it to linear scale if stop_threshold is not None: stop_threshold = dr.power(10., stop_threshold / 10.) # Set the seed of the sampler self._sampler.seed(seed, num_samples) # Build Radio Map instance # If a measurement surface is provided, then it is a mesh radio map. # Moreover, in this case, the Mitsuba scene is modified by adding the # measurement surface to the scene. if measurement_surface is not None: if isinstance(measurement_surface, SceneObject): measurement_surface = measurement_surface.mi_mesh if modified_scene is None: modified_scene = extend_scene_with_mesh(scene.mi_scene, measurement_surface) radio_map = MeshRadioMap(scene, measurement_surface) else: if modified_scene is not None: raise ValueError( "Parameter `modified_scene` should be omitted (or set to None) when using" " planar radio maps (i.e., `measurement_surface` is None)." ) modified_scene = scene.mi_scene radio_map = PlanarRadioMap(scene, cell_size, center, orientation, size) # Computes the pathloss map (except for diffraction) # `radio_map` is updated in-place. # This flag is set to avoid tracing the loops twice, which adds # a fairly significant CPU overhead. with dr.scoped_set_flag(dr.JitFlag.OptimizeLoops, False): with scene.use_mi_scene(modified_scene): wedges, wedges_counter = self._shoot_and_bounce( scene, radio_map, self._sampler, tx_positions, tx_orientations, tx_antenna_patterns, precoding_vec, rel_ant_positions_tx, samples_per_tx, max_depth, los, specular_reflection, diffuse_reflection, refraction, diffraction, edge_diffraction, rr_depth, rr_prob, stop_threshold ) # Add the contribution of diffracted paths to # the radio map if diffraction and max_depth > 0 and wedges_counter > 0: # Reseting the seed of the sampler to ensure its state # was scheduled self._sampler.seed(seed + 1 , num_samples) self._evaluate_first_order_diffraction( scene, radio_map, self._sampler, tx_positions, tx_orientations, tx_antenna_patterns, precoding_vec, rel_ant_positions_tx, samples_per_tx, wedges, diffraction_lit_region ) return radio_map
# pylint: disable=line-too-long @dr.syntax def _shoot_and_bounce( self, scene: Scene, radio_map: RadioMap, sampler: mi.Sampler, tx_positions: mi.Point3f, tx_orientations: mi.Point3f, tx_antenna_patterns: List[Callable[[mi.Float, mi.Float], Tuple[mi.Complex2f, mi.Complex2f]]], precoding_vec: Tuple[mi.TensorXf, mi.TensorXf], rel_ant_positions_tx: mi.Point3f, samples_per_tx: int, max_depth: int, los_enabled: bool, specular_reflection_enabled: bool, diffuse_reflection_enabled: bool, refraction_enabled: bool, diffraction_enabled: bool, edge_diffraction_enabled: bool, rr_depth: int, rr_prob: float, stop_threshold: float | None, ) -> WedgeGeometry | None: r""" Executes the shoot-and-bounce loop The ``radio_map`` is updated in-place. :param scene: Scene for which to compute the radio map :param radio_map: Radio map :param sampler: Sampler used to generate random numbers and vectors :param tx_positions: Positions of the transmitters :param tx_orientations: Orientations of the transmitters :param tx_antenna_patterns: Antenna pattern of the transmitters :param precoding_vec: Real and imaginary components of the complex-valued precoding vector :param rel_ant_positions_tx: Positions of the antenna elements relative to the transmitters positions :param samples_per_tx: Number of samples per source :param max_depth: Maximum depth :param los_enabled: If set to `True`, then the line-of-sight paths are computed :param specular_reflection_enabled: If set to `True`, then the specularly reflected paths are computed :param diffuse_reflection_enabled: If set to `True`, then the diffusely reflected paths are computed :param refraction_enabled: If set to `True`, then the refracted paths are computed :param diffraction_enabled: If set to `True`, then the diffracted paths are computed :param edge_diffraction_enabled: If set to `True`, then the edge diffraction is computed :param rr_depth: Depth from which to start Russian roulette :param rr_prob: Minimum probability with which to keep a path when Russian roulette is enabled :param stop_threshold: Gain threshold (linear scale) below which a path is deactivated :return: Wedge geometry for diffraction and number of wedges or `None`\ if diffraction is disabled """ num_txs = dr.shape(tx_positions)[1] num_samples = samples_per_tx * num_txs num_tx_ant_patterns = len(tx_antenna_patterns) tx_indices = dr.repeat(dr.arange(mi.UInt, num_txs), samples_per_tx) # Spawn rays from the transmitters ray = spawn_ray_from_sources(fibonacci_lattice, samples_per_tx, tx_positions) # Weights to account for the antenna array and precoding array_w = self._synthetic_array_weighting(scene, ray.d, rel_ant_positions_tx, precoding_vec) # Mask indicating which rays are active active = dr.full(dr.mask_t(mi.Float), True, num_samples) # Flag storing which type of interactions are enabled for the # current interaction enabled_interactions = dr.full(mi.UInt, 0, num_samples) if specular_reflection_enabled: enabled_interactions |= InteractionType.SPECULAR if diffuse_reflection_enabled: enabled_interactions |= InteractionType.DIFFUSE if refraction_enabled: enabled_interactions |= InteractionType.REFRACTION # Length of the ray tube. # Only required if a threshold is set to deactivate path based on their # gain if stop_threshold is not None: ray_tube_length = dr.zeros(mi.Float, num_samples) else: ray_tube_length = None # Hash table storing intersection counts for each edge used in diffraction. # This avoids allocating memory proportional to the total number of edges # in the scene, which is efficient since typically only a small subset of # edges participate in diffraction, particularly for large scenes, and as # only first-order diffraction is computed. num_hashes = len(self.edge_hash_functions) if diffraction_enabled: wedges_counter = [ dr.zeros(mi.UInt, RadioMapSolver._WEDGE_INTERSECTIONS_SIZE) for _ in range(num_hashes) ] next_wedge_ind = mi.UInt(0) wedges = WedgeGeometry.build_with_size( RadioMapSolver._WEDGE_INTERSECTIONS_SIZE) else: wedges_counter = None next_wedge_ind = None wedges = None # Solid angle of the ray tube. # It is required to compute the diffusely reflected field. # Initialized assuming that all the rays initially spawn from the source # share the unit sphere equally, i.e., initialized to # 4*PI/samples_per_src solid_angle = dr.full(mi.Float, 4.0 * dr.pi * dr.rcp(samples_per_tx), num_samples) # Initialize the electric field # Orientation of sources and targets and correspond to-world transforms sample_tx_orientation = dr.repeat(tx_orientations, samples_per_tx) tx_to_world = rotation_matrix(sample_tx_orientation) # The following loop transports and update the electric field. # The electric field is initialized by the source antenna pattern e_fields = [antenna_pattern_to_world_implicit(src_antenna_pattern, tx_to_world, ray.d, direction="out") for src_antenna_pattern in tx_antenna_patterns] depth = mi.UInt(0) while dr.hint(active, mode=self.loop_mode, exclude=[ array_w, tx_positions, edge_diffraction_enabled, enabled_interactions, los_enabled, max_depth, next_wedge_ind, num_hashes, num_tx_ant_patterns, radio_map, rr_depth, rr_prob, scene, self, stop_threshold, tx_indices, wedges, wedges_counter ]): # Test intersection with the scene si_scene = scene.mi_scene.ray_intersect(ray, active=active) # For first-order diffraction: project intersection points onto wedges # and store wedge geometry for subsequent diffraction calculations if dr.hint(diffraction_enabled, mode="scalar"): store_wedges = active & si_scene.is_valid() & (depth == 0) \ & (si_scene.shape != radio_map.measurement_surface) # Sample diffraction point on wedges valid_wedge_, wedges_, _ = sample_wedge_diffraction_point( si_scene, ray.o, ray.d, sampler.next_1d(), edge_diffraction_enabled, store_wedges) store_wedges &= valid_wedge_ # Store wedges for i in range(num_hashes): hash_value = self.edge_hash_functions[i]( wedges_.o, wedges_.o + wedges_.e_hat * wedges_.length ) counter_ind = hash_value % RadioMapSolver._WEDGE_INTERSECTIONS_SIZE sample_counter = dr.scatter_inc(wedges_counter[i], counter_ind, store_wedges) store_wedges &= sample_counter == 0 wedge_ind = dr.scatter_inc(next_wedge_ind, mi.UInt(0), store_wedges) store_wedges &= wedge_ind < RadioMapSolver._WEDGE_INTERSECTIONS_SIZE dr.scatter(wedges, wedges_, wedge_ind, store_wedges) # Each interaction processes either an interaction with the scene # or an interaction with the measurement plane. # `mp_int` is set to `True` if the interaction is with the measurement # plane. # `scene_int` is set to `True` if the interaction is with the scene. # If the radio map is planar, then test interaction with the measurement # plane. Otherwise, the measurement surface is assumed to be part of # the scene and we only need to check that it has hit the measurement surface. si_mp = None if dr.hint(isinstance(radio_map, PlanarRadioMap), mode="scalar"): si_mp = radio_map.measurement_surface.ray_intersect( ray, active=active ) # An intersection with the measurement plane is valid only if # (i) it was not obstructed by the scene, and (ii) the intersection # is valid. mp_int = active & si_mp.is_valid() & (si_mp.t < si_scene.t) # Ignore interaction with measurement plane if LoS is disabled mp_int &= ((depth > 0) | los_enabled) else: # The measurement surface is part of the scene si_mp = si_scene mp_int = active & si_mp.is_valid() \ & (si_mp.shape == radio_map.measurement_surface) # Update the radio map # The radio map is not updated if LoS is disabled and it is a # LoS interaction with the measurement surface # Note that we need to redo the LoS test to handle the case where # the measurement surface is part of the scene update_radio_map = mp_int & ((depth > 0) | los_enabled) radio_map.add_paths(e_fields, array_w, si_mp, ray.d, tx_indices, update_radio_map, diffracted_paths=False, solid_angle=solid_angle) # Update the active state of rays # Active if has hit the scene (which includes the measurement surface # if it is part of the scene) active &= si_scene.is_valid() # It is an interaction with the scene if (i) the ray has hit the scene # and (ii) it has not hit the measurement plane scene_int = active & ~mp_int # Sample the BSDF # Sample random numbers to sample the BSDF sample1 = sampler.next_1d() sample2 = sampler.next_2d() s, e_fields = self._sample_radio_material( si_scene, ray.d, e_fields, solid_angle, sample1, sample2, enabled_interactions, scene_int ) interaction_type = dr.select(scene_int, s.sampled_component, InteractionType.NONE) diffuse = scene_int & (interaction_type == InteractionType.DIFFUSE) # Update the solid angle # If a diffuse reflection is sampled, then it is set to 2PI. # Otherwise it is left unchanged solid_angle = dr.select(diffuse, dr.two_pi, solid_angle) # Spawn rays for next iteration ray_scene_int = si_scene.spawn_ray(d=s.wo) ray_mp_int = si_mp.spawn_ray(d=ray.d) ray = dr.select(scene_int, ray_scene_int, ray_mp_int) depth = dr.select(scene_int, depth + 1, depth) active &= (depth <= max_depth) active &= mp_int | (interaction_type != InteractionType.NONE) ## Russian roulette and gain threshold deactivation # Compute the path gain (without applying the spreading factor) gain = dr.sum([dr.squared_norm(e_field) for e_field in e_fields]) gain /= num_tx_ant_patterns # Update the ray tube length if dr.hint(stop_threshold is not None, mode="scalar"): ray_tube_length = dr.select(diffuse, 0.0, ray_tube_length) ray_tube_length += dr.select(scene_int, si_scene.t, si_mp.t) # Is the depth threshold to activate Russian roulette reached? rr_inactive = depth < rr_depth # User specify a maximum probability of continuing tracing rr_continue_prob = dr.minimum(gain, rr_prob) # Randomly stop tracing of rays rr_continue = scene_int & (sampler.next_1d() < rr_continue_prob) active &= (rr_inactive | rr_continue) # Scale the remaining rays accordingly to ensure an unbiased result for i in range(num_tx_ant_patterns): e_fields[i] = dr.select(rr_inactive, e_fields[i], e_fields[i] * dr.rsqrt(rr_continue_prob)) # Deactivate rays with gains below the set threshold if dr.hint(stop_threshold is not None, mode="scalar"): gain_pl = gain * dr.square(scene.wavelength * dr.rcp(4. * dr.pi * ray_tube_length)) th_continue = scene_int & (gain_pl > stop_threshold) active &= th_continue return wedges, next_wedge_ind @dr.syntax def _evaluate_first_order_diffraction( self, scene: Scene, radio_map: RadioMap, sampler: mi.Sampler, tx_positions: mi.Point3f, tx_orientations: mi.Point3f, tx_antenna_patterns: List[Callable[[mi.Float, mi.Float], Tuple[mi.Complex2f, mi.Complex2f]]], precoding_vec: Tuple[mi.TensorXf, mi.TensorXf], rel_ant_positions_tx: mi.Point3f, samples_per_tx: int, wedges: WedgeGeometry, diffraction_lit_region: bool): """ Traces first-order diffracted paths for a set of `wedges` and adds their contributions to the radio map This method computes the electric field contributions from rays that undergo first-order diffraction at wedges in the scene. It samples diffraction points along the wedges and evaluates the diffracted field. If paths intersect with the measurement surface, then their contributions are added to the radio map. Wedges are sampled proportionally to their length. :param scene: Scene containing the geometry and material properties :param radio_map: Radio map object to be updated with diffraction contributions :param sampler: Sampler for generating random numbers :param tx_positions: Positions of all transmitters in the scene :param tx_orientations: Orientations of all transmitters in the scene :param tx_antenna_patterns: List of antenna pattern functions for each transmitter :param precoding_vec: Precoding vectors for transmitter arrays :param rel_ant_positions_tx: Relative positions of antenna elements in transmitter arrays :param samples_per_tx: Number of samples to generate per transmitter :param wedges: Geometry information for all wedges in the scene :param diffraction_lit_region: Enable diffraction in the lit region """ num_txs = dr.shape(tx_positions)[1] num_samples = num_txs * samples_per_tx is_planar_radio_map = isinstance(radio_map, PlanarRadioMap) ############################################################# # Sample diffraction point on the wedges ############################################################# # Sample wedges # Wedges are sampled proportionnally to their length total_length = dr.sum(wedges.length) wedges_sample_prob = wedges.length / total_length dist = mi.DiscreteDistribution(wedges_sample_prob) wedges_index = dist.sample(sampler.next_1d()) sampled_wedges = dr.gather(WedgeGeometry, wedges, wedges_index) # Compute the number of samples for each wedge. # To avoid contention, we spread the targets of atomic operations. spread_factor = 8 intersections_cnt = dr.zeros( mi.UInt, spread_factor * dr.width(wedges.length) ) dr.scatter_add(intersections_cnt, 1, spread_factor * wedges_index) samples_per_wedge = dr.gather(mi.UInt, intersections_cnt, spread_factor * wedges_index) tx_indices = dr.repeat(dr.arange(mi.UInt, num_txs), samples_per_tx) tx_positions_ = dr.gather(mi.Point3f, tx_positions, tx_indices) # Sample points on the wedges diff_point = sampled_wedges.o +\ sampler.next_1d() * sampled_wedges.length * sampled_wedges.e_hat # Direction of incident wave ki = dr.normalize(diff_point - tx_positions_) # To avoid leakage, we offset the diffraction point for shooting rays # to test for interaction with a surface that would overlap with one # of the wedge face. # Note that this is only done for visibility test. Computation of the # diffraction field is done with the original diffraction point. edge_diffraction = sampled_wedges.primn == sampled_wedges.prim0 ne = dr.select(edge_diffraction, mi.Vector3f(0), dr.normalize(sampled_wedges.n0 + sampled_wedges.nn)) diff_point_offset = diff_point + 5e-2*ne ############################################################# # Initialize the electric field and array weighting ############################################################# # Weights to account for the antenna array and precoding array_w = self._synthetic_array_weighting(scene, ki, rel_ant_positions_tx, precoding_vec) # Initialize the electric field # Orientation of sources and targets and correspond to-world transforms sample_tx_orientation = dr.repeat(tx_orientations, samples_per_tx) tx_to_world = rotation_matrix(sample_tx_orientation) # The following loop transports and update the electric field. # The electric field is initialized by the source antenna pattern e_fields = [antenna_pattern_to_world_implicit(src_antenna_pattern, tx_to_world, ki, direction="out") for src_antenna_pattern in tx_antenna_patterns] ############################################################# # Test occlusion between source and diffraction point ############################################################# ray_occ = spawn_ray_to(tx_positions_, diff_point) vis_ray1 = spawn_ray_to(tx_positions_, diff_point_offset) # In the case of mesh-based radio maps, we must discard # intersections with the measurement surface active_s1 = dr.full(mi.Bool, True, num_samples) hit_scene = dr.full(mi.Bool, False, num_samples) while dr.hint(active_s1, mode=self.loop_mode): # Mark intersections with the scene si = scene.mi_scene.ray_intersect(ray_occ, active=active_s1) hit_scene |= si.is_valid() &\ (si.shape != radio_map.measurement_surface) # Test for visibility with an offset diffraction point # to avoid leakage si_vis = scene.mi_scene.ray_intersect(vis_ray1, active=active_s1) hit_scene |= si_vis.is_valid() &\ (si_vis.shape != radio_map.measurement_surface) # Deactivate ray if it hit the scene (occluded) or if it didn't hit # anything (reached the diffraction point without obstruction) active_s1 &= ~hit_scene & si.is_valid() ray_occ = si.spawn_ray_to(diff_point) vis_ray1 = si_vis.spawn_ray_to(diff_point_offset) valid_path = ~hit_scene del vis_ray1, ray_occ ############################################################# # Sample Keller cone ############################################################# ko = sample_keller_cone(sampled_wedges.e_hat, sampled_wedges.n0, sampled_wedges.nn, sampler.next_1d(), ki, diffraction_lit_region) ############################################################# # Add path to radio map ############################################################# ray_mp = spawn_ray_towards(diff_point, diff_point + ko, ne) vis_ray2 = spawn_ray_to(diff_point_offset, diff_point_offset + ko) # For mesh-based radio maps, handle potential multiple # intersections with the measurement surface while dr.hint(valid_path, mode=self.loop_mode, exclude=[array_w, tx_positions]): # Test intersection with the measurement # surface, and ensure that the segment from the # diffraction to the measurement surface is not # occluded. si_mp = None si_vis = None if dr.hint(is_planar_radio_map, mode='scalar'): si_mp = radio_map.measurement_surface.ray_intersect( ray_mp, active=valid_path ) valid_path &= si_mp.is_valid() # Ensure that the segment from the diffraction to the # measurement surface is not occluded by the scene ray_to_plane = spawn_ray_to(si_mp.p, diff_point) valid_path &= ~scene.mi_scene.ray_test( ray_to_plane, coherent=False, active=valid_path ) del ray_to_plane # Test for visibility with an offset diffraction point # to avoid leakage vis_ray3 = spawn_ray_to(si_mp.p, diff_point_offset) valid_path &= ~scene.mi_scene.ray_test( vis_ray3, coherent=False, active=valid_path ) del vis_ray3 si_vis = si_mp else: si_mp = scene.mi_scene.ray_intersect( ray_mp, ray_flags=mi.RayFlags.Minimal, coherent=False, active=valid_path ) valid_path &= si_mp.is_valid() & \ (si_mp.shape == radio_map.measurement_surface) # Test for visibility with an offset diffraction point # to avoid leakage si_vis = scene.mi_scene.ray_intersect( vis_ray2, ray_flags=mi.RayFlags.Minimal, coherent=False, active=valid_path ) valid_path &= si_vis.is_valid() &\ (si_vis.shape == radio_map.measurement_surface) # Discard paths where either segment is too short s_prime = dr.norm(diff_point - tx_positions_) s = dr.norm(diff_point - si_mp.p) add_to_rm = valid_path & (s > MIN_SEGMENT_LENGTH) \ & (s_prime > MIN_SEGMENT_LENGTH) # Compute the diffracted field shape = mi.ShapePtr(sampled_wedges.shape) e_fields = self._evaluate_radio_material_diffraction( shape, sampled_wedges, diff_point, ki, ko, e_fields, s, s_prime, add_to_rm ) # Add contribution to the radio map radio_map.add_paths(e_fields, array_w, si_mp, ko, tx_indices, add_to_rm, True, None, tx_positions, sampled_wedges, diff_point, samples_per_wedge) # Spawn new rays ray_mp = si_mp.spawn_ray(ko) vis_ray2 = si_vis.spawn_ray(ko) del si_mp, si_vis @dr.syntax def _synthetic_array_weighting( self, scene: Scene, k_tx: mi.Vector3f, rel_ant_positions_tx: mi.Point3f, precoding_vec: Tuple[mi.TensorXf, mi.TensorXf], ) -> List[mi.Float]: r""" Computes the weighting to apply to the electric field to synthetically model the transmitter array :param scene: Scene for which to compute the radio map :param k_tx: Directions of departures of paths :param rel_ant_positions_tx: Positions of the antenna elements relative to the transmitters positions :param precoding_vec: Real and imaginary components of the complex-valued precoding vector :return: Weightings """ num_patterns = len(scene.tx_array.antenna_pattern.patterns) array_size = scene.tx_array.array_size num_samples = dr.shape(k_tx)[-1] num_tx = len(scene.transmitters) samples_per_tx = num_samples // num_tx # k_tx : 3*[num_samples = num_tx*samples_per_tx] # rel_ant_positions_tx : 3*[num_tx, array_size] # precoding_vec : 2*[num_tx, num_patterns*array_size] precoding_vec_real_, precoding_vec_imag_ = precoding_vec precoding_vec_real = [] precoding_vec_imag = [] precoding_vec_real_ = dr.reshape(mi.TensorXf, precoding_vec_real_.array, [num_tx, num_patterns, array_size]) precoding_vec_imag_ = dr.reshape(mi.TensorXf, precoding_vec_imag_.array, [num_tx, num_patterns, array_size]) # [num_tx, array_size] for i in range(num_patterns): precoding_vec_real.append(precoding_vec_real_[...,i,:]) precoding_vec_imag.append(precoding_vec_imag_[...,i,:]) # It is required here to allocate # Weights for each antenna pattern w_real = [] w_imag = [] for i in range(num_patterns): w_real.append(dr.zeros(mi.Float, num_samples)) w_imag.append(dr.zeros(mi.Float, num_samples)) ############################################################## # Iterate over the antennas to compute the weighting ############################################################## ant_gather_indices = dr.repeat(dr.arange(mi.UInt, num_tx), samples_per_tx) * array_size n = mi.UInt(0) while dr.hint(n < array_size, mode=self.loop_mode): # Extract the relative position of antenna # [num_samples, 3] ant_pos = dr.gather(mi.Point3f, rel_ant_positions_tx, ant_gather_indices) # Compute the phase shifts # [num_samples] tx_phase_shifts = dr.dot(ant_pos, k_tx) tx_phase_shifts *= dr.two_pi / scene.wavelength array_vec_imag, array_vec_real = dr.sincos(tx_phase_shifts) for i in range(num_patterns): # Dot product with precoding vector iteratively computed # [num_samples] prec_real = dr.gather(mi.Float, precoding_vec_real[i].array, ant_gather_indices) prec_imag = dr.gather(mi.Float, precoding_vec_imag[i].array, ant_gather_indices) # [num_samples] w_real_ = array_vec_real * prec_real \ - array_vec_imag * prec_imag w_imag_ = array_vec_real * prec_imag \ + array_vec_imag * prec_real # [num_samples] w_real[i] += w_real_ w_imag[i] += w_imag_ # Prepare for next antenna ant_gather_indices += 1 n += 1 # Reshape to fit total number of samples w = [] for i in range(num_patterns): w_ = mi.Matrix4f(w_real[i], 0.0, -w_imag[i], 0.0, 0.0, w_real[i], 0.0, -w_imag[i], w_imag[i], 0.0, w_real[i], 0.0, 0.0, w_imag[i], 0.0, w_real[i]) w.append(w_) return w def _sample_radio_material( self, si: mi.SurfaceInteraction3f, k_world: mi.Vector3f, e_fields: List[mi.Vector4f], solid_angle: mi.Float, sample1: mi.Float, sample2: mi.Point2f, enabled_interactions: mi.UInt, active: mi.Bool ) -> Tuple[mi.BSDFSample3f, mi.Vector4f]: r""" Evaluates the radio material and updates the electric field accordingly :param si: Information about the interaction of the rays with a surface of the scene :param k_world: Direction of propagation of the incident wave in the world frame :param e_fields: Electric field Jones vector as a 4D real-valued vector :param solid_angle: Ray tube solid angle [sr] :param sample1: Random float uniformly distributed in :math:`[0,1]`. Used to sample the interaction type. :param sample2: Random 2D point uniformly distributed in :math:`[0,1]^2`. Used to sample the direction of diffusely reflected waves. :param enabled_interactions: Flags indicating the enabled interactions :param active: Mask to specify active rays :return: Updated electric field and sampling record """ # Ensure the normal is oriented in the opposite of the direction of # propagation of the incident wave normal_world = si.n * dr.sign(dr.dot(si.n, -k_world)) si.sh_frame.n = normal_world si.initialize_sh_frame() si.n = normal_world # Set `si.wi` to the local direction of propagation of the incident wave si.wi = si.to_local(k_world) # `si.dp_du.z` stores the flags indicating the enabled interactions si.dp_du = mi.Vector3f(0., 0., 0.) si.dp_du.z = dr.reinterpret_array(mi.Float, enabled_interactions) # Specify the components that are required ctx = mi.BSDFContext(mode=mi.TransportMode.Importance, type_mask=0, component=0) # Samples and evaluate the radio material for i, e_field in enumerate(e_fields): # `si.duv_dx` and `si.duv_dy` stores the incident field si.duv_dx = mi.Vector2f(e_field.x, # S e_field.y) # P si.duv_dy = mi.Vector2f(e_field.z, # S e_field.w) # P # `si.t` stores the solid angle si.t = solid_angle # Sample and evaluate the radio material bs_sample, jones_mat = si.bsdf().sample(ctx, si, sample1, sample2, active) jones_mat = spectrum_to_matrix_4f(jones_mat) # Update the field by applying the Jones matrix e_fields[i] = dr.select( active, mi.Vector4f(jones_mat @ e_field), e_fields[i] ) # Note: we assume that the `bs_sample` returned is the same for all # `e_field` values, so we just return the last one. return bs_sample, e_fields def _evaluate_radio_material_diffraction(self, shape: mi.ShapePtr, wedges: WedgeGeometry, diff_point: mi.Point3f, ki_world: mi.Vector3f, ko_world: mi.Vector3f, e_fields: List[mi.Vector4f], s: mi.Float, s_prime: mi.Float, active: mi.Bool ) -> Tuple[mi.Vector4f]: # pylint: disable=line-too-long r""" Evaluates the radio material for diffraction and updates the electric field accordingly :param shape: Intersected shape :param wedges: Geometry of the intersected wedges :param diff_point: Position of the diffraction point on the wedge :param ki_world: Directions of propagation of the incident waves in the world frame :param ko_world: Directions of propagation of the scattered waves in the world frame :param e_fields: Jones vector representing the electric field as a 4D real-valued vector :param s: Distance parameter for diffraction calculations :param s_prime: Second distance parameter for diffraction calculations :param active: Mask to specify active rays :return: Updated electric field """ # Radio material of the intersected shape rm = shape.bsdf() # Build a surface interaction object and context object to call the # radio material si = dr.zeros(mi.SurfaceInteraction3f, shape.shape[0]) ctx = mi.BSDFContext(mode=mi.TransportMode.Importance, type_mask=0, component=0) # If diffraction is globally disabled, we can avoid runing the related code to # speed up the computation ctx.component |= InteractionType.DIFFRACTION # Ensure the normal is oriented in the opposite of the direction of # propagation of the incident wave normal_world = wedges.n0 * dr.sign(dr.dot(wedges.n0, -ki_world)) si.n = normal_world si.sh_frame.n = normal_world si.initialize_sh_frame() # Set `si.wi` to the local direction of propagation of the incident wave si.wi = si.to_local(ki_world) # Interaction point si.p = diff_point # Intersected shape si.shape = shape # Intersected primitive si.prim_index = wedges.prim0 # `si.dn_du` stores the edge vector in the local frame si.dn_du = si.to_local(wedges.e_hat) # `si.dn_dv` stores the normal to the n-face in the local frame si.dn_dv = si.to_local(wedges.nn) # `si.dp_du` stores the path length from the diffraction point to the # source, target, and the interaction type. enabled_interactions = mi.UInt(InteractionType.DIFFRACTION) si.dp_du = mi.Vector3f(s, s_prime, dr.reinterpret_array(mi.Float, enabled_interactions)) # Update the fields # Spreading factor sf = dr.rsqrt(s*s_prime*(s + s_prime)) for i, e_field in enumerate(e_fields): # `si.duv_dx` and `si.duv_dy` stores the incident field si.duv_dx = mi.Vector2f(e_field.x, # S e_field.y) # P si.duv_dy = mi.Vector2f(e_field.z, # S e_field.w) # P # Evaluate the radio material jones_mat = rm.eval(ctx, si, ko_world, active) jones_mat = spectrum_to_matrix_4f(jones_mat) jones_mat *= sf # Update the field by applying the Jones matrix e_fields[i] = dr.select(active, jones_mat@e_field, e_field) return e_fields def _hash_edge_index( self, shape_ind: mi.UInt, prim_ind: mi.UInt, local_edge_ind: mi.UInt ) -> mi.UInt: r""" Hashes an edge indexed by a shape, primitive, and local edge index into a single integer :param shape_ind: Shape index :param prim_ind: Primitive index :param local_edge_ind: Local edge index """ hash_value = 101 hash_value = hash_value * 1009 + shape_ind hash_value = hash_value * 9176 + prim_ind hash_value = hash_value * 92821 + local_edge_ind return hash_value