Source code for sionna.fec.utils

#
# SPDX-FileCopyrightText: Copyright (c) 2021-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
"""Utility functions and layers for the FEC package."""

import tensorflow as tf
from tensorflow.keras.layers import Layer
import numpy as np
import matplotlib.pyplot as plt
import warnings
from importlib_resources import files, as_file
from sionna.fec.ldpc import codes
from sionna.utils import log2
from sionna.nr.utils import generate_prng_seq as generate_prng_seq_utils


[docs]class GaussianPriorSource(Layer): r"""GaussianPriorSource(specified_by_mi=False, dtype=tf.float32, **kwargs) Generates `fake` LLRs as if the all-zero codeword was transmitted over an Bi-AWGN channel with noise variance ``no`` or mutual information (if ``specified_by_mi`` is True). If selected, the mutual information denotes the mutual information associated with a binary random variable observed at the output of a corresponding AWGN channel (cf. Gaussian approximation). .. image:: ../figures/GaussianPriorSource.png The generated LLRs are drawn from a Gaussian distribution with .. math:: \sigma_{\text{llr}}^2 = \frac{4}{\sigma_\text{ch}^2} and .. math:: \mu_{\text{llr}} = \frac{\sigma_\text{llr}^2}{2} where :math:`\sigma_\text{ch}^2` is the channel noise variance as defined by ``no``. If ``specified_by_mi`` is True, this class uses the of the so-called `J-function` (relates mutual information to Gaussian distributed LLRs) as proposed in [Brannstrom]_. Parameters ---------- specified_by_mi : bool Defaults to False. If True, the second input parameter ``no`` is interpreted as mutual information instead of noise variance. dtype : tf.DType Defaults to `tf.float32`. Defines the datatype for internal calculations and the output. Must be one of the following `(tf.float16, tf.bfloat16, tf.float32, tf.float64)`. Input ----- (output_shape, no): Tuple: output_shape : tf.int Integer tensor or Python array defining the shape of the desired output tensor. no : tf.float32 Scalar defining the noise variance or mutual information (if ``specified_by_mi`` is True) of the corresponding (fake) AWGN channel. Output ------ : ``dtype``, defaults to `tf.float32` 1+D Tensor with shape as defined by ``output_shape``. Raises ------ InvalidArgumentError If mutual information is not in (0,1). AssertionError If ``inputs`` is not a list with 2 elements. """ def __init__(self, specified_by_mi=False, dtype=tf.float32, **kwargs): if dtype not in (tf.float16, tf.float32, tf.float64, tf.bfloat16, tf.complex64, tf.complex128): raise ValueError("Only float dtypes are supported.") # use real_dtype to support tf.complex super().__init__(dtype=dtype.real_dtype, **kwargs) assert isinstance(specified_by_mi, bool),"specified_by_mi must be bool." self._specified_by_mi = specified_by_mi def call(self, inputs): """Generate Gaussian distributed fake LLRs as if the all-zero codeword was transmitted over an Bi-AWGN channel. Args: inputs (list): ``[output_shape, no]``, where ``output_shape`` (tf.int32): 1D list or tensor describing the desired shape of the output. ``no`` (tf.float32): Scalar defining the noise variance or mutual information (if ``specified_by_mi`` is True) of the corresponding (fake) AWGN channel. Returns: 1+D Tensor (``dtype``): Shape as defined by ``output_shape``. """ assert isinstance(inputs, (list, tuple)), \ "inputs must be a list or tuple." assert len(inputs)==2, "inputs must be a list with 2 elements." output_shape, noise_var = inputs if self._specified_by_mi: # interpret noise_var as mutual information mi_a = tf.cast(noise_var, tf.float32) tf.debugging.assert_greater_equal(mi_a, 0., "Mutual information must be positive.") tf.debugging.assert_less_equal(mi_a, 1., "Mutual information must be less or equal 1.") #clip Ia to range (0,1) mi_a = tf.maximum(mi_a, 1e-7) mi_a = tf.minimum(mi_a, 1.) mu_llr = j_fun_inv_tf(mi_a) sigma_llr = tf.math.sqrt(2*mu_llr) else: noise_var = tf.cast(noise_var, tf.float32) # noise_var must be positive noise_var = tf.maximum(noise_var, 1e-7) sigma_llr = tf.math.sqrt(4 / noise_var) mu_llr = sigma_llr**2 / 2 mu_llr = tf.cast(mu_llr, super().dtype) sigma_llr = tf.cast(sigma_llr, super().dtype) # generate LLRs with Gaussian approximation (BPSK, all-zero cw) # Use negative mean as we generate logits with definition p(b=1)/p(b=0) llr = tf.random.normal(output_shape, mean=-1.*mu_llr, stddev=sigma_llr, dtype=super().dtype) return llr
[docs]def llr2mi(llr, s=None, reduce_dims=True): # pylint: disable=line-too-long r"""Implements an approximation of the mutual information based on LLRs. The function approximates the mutual information for given ``llr`` as derived in [Hagenauer]_ assuming an `all-zero codeword` transmission .. math:: I \approx 1 - \sum \operatorname{log_2} \left( 1 + \operatorname{e}^{-\text{llr}} \right). This approximation assumes that the following `symmetry condition` is fulfilled .. math:: p(\text{llr}|x=0) = p(\text{llr}|x=1) \cdot \operatorname{exp}(\text{llr}). For `non-all-zero` codeword transmissions, this methods requires knowledge about the signs of the original bit sequence ``s`` and flips the signs correspondingly (as if the all-zero codeword was transmitted). Please note that we define LLRs as :math:`\frac{p(x=1)}{p(x=0)}`, i.e., the sign of the LLRs differ to the solution in [Hagenauer]_. Input ----- llr : tf.float32 Tensor of arbitrary shape containing LLR-values. s : None or tf.float32 Tensor of same shape as llr containing the signs of the transmitted sequence (assuming BPSK), i.e., +/-1 values. reduce_dims : bool Defaults to True. If True, all dimensions are reduced and the return is a scalar. Otherwise, `reduce_mean` is only taken over the last dimension. Output ------ mi : tf.float32 A scalar tensor (if ``reduce_dims`` is True) or a tensor of same shape as ``llr`` apart from the last dimensions that is removed. It contains the approximated value of the mutual information. Raises ------ TypeError If dtype of ``llr`` is not a real-valued float. """ if s is None: s = tf.ones_like(llr) if llr.dtype not in (tf.float16, tf.bfloat16, tf.float32, tf.float64): raise TypeError("Dtype of llr must be a real-valued float.") # ensure that both tensors are compatible s = tf.cast(s, llr.dtype) # scramble sign as if all-zero cw was transmitted llr_zero = tf.multiply(s, llr) llr_zero = tf.clip_by_value(llr_zero, -20., 20.) # clip for stability x = log2(1. + tf.exp(1.* llr_zero)) if reduce_dims: x = 1. - tf.reduce_mean(x) else: x = 1. - tf.reduce_mean(x, axis=-1) return x
[docs]def j_fun(mu): # pylint: disable=line-too-long r"""Calculates the `J-function` in NumPy. The so-called `J-function` relates mutual information to the mean of Gaussian distributed LLRs (cf. Gaussian approximation). We use the approximation as proposed in [Brannstrom]_ which can be written as .. math:: J(\mu) \approx \left( 1- 2^{H_\text{1}(2\mu)^{H_\text{2}}}\right)^{H_\text{2}} with :math:`\mu` denoting the mean value of the LLR distribution and :math:`H_\text{1}=0.3073`, :math:`H_\text{2}=0.8935` and :math:`H_\text{3}=1.1064`. Input ----- mu : float float or `ndarray` of float. Output ------ : float `ndarray` of same shape as the input. """ assert np.all(mu<1000), "mu too large." # we support exact 0 for EXIT (clipping is used in any way) assert np.all(mu>-0.0001), "mu must be positive." h1 = 0.3073 h2 = 0.8935 h3 = 1.1064 mu = np.maximum(mu, 1e-10) # input must be positive for numerical stability mi = (1-2**(-h1*(2*mu)**h2))**h3 return mi
[docs]def j_fun_inv(mi): # pylint: disable=line-too-long r"""Calculates the inverse `J-function` in NumPy. The so-called `J-function` relates mutual information to the mean of Gaussian distributed LLRs (cf. Gaussian approximation). We use the approximation as proposed in [Brannstrom]_ which can be written as .. math:: J(\mu) \approx \left( 1- 2^{H_\text{1}(2\mu)^{H_\text{2}}}\right)^{H_\text{2}} with :math:`\mu` denoting the mean value of the LLR distribution and :math:`H_\text{1}=0.3073`, :math:`H_\text{2}=0.8935` and :math:`H_\text{3}=1.1064`. Input ----- mi : float float or `ndarray` of float. Output ------- : float `ndarray` of same shape as the input. Raises ------ AssertionError If ``mi`` < 0.001 or ``mi`` > 0.999. """ assert np.all(mi<0.999), "mi must be smaller 1." assert np.all(mi>0.001), "mi must be greater 0." h1 = 0.3073 h2 = 0.8935 h3 = 1.1064 mi = np.maximum(mi,1e-10) # add small value to avoid log(0) mu = 0.5*((-1/h1)*np.log2((1-mi**(1/h3)) + 1e-12))**(1/(h2)) return np.minimum(mu, 20) # clipp the output to mu_max =20
[docs]def j_fun_tf(mu, verify_inputs=True): # pylint: disable=line-too-long r"""Calculates the `J-function` in Tensorflow. The so-called `J-function` relates mutual information to the mean of Gaussian distributed LLRs (cf. Gaussian approximation). We use the approximation as proposed in [Brannstrom]_ which can be written as .. math:: J(\mu) \approx \left( 1- 2^{H_\text{1}(2\mu)^{H_\text{2}}}\right)^{H_\text{2}} with :math:`\mu` denoting the mean value of the LLR distribution and :math:`H_\text{1}=0.3073`, :math:`H_\text{2}=0.8935` and :math:`H_\text{3}=1.1064`. Input ----- mu : tf.float32 Tensor of arbitrary shape. verify_inputs : bool A boolean defaults to True. If True, ``mu`` is clipped internally to be numerical stable. Output ------ : tf.float32 Tensor of same shape and dtype as ``mu``. Raises ------ InvalidArgumentError If ``mu`` is negative. """ assert isinstance(verify_inputs, bool), "verify_inputs must be bool." if verify_inputs: # input must be positive for numerical stability mu = tf.maximum(mu, 1e-10) else: tf.debugging.assert_greater_equal(mu, 0., "mu must be positive.") h1 = 0.3073 h2 = 0.8935 h3 = 1.1064 mi = (1-2**(-h1*(2*mu)**h2))**h3 return mi
[docs]def j_fun_inv_tf(mi, verify_inputs=True): # pylint: disable=line-too-long r"""Calculates the inverse `J-function` in Tensorflow. The so-called `J-function` relates mutual information to the mean of Gaussian distributed LLRs (cf. Gaussian approximation). We use the approximation as proposed in [Brannstrom]_ which can be written as .. math:: J(\mu) \approx \left( 1- 2^{H_\text{1}(2\mu)^{H_\text{2}}}\right)^{H_\text{2}} with :math:`\mu` denoting the mean value of the LLR distribution and :math:`H_\text{1}=0.3073`, :math:`H_\text{2}=0.8935` and :math:`H_\text{3}=1.1064`. Input ----- mi : tf.float32 Tensor of arbitrary shape. verify_inputs : bool A boolean defaults to True. If True, ``mi`` is clipped internally to be numerical stable. Output ------ : tf.float32 Tensor of same shape and dtype as the ``mi``. Raises ------ InvalidArgumentError If ``mi`` is not in `(0,1)`. """ assert isinstance(verify_inputs, bool), "verify_inputs must be bool." if verify_inputs: # input must be positive for numerical stability mi = tf.maximum(mi, 1e-10) # ensure that I>0 mi = tf.minimum(mi, 1.) # ensure that I=<1 else: tf.debugging.assert_greater_equal(mi, 0., "mi must be positive.") tf.debugging.assert_less_equal(mi, 1., "mi must be less or equal 1.") h1 = 0.3073 h2 = 0.8935 h3 = 1.1064 mu = 0.5*((-1/h1) * log2((1-mi**(1/h3))))**(1/(h2)) return tf.minimum(mu, 20) # clipp the output to mu_max =20
[docs]def plot_trajectory(plot, mi_v, mi_c, ebno=None): """Utility function to plot the trajectory of an EXIT-chart. Input ----- plot : matplotlib.figure A handle to a matplotlib figure. mi_v : float An ndarray of floats containing the variable node mutual information. mi_c : float An ndarray of floats containing the check node mutual information. ebno : float A float denoting the EbNo in dB for the legend entry. """ assert (len(mi_v)==len(mi_c)), "mi_v and mi_c must have same length." # number of decoding iterations to plot iters = np.shape(mi_v)[0] - 1 x = np.zeros([2*iters]) y = np.zeros([2*iters]) # iterate between VN and CN MI value y[1] = mi_v[0] for i in range(1, iters): x[2*i] = mi_c[i-1] y[2*i] = mi_v[i-1] x[2*i+1] = mi_c[i-1] y[2*i+1] = mi_v[i] if ebno is not None: label_str = f"Actual trajectory @ {ebno} dB" else: label_str = "Actual trajectory" #plot trajectory plot.plot(x, y, "-", linewidth=3, color="g", label=label_str) plot.legend(fontsize=18) # and show the legend
[docs]def plot_exit_chart(mi_a=None, mi_ev=None, mi_ec=None, title="EXIT-Chart"): """Utility function to plot EXIT-Charts [tenBrinkEXIT]_. If all inputs are `None` an empty EXIT chart is generated. Otherwise, the mutual information curves are plotted. Input ----- mi_a : float An ndarray of floats containing the a priori mutual information. mi_v : float An ndarray of floats containing the variable node mutual information. mi_c : float An ndarray of floats containing the check node mutual information. title : str A string defining the title of the EXIT chart. Output ------ plt: matplotlib.figure A matplotlib figure handle Raises ------ AssertionError If ``title`` is not `str`. """ assert isinstance(title, str), "title must be str." if not (mi_ev is None and mi_ec is None): if mi_a is None: raise ValueError("mi_a cannot be None if mi_e is provided.") if mi_ev is not None: assert (len(mi_a)==len(mi_ev)), "mi_a and mi_ev must have same length." if mi_ec is not None: assert (len(mi_a)==len(mi_ec)), "mi_a and mi_ec must have same length." plt.figure(figsize=(10,10)) plt.title(title, fontsize=25) plt.xlabel("$I_{a}^v$, $I_{e}^c$", fontsize=25) plt.ylabel("$I_{e}^v$, $I_{a}^c$", fontsize=25) plt.grid(visible=True, which='major') # for MI, the x,y limits are always (0,1) plt.xlim(0, 1) plt.ylim(0, 1) plt.xticks(fontsize=18) plt.yticks(fontsize=18) # and plot EXIT curves if mi_ec is not None: plt.plot(mi_ec, mi_a, "r", linewidth=3, label="Check node") plt.legend() if mi_ev is not None: plt.plot(mi_a, mi_ev, "b", linewidth=3, label="Variable node") plt.legend() return plt
[docs]def get_exit_analytic(pcm, ebno_db): """Calculate the analytic EXIT-curves for a given parity-check matrix. This function extracts the degree profile from ``pcm`` and calculates the variable (VN) and check node (CN) decoder EXIT curves. Please note that this is an asymptotic tool which needs a certain codeword length for accurate predictions. Transmission over an AWGN channel with BPSK modulation and SNR ``ebno_db`` is assumed. The detailed equations can be found in [tenBrink]_ and [tenBrinkEXIT]_. Input ----- pcm : ndarray The parity-check matrix. ebno_db : float The channel SNR in dB. Output ------ mi_a : ndarray of floats NumPy array containing the `a priori` mutual information. mi_ev : ndarray of floats NumPy array containing the extrinsic mutual information of the variable node decoder for the corresponding ``mi_a``. mi_ec : ndarray of floats NumPy array containing the extrinsic mutual information of the check node decoder for the corresponding ``mi_a``. Note ---- This function assumes random parity-check matrices without any imposed structure. Thus, explicit code construction algorithms may lead to inaccurate EXIT predictions. Further, this function is based on asymptotic properties of the code, i.e., only works well for large parity-check matrices. For details see [tenBrink]_. """ # calc coderate n = pcm.shape[1] k = n - pcm.shape[0] coderate = k/n # calc mean and noise_var of Gaussian distributed LLRs for given channel SNR ebno = 10**(ebno_db/10) snr = ebno*coderate noise_var = 1/(2*snr) # For BiAWGN channels the LLRs follow a Gaussian distr. as given below [1] sigma_llr = np.sqrt(4 / noise_var) mu_llr = sigma_llr**2 / 2 # calculate max node degree # "+1" as the array indices later directly denote the node degrees and we # have to account the array start at position 0 (i.e., we need one more # element) c_max = int(np.max(np.sum(pcm, axis=1)) + 1 ) v_max = int(np.max(np.sum(pcm, axis=0)) + 1 ) # calculate degree profile (node perspective) c = np.histogram(np.sum(pcm, axis=1), bins=c_max, range=(0, c_max), density=False)[0] v = np.histogram(np.sum(pcm, axis=0), bins=v_max, range=(0, v_max), density=False)[0] # calculate degrees from edge perspective r = np.zeros([c_max]) for i in range(1,c_max): r[i] = (i-1)*c[i] r = r / np.sum(r) l = np.zeros([v_max]) for i in range(1,v_max): l[i] = (i-1)*v[i] l = l / np.sum(l) mi_a = np.arange(0.002, 0.998, 0.001) # quantize Ia with 0.01 resolution # Exit function of check node update mi_ec = np.zeros_like(mi_a) for i in range(1, c_max): mi_ec += r[i] * j_fun((i-1.) * j_fun_inv(1 - mi_a)) mi_ec = 1 - mi_ec # Exit function of variable node update mi_ev = np.zeros_like(mi_a) for i in range(1, v_max): mi_ev += l[i] * j_fun(mu_llr + (i-1.) * j_fun_inv(mi_a)) return mi_a, mi_ev, mi_ec
[docs]def load_parity_check_examples(pcm_id, verbose=False): # pylint: disable=line-too-long """Utility function to load example codes stored in sub-folder LDPC/codes. The following codes are available - 0 : `(7,4)`-Hamming code of length `k=4` information bits and codeword length `n=7`. - 1 : `(63,45)`-BCH code of length `k=45` information bits and codeword length `n=63`. - 2 : (127,106)-BCH code of length `k=106` information bits and codeword length `n=127`. - 3 : Random LDPC code with regular variable node degree 3 and check node degree 6 of length `k=50` information bits and codeword length `n=100`. - 4 : 802.11n LDPC code of length of length `k=324` information bits and codeword length `n=648`. Input ----- pcm_id : int An integer defining which matrix id to load. verbose : bool Defaults to False. If True, the code parameters are printed. Output ------ pcm: ndarray of `zeros` and `ones` An ndarray containing the parity check matrix. k : int An integer defining the number of information bits. n : int An integer defining the number of codeword bits. coderate : float A float defining the coderate (assuming full rank of parity-check matrix). """ source = files(codes).joinpath("example_codes.npy") with as_file(source) as code: pcms = np.load(code, allow_pickle=True) pcm = np.array(pcms[pcm_id]) # load parity-check matrix n = int(pcm.shape[1]) # number of codeword bits (codeword length) k = int(n - pcm.shape[0]) # number of information bits k per codeword coderate = k / n if verbose: print(f"\nn: {n}, k: {k}, coderate: {coderate:.3f}") return pcm, k, n, coderate
[docs]def bin2int(arr): """Convert binary array to integer. For example ``arr`` = `[1, 0, 1]` is converted to `5`. Input ----- arr: int or float An iterable that yields 0's and 1's. Output ----- : int Integer representation of ``arr``. """ if len(arr) == 0: return None return int(''.join([str(x) for x in arr]), 2)
[docs]def bin2int_tf(arr): """ Converts binary tensor to int tensor. Binary representation in ``arr`` is across the last dimension from most significant to least significant. For example ``arr`` = `[0, 1, 1]` is converted to `3`. Input ----- arr: int or float Tensor of 0's and 1's. Output ----- : int Tensor containing the integer representation of ``arr``. """ len_ = tf.shape(arr)[-1] shifts = tf.range(len_-1,-1,-1) # (2**len_-1)*arr[0] +... 2*arr[len_-2] + 1*arr[len_-1] op = tf.reduce_sum(tf.bitwise.left_shift(arr, shifts), axis=-1) return op
[docs]def int2bin(num, len_): """ Convert ``num`` of int type to list of length ``len_`` with 0's and 1's. ``num`` and ``len_`` have to non-negative. For e.g., ``num`` = `5`; `int2bin(num`, ``len_`` =4) = `[0, 1, 0, 1]`. For e.g., ``num`` = `12`; `int2bin(num`, ``len_`` =3) = `[1, 0, 0]`. Input ----- num: int An integer to be converted into binary representation. len_: int An integer defining the length of the desired output. Output ----- : list of int Binary representation of ``num`` of length ``len_``. """ assert num >= 0, "Input integer should be non-negative" assert len_ >= 0, "width should be non-negative" bin_ = format(num, f'0{len_}b') binary_vals = [int(x) for x in bin_[-len_:]] if len_ else [] return binary_vals
[docs]def int2bin_tf(ints, len_): """ Converts (int) tensor to (int) tensor with 0's and 1's. `len_` should be to non-negative. Additional dimension of size `len_` is inserted at end. Input ----- ints: int Tensor of arbitrary shape `[...,k]` containing integer to be converted into binary representation. len_: int An integer defining the length of the desired output. Output ----- : int Tensor of same shape as ``ints`` except dimension of length ``len_`` is added at the end `[...,k, len_]`. Contains the binary representation of ``ints`` of length ``len_``. """ assert len_ >= 0 shifts = tf.range(len_-1, -1, delta=-1) bits = tf.math.floormod( tf.bitwise.right_shift(tf.expand_dims(ints, -1), shifts), 2) return bits
[docs]def alist2mat(alist, verbose=True): # pylint: disable=line-too-long r"""Convert `alist` [MacKay]_ code definition to `full` parity-check matrix. Many code examples can be found in [UniKL]_. About `alist` (see [MacKay]_ for details): - `1.` Row defines parity-check matrix dimension `m x n` - `2.` Row defines int with `max_CN_degree`, `max_VN_degree` - `3.` Row defines VN degree of all `n` column - `4.` Row defines CN degree of all `m` rows - Next `n` rows contain non-zero entries of each column (can be zero padded at the end) - Next `m` rows contain non-zero entries of each row. Input ----- alist: list Nested list in `alist`-format [MacKay]_. verbose: bool Defaults to True. If True, the code parameters are printed. Output ------ (pcm, k, n, coderate): Tuple: pcm: ndarray NumPy array of shape `[n-k, n]` containing the parity-check matrix. k: int Number of information bits. n: int Number of codewords bits. coderate: float Coderate of the code. Note ---- Use :class:`~sionna.fec.utils.load_alist` to import alist from a textfile. For example, the following code snippet will import an alist from a file called ``filename``: .. code-block:: python al = load_alist(path = filename) pcm, k, n, coderate = alist2mat(al) """ assert len(alist)>4, "Invalid alist format." n = alist[0][0] m = alist[0][1] v_max = alist[1][0] c_max = alist[1][1] k = n - m coderate = k / n vn_profile = alist[2] cn_profile = alist[3] # plausibility checks assert np.sum(vn_profile)==np.sum(cn_profile), "Invalid alist format." assert np.max(vn_profile)==v_max, "Invalid alist format." assert np.max(cn_profile)==c_max, "Invalid alist format." if len(alist)==len(vn_profile)+4: print("Note: .alist does not contain (redundant) CN perspective.") print("Recovering parity-check matrix from VN only.") print("Please verify the correctness of the results manually.") vn_only = True else: assert len(alist)==len(vn_profile) + len(cn_profile) + 4, \ "Invalid alist format." vn_only = False pcm = np.zeros((m,n)) num_edges = 0 # count number of edges for idx_v in range(n): for idx_i in range(vn_profile[idx_v]): # first 4 rows of alist contain meta information idx_c = alist[4+idx_v][idx_i]-1 # "-1" as this is python pcm[idx_c, idx_v] = 1 num_edges += 1 # count number of edges (=each non-zero entry) # validate results from CN perspective if not vn_only: for idx_c in range(m): for idx_i in range(cn_profile[idx_c]): # first 4 rows of alist contain meta information # follwing n rows contained VN perspective idx_v = alist[4+n+idx_c][idx_i]-1 # "-1" as this is python assert pcm[idx_c, idx_v]==1 # entry must already exist if verbose: print("Number of variable nodes (columns): ", n) print("Number of check nodes (rows): ", m) print("Number of information bits per cw: ", k) print("Number edges: ", num_edges) print("Max. VN degree: ", v_max) print("Max. CN degree: ", c_max) print("VN degree: ", vn_profile) print("CN degree: ", cn_profile) return pcm, k, n, coderate
[docs]def load_alist(path): """Read `alist`-file [MacKay]_ and return nested list describing the parity-check matrix of a code. Many code examples can be found in [UniKL]_. Input ----- path:str Path to file to be loaded. Output ------ alist: list A nested list containing the imported alist data. """ alist = [] with open(path, "r") as reader: # pylint: disable=unspecified-encoding # read list line by line (different length) for line in reader: l = [] # append all entries for word in line.split(): l.append(int(word)) if l: # ignore empty lines alist.append(l) return alist
[docs]def make_systematic(mat, is_pcm=False): r"""Bring binary matrix in its systematic form. Input ----- mat : ndarray Binary matrix to be transformed to systematic form of shape `[k, n]`. is_pcm: bool Defaults to False. If true, ``mat`` is interpreted as parity-check matrix and, thus, the last k columns will be the identity part. Output ------ mat_sys: ndarray Binary matrix in systematic form, i.e., the first `k` columns equal the identity matrix (or last `k` if ``is_pcm`` is True). column_swaps: list of int tuples A list of integer tuples that describes the swapped columns (in the order of execution). Note ---- This algorithm (potentially) swaps columns of the input matrix. Thus, the resulting systematic matrix (potentially) relates to a permuted version of the code, this is defined by the returned list ``column_swap``. Note that, the inverse permutation must be applied in the inverse list order (in case specific columns are swapped multiple times). If a parity-check matrix is passed as input (i.e., ``is_pcm`` is True), the identity part will be re-arranged to the last columns.""" m = mat.shape[0] n = mat.shape[1] assert m<=n, "Invalid matrix dimensions." # check for all-zero columns (=unchecked nodes) if is_pcm: c_node_deg = np.sum(mat, axis=0) if np.any(c_node_deg==0): warnings.warn("All-zero column in parity-check matrix detected. " \ "It seems as if the code contains unprotected nodes.") mat = np.copy(mat) column_swaps = [] # store all column swaps # convert to bool for faster arithmetics mat = mat.astype(bool) # bring in upper triangular form for idx_c in range(m): success = mat[idx_c, idx_c] if not success: # skip if leading "1" already occurred # step 1: find next leading "1" for idx_r in range(idx_c+1,m): # skip if entry is "0" if mat[idx_r, idx_c]: mat[[idx_c, idx_r]] = mat[[idx_r, idx_c]] # swap rows success = True break # Could not find "1"-entry for column idx_c # => swap with columns from non-sys part # The task is to find a column with index idx_cc that has a "1" at # row idx_c if not success: for idx_cc in range(idx_c+1, n): if mat[idx_c, idx_cc]: # swap columns mat[:,[idx_c, idx_cc]] = mat[:,[idx_cc, idx_c]] column_swaps.append([idx_c, idx_cc]) success = True break if not success: raise ValueError("Could not succeed; mat is not full rank?") # we can now assume a leading "1" at row idx_c for idx_r in range(idx_c+1, m): if mat[idx_r, idx_c]: mat[idx_r,:] ^= mat[idx_c,:] # bin. add of row idx_c to idx_r # remove upper triangle part in inverse order for idx_c in range(m-1, -1, -1): for idx_r in range(idx_c-1, -1, -1): if mat[idx_r, idx_c]: mat[idx_r,:] ^= mat[idx_c,:] # bin. add of row idx_c to idx_r # verify results assert np.array_equal(mat[:,:m], np.eye(m)), \ "Internal error, could not find systematic matrix." # bring identity part to end of matrix if parity-check matrix is provided if is_pcm: # individual column swaps instead of copying entire block # this simplifies the tracking of column swaps. for i in range(n-1, (n-1)-m, -1): j = i - (n-m) mat[:,[i, j]] = mat[:,[j, i]] column_swaps.append([i, j]) # return integer array mat = mat.astype(int) return mat, column_swaps
[docs]def gm2pcm(gm, verify_results=True): r"""Generate the parity-check matrix for a given generator matrix. This function brings ``gm`` :math:`\mathbf{G}` in its systematic form and uses the following relation to find the parity-check matrix :math:`\mathbf{H}` in GF(2) .. math:: \mathbf{G} = [\mathbf{I} | \mathbf{M}] \Leftrightarrow \mathbf{H} = [\mathbf{M} ^t | \mathbf{I}]. \tag{1} This follows from the fact that for an all-zero syndrome, it must hold that .. math:: \mathbf{H} \mathbf{c}^t = \mathbf{H} * (\mathbf{u} * \mathbf{G})^t = \mathbf{H} * \mathbf{G} ^t * \mathbf{u}^t =: \mathbf{0} where :math:`\mathbf{c}` denotes an arbitrary codeword and :math:`\mathbf{u}` the corresponding information bits. This leads to .. math:: \mathbf{G} * \mathbf{H} ^t =: \mathbf{0}. \tag{2} It can be seen that (1) fulfills (2), as it holds in GF(2) that .. math:: [\mathbf{I} | \mathbf{M}] * [\mathbf{M} ^t | \mathbf{I}]^t = \mathbf{M} + \mathbf{M} = \mathbf{0}. Input ----- gm : ndarray Binary generator matrix of shape `[k, n]`. verify_results: bool Defaults to True. If True, it is verified that the generated parity-check matrix is orthogonal to the generator matrix in GF(2). Output ------ : ndarray Binary parity-check matrix of shape `[n-k, n]`. Note ---- This algorithm only works if ``gm`` has full rank. Otherwise an error is raised. """ k = gm.shape[0] n = gm.shape[1] assert k<n, "Invalid matrix dimensions." # bring gm in systematic form gm_sys, c_swaps = make_systematic(gm, is_pcm=False) m_mat = np.transpose(np.copy(gm_sys[:,-(n-k):])) i_mat = np.eye(n-k) pcm = np.concatenate((m_mat, i_mat), axis=1) # undo column swaps for l in c_swaps[::-1]: # reverse ordering when going through list pcm[:,[l[0], l[1]]] = pcm[:,[l[1], l[0]]] # swap columns if verify_results: assert verify_gm_pcm(gm=gm, pcm=pcm), \ "Resulting parity-check matrix does not match to generator matrix." return pcm
[docs]def pcm2gm(pcm, verify_results=True): r"""Generate the generator matrix for a given parity-check matrix. This function brings ``pcm`` :math:`\mathbf{H}` in its systematic form and uses the following relation to find the generator matrix :math:`\mathbf{G}` in GF(2) .. math:: \mathbf{G} = [\mathbf{I} | \mathbf{M}] \Leftrightarrow \mathbf{H} = [\mathbf{M} ^t | \mathbf{I}]. \tag{1} This follows from the fact that for an all-zero syndrome, it must hold that .. math:: \mathbf{H} \mathbf{c}^t = \mathbf{H} * (\mathbf{u} * \mathbf{G})^t = \mathbf{H} * \mathbf{G} ^t * \mathbf{u}^t =: \mathbf{0} where :math:`\mathbf{c}` denotes an arbitrary codeword and :math:`\mathbf{u}` the corresponding information bits. This leads to .. math:: \mathbf{G} * \mathbf{H} ^t =: \mathbf{0}. \tag{2} It can be seen that (1) fulfills (2) as in GF(2) it holds that .. math:: [\mathbf{I} | \mathbf{M}] * [\mathbf{M} ^t | \mathbf{I}]^t = \mathbf{M} + \mathbf{M} = \mathbf{0}. Input ----- pcm : ndarray Binary parity-check matrix of shape `[n-k, n]`. verify_results: bool Defaults to True. If True, it is verified that the generated generator matrix is orthogonal to the parity-check matrix in GF(2). Output ------ : ndarray Binary generator matrix of shape `[k, n]`. Note ---- This algorithm only works if ``pcm`` has full rank. Otherwise an error is raised. """ n = pcm.shape[1] k = n - pcm.shape[0] assert k<n, "Invalid matrix dimensions." # bring pcm in systematic form pcm_sys, c_swaps = make_systematic(pcm, is_pcm=True) m_mat = np.transpose(np.copy(pcm_sys[:,:k])) i_mat = np.eye(k) gm = np.concatenate((i_mat, m_mat), axis=1) # undo column swaps for l in c_swaps[::-1]: # reverse ordering when going through list gm[:,[l[0], l[1]]] = gm[:,[l[1], l[0]]] # swap columns if verify_results: assert verify_gm_pcm(gm=gm, pcm=pcm), \ "Resulting parity-check matrix does not match to generator matrix." return gm
[docs]def verify_gm_pcm(gm, pcm): r"""Verify that generator matrix :math:`\mathbf{G}` ``gm`` and parity-check matrix :math:`\mathbf{H}` ``pcm`` are orthogonal in GF(2). For an all-zero syndrome, it must hold that .. math:: \mathbf{H} \mathbf{c}^t = \mathbf{H} * (\mathbf{u} * \mathbf{G})^t = \mathbf{H} * \mathbf{G} ^t * \mathbf{u}^t =: \mathbf{0} where :math:`\mathbf{c}` denotes an arbitrary codeword and :math:`\mathbf{u}` the corresponding information bits. As :math:`\mathbf{u}` can be arbitrary it follows that .. math:: \mathbf{H} * \mathbf{G} ^t =: \mathbf{0}. Input ----- gm : ndarray Binary generator matrix of shape `[k, n]`. pcm : ndarray Binary parity-check matrix of shape `[n-k, n]`. Output ------ : bool True if ``gm`` and ``pcm`` define a valid pair of parity-check and generator matrices in GF(2). """ # check for valid dimensions k = gm.shape[0] n = gm.shape[1] n_pcm = pcm.shape[1] k_pcm = n_pcm - pcm.shape[0] assert k==k_pcm, "Inconsistent shape of gm and pcm." assert n==n_pcm, "Inconsistent shape of gm and pcm." # check that both matrices are binary assert ((gm==0) | (gm==1)).all(), "gm is not binary." assert ((pcm==0) | (pcm==1)).all(), "pcm is not binary." # check for zero syndrome s = np.mod(np.matmul(pcm, np.transpose(gm)), 2) # mod2 to account for GF(2) return np.sum(s)==0 # Check for Non-zero syndrom of H*G'
[docs]def generate_reg_ldpc(v, c, n, allow_flex_len=True, verbose=True): r"""Generate random regular (v,c) LDPC codes. This functions generates a random LDPC parity-check matrix of length ``n`` where each variable node (VN) has degree ``v`` and each check node (CN) has degree ``c``. Please note that the LDPC code is not optimized to avoid short cycles and the resulting codes may show a non-negligible error-floor. For encoding, the :class:`~sionna.fec.utils.LinearEncoder` layer can be used, however, the construction does not guarantee that the pcm has full rank. Input ----- v : int Desired variable node (VN) degree. c : int Desired check node (CN) degree. n : int Desired codeword length. allow_flex_len: bool Defaults to True. If True, the resulting codeword length can be (slightly) increased. verbose : bool Defaults to True. If True, code parameters are printed. Output ------ (pcm, k, n, coderate): Tuple: pcm: ndarray NumPy array of shape `[n-k, n]` containing the parity-check matrix. k: int Number of information bits per codeword. n: int Number of codewords bits. coderate: float Coderate of the code. Note ---- This algorithm works only for regular node degrees. For state-of-the-art bit-error-rate performance, usually one needs to optimize irregular degree profiles (see [tenBrink]_). """ # check input values for consistency assert isinstance(allow_flex_len, bool), \ 'allow_flex_len must be bool.' # allow slight change in n to keep num edges # from CN and VN perspective an integer if allow_flex_len: for n_mod in range(n, n+2*c): if np.mod((v/c) * n_mod, 1.)==0: n = n_mod if verbose: print("Setting n to: ", n) break # calculate number of nodes coderate = 1 - (v/c) n_v = n n_c = int((v/c) * n) k = n_v - n_c # generate sockets v_socks = np.tile(np.arange(n_v),v) c_socks = np.tile(np.arange(n_c),c) if verbose: print("Number of edges (VN perspective): ", len(v_socks)) print("Number of edges (CN perspective): ", len(c_socks)) assert len(v_socks) == len(c_socks), "Number of edges from VN and CN " \ "perspective does not match. Consider to (slightly) change n." # apply random permutations np.random.shuffle(v_socks) np.random.shuffle(c_socks) # and generate matrix pcm = np.zeros([n_c, n_v]) idx = 0 shuffle_max = 200 # stop if no success shuffle_counter = 0 cont = True while cont: # if edge is available, take it if pcm[c_socks[idx],v_socks[idx]]==0: pcm[c_socks[idx],v_socks[idx]] = 1 idx +=1 # and go to next socket shuffle_counter = 0 # reset counter if idx==len(v_socks): cont = False else: # shuffle sockets shuffle_counter+=1 if shuffle_counter<shuffle_max: np.random.shuffle(v_socks[idx:]) np.random.shuffle(c_socks[idx:]) else: print("Stopping - no solution found!") cont=False v_deg = np.sum(pcm, axis=0) c_deg = np.sum(pcm, axis=1) assert((v_deg==v).all()), "VN degree not always v." assert((c_deg==c).all()), "CN degree not always c." if verbose: print(f"Generated regular ({v},{c}) LDPC code of length n={n}") print(f"Code rate is r={coderate:.3f}.") plt.spy(pcm) return pcm, k, n, coderate
[docs]def int_mod_2(x): r"""Efficient implementation of modulo 2 operation for integer inputs. This function assumes integer inputs or implicitly casts to int. Remark: the function `tf.math.mod(x, 2)` is placed on the CPU and, thus, causes unnecessary memory copies. Parameters ---------- x: tf.Tensor Tensor to which the modulo 2 operation is applied. """ x_int32 = tf.cast(x, tf.int32) y_int32 = tf.bitwise.bitwise_and(x_int32, tf.constant(1, tf.int32)) return tf.cast(y_int32, x.dtype)
########################################################### # Deprecated aliases that will not be included in the next # major release ########################################################### # ignore invalid name as this is required for legacy reasons # pylint: disable=C0103 def LinearEncoder(enc_mat, is_pcm=False, dtype=tf.float32, **kwargs): # defer import as circular dependency is generated otherwise from sionna.fec.linear import LinearEncoder as LE # pylint: disable=C0415 print("Warning: The alias fec.utils.LinearEncoder will not be included in "\ "Sionna 1.0. Please use fec.linear.LinearEncoder instead.") return LE(enc_mat=enc_mat, is_pcm=is_pcm, dtype=dtype, **kwargs) # Scrambling has been refactored and c_init is now directly calculated during # the initialization of the Scrambler def generate_prng_seq(length, n_rnti=None, n_id=None, c_init=None): print("Warning: The alias sionna.fec.utils.generate_prng_seq will not be " \ "included in Sionna 1.0. Please use " \ "sionna.nr.utils.generate_prng_seq instead.") # n_rnti and n_id have been integrated in the TB5GScrambler assert (n_rnti is None), \ "The parameter n_rnti is deprecated and has been removed. Please " \ "refer to the TB5GScrambler for the Scrambler initialization." assert (n_id is None), \ "The parameter n_id is deprecated and has been removed. Please " \ "refer to the TB5GScrambler for the Scrambler initialization." return generate_prng_seq_utils(length, c_init=c_init)