Source code for sionna.phy.fec.ldpc.encoding

#
# SPDX-FileCopyrightText: Copyright (c) 2021-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0#
"""Blocks for LDPC channel encoding and utility functions."""

import tensorflow as tf
import numpy as np
import scipy as sp
from importlib_resources import files, as_file
from . import codes # pylint: disable=relative-beyond-top-level
import numbers # to check if n, k are numbers
from sionna.phy import Block

[docs] class LDPC5GEncoder(Block): # pylint: disable=line-too-long """5G NR LDPC Encoder following the 3GPP 38.212 including rate-matching. The implementation follows the 3GPP NR Initiative [3GPPTS38212_LDPC]_. Parameters ---------- k: int Defining the number of information bit per codeword. n: int Defining the desired codeword length. num_bits_per_symbol: `None` (default) | int Defining the number of bits per QAM symbol. If this parameter is explicitly provided, the codeword will be interleaved after rate-matching as specified in Sec. 5.4.2.2 in [3GPPTS38212_LDPC]_. bg: `None` (default) | "bg1" | "bg2" Basegraph to be used for the code construction. If `None` is provided, the encoder will automatically select the basegraph according to [3GPPTS38212_LDPC]_. precision : `None` (default) | 'single' | 'double' Precision used for internal calculations and outputs. If set to `None`, :py:attr:`~sionna.phy.config.precision` is used. Input ----- bits: [...,k], tf.float Binary tensor containing the information bits to be encoded. Output ------ : [...,n], tf.float Binary tensor of same shape as inputs besides last dimension has changed to `n` containing the encoded codeword bits. Note ---- As specified in [3GPPTS38212_LDPC]_, the encoder also performs rate-matching (puncturing and shortening). Thus, the corresponding decoder needs to `invert` these operations, i.e., must be compatible with the 5G encoding scheme. """ def __init__(self, k, n, num_bits_per_symbol=None, bg=None, precision=None, **kwargs): super().__init__(precision=precision, **kwargs) if not isinstance(k, numbers.Number): raise TypeError("k must be a number.") if not isinstance(n, numbers.Number): raise TypeError("n must be a number.") k = int(k) # k or n can be float (e.g. as result of n=k*r) n = int(n) # k or n can be float (e.g. as result of n=k*r) if k>8448: raise ValueError("Unsupported code length (k too large).") if k<12: raise ValueError("Unsupported code length (k too small).") if n>(316*384): raise ValueError("Unsupported code length (n too large).") if n<0: raise ValueError("Unsupported code length (n negative).") # init encoder parameters self._k = k # number of input bits (= input shape) self._n = n # the desired length (= output shape) self._coderate = k / n self._check_input = True # check input for consistency (i.e., binary) # allow actual code rates slightly larger than 948/1024 # to account for the quantization procedure in 38.214 5.1.3.1 if self._coderate>(948/1024): # as specified in 38.212 5.4.2.1 print(f"Warning: effective coderate r>948/1024 for n={n}, k={k}.") if self._coderate>(0.95): # as specified in 38.212 5.4.2.1 raise ValueError(f"Unsupported coderate (r>0.95) for n={n}, k={k}.") if self._coderate<(1/5): # outer rep. coding currently not supported raise ValueError("Unsupported coderate (r<1/5).") # construct the basegraph according to 38.212 # if bg is explicitly provided self._bg = self._sel_basegraph(self._k, self._coderate, bg) self._z, self._i_ls, self._k_b = self._sel_lifting(self._k, self._bg) self._bm = self._load_basegraph(self._i_ls, self._bg) # total number of codeword bits self._n_ldpc = self._bm.shape[1] * self._z # if K_real < K _target puncturing must be applied earlier self._k_ldpc = self._k_b * self._z # construct explicit graph via lifting pcm = self._lift_basegraph(self._bm, self._z) pcm_a, pcm_b_inv, pcm_c1, pcm_c2 = self._gen_submat(self._bm, self._k_b, self._z, self._bg) # init sub-matrices for fast encoding ("RU"-method) # note: dtype is tf.float32; self._pcm = pcm # store the sparse parity-check matrix (for decoding) # store indices for fast gathering (instead of explicit matmul) self._pcm_a_ind = self._mat_to_ind(pcm_a) self._pcm_b_inv_ind = self._mat_to_ind(pcm_b_inv) self._pcm_c1_ind = self._mat_to_ind(pcm_c1) self._pcm_c2_ind = self._mat_to_ind(pcm_c2) self._num_bits_per_symbol = num_bits_per_symbol if num_bits_per_symbol is not None: self._out_int, self._out_int_inv = self.generate_out_int(self._n, self._num_bits_per_symbol) ############################### # Public methods and properties ############################### @property def k(self): """Number of input information bits""" return self._k @property def n(self): "Number of output codeword bits" return self._n @property def coderate(self): """Coderate of the LDPC code after rate-matching""" return self._coderate @property def k_ldpc(self): """Number of LDPC information bits after rate-matching""" return self._k_ldpc @property def n_ldpc(self): """Number of LDPC codeword bits before rate-matching""" return self._n_ldpc @property def pcm(self): """Parity-check matrix for given code parameters""" return self._pcm @property def z(self): """Lifting factor of the basegraph""" return self._z @property def num_bits_per_symbol(self): """Modulation order used for the rate-matching output interleaver""" return self._num_bits_per_symbol @property def out_int(self): """Output interleaver sequence as defined in 5.4.2.2""" return self._out_int @property def out_int_inv(self): """Inverse output interleaver sequence as defined in 5.4.2.2""" return self._out_int_inv ################# # Utility methods #################
[docs] def generate_out_int(self, n, num_bits_per_symbol): """Generates LDPC output interleaver sequence as defined in Sec 5.4.2.2 in [3GPPTS38212_LDPC]_. Parameters ---------- n: int Desired output sequence length. num_bits_per_symbol: int Number of symbols per QAM symbol, i.e., the modulation order. Output ------ perm_seq: ndarray of length n Containing the permuted indices. perm_seq_inv: ndarray of length n Containing the inverse permuted indices. Note ---- The interleaver pattern depends on the modulation order and helps to reduce dependencies in bit-interleaved coded modulation (BICM) schemes combined with higher order modulation. """ # allow float inputs, but verify that they represent integer if n%1!=0: raise ValueError("n must be int.") if num_bits_per_symbol%1!=0: raise ValueError("num_bits_per_symbol must be int.") n = int(n) if n<=0: raise ValueError("n must be a positive integer.") if num_bits_per_symbol<=0: raise ValueError("num_bits_per_symbol must be a positive integer.") num_bits_per_symbol = int(num_bits_per_symbol) if n%num_bits_per_symbol!=0: raise ValueError("n must be a multiple of num_bits_per_symbol.") # pattern as defined in Sec 5.4.2.2 perm_seq = np.zeros(n, dtype=int) for j in range(int(n/num_bits_per_symbol)): for i in range(num_bits_per_symbol): perm_seq[i + j*num_bits_per_symbol] \ = int(i * int(n/num_bits_per_symbol) + j) perm_seq_inv = np.argsort(perm_seq) return perm_seq, perm_seq_inv
def _sel_basegraph(self, k, r, bg_=None): """Select basegraph according to [3GPPTS38212_LDPC]_ and check for consistency.""" # if bg is explicitly provided, we only check for consistency if bg_ is None: if k <= 292: bg = "bg2" elif k <= 3824 and r <= 0.67: bg = "bg2" elif r <= 0.25: bg = "bg2" else: bg = "bg1" elif bg_ in ("bg1", "bg2"): bg = bg_ else: raise ValueError("Basegraph must be bg1, bg2 or None.") # check for consistency if bg=="bg1" and k>8448: raise ValueError("K is not supported by BG1 (too large).") if bg=="bg2" and k>3840: raise ValueError( f"K is not supported by BG2 (too large) k ={k}.") if bg=="bg1" and r<1/3: raise ValueError("Only coderate>1/3 supported for BG1. \ Remark: Repetition coding is currently not supported.") if bg=="bg2" and r<1/5: raise ValueError("Only coderate>1/5 supported for BG2. \ Remark: Repetition coding is currently not supported.") return bg def _load_basegraph(self, i_ls, bg): """Helper to load basegraph from csv files. ``i_ls`` is sub_index of the basegraph and fixed during lifting selection. """ if i_ls > 7: raise ValueError("i_ls too large.") if i_ls < 0: raise ValueError("i_ls cannot be negative.") # csv files are taken from 38.212 and dimension is explicitly given if bg=="bg1": bm = np.zeros([46, 68]) - 1 # init matrix with -1 (None positions) elif bg=="bg2": bm = np.zeros([42, 52]) - 1 # init matrix with -1 (None positions) else: raise ValueError("Basegraph not supported.") # and load the basegraph from csv format in folder "codes" source = files(codes).joinpath(f"5G_{bg}.csv") with as_file(source) as codes.csv: bg_csv = np.genfromtxt(codes.csv, delimiter=";") # reconstruct BG for given i_ls r_ind = 0 for r in np.arange(2, bg_csv.shape[0]): # check for next row index if not np.isnan(bg_csv[r, 0]): r_ind = int(bg_csv[r, 0]) c_ind = int(bg_csv[r, 1]) # second column in csv is column index value = bg_csv[r, i_ls + 2] # i_ls entries start at offset 2 bm[r_ind, c_ind] = value return bm def _lift_basegraph(self, bm, z): """Lift basegraph with lifting factor ``z`` and shifted identities as defined by the entries of ``bm``.""" num_nonzero = np.sum(bm>=0) # num of non-neg elements in bm # init all non-zero row/column indices r_idx = np.zeros(z*num_nonzero) c_idx = np.zeros(z*num_nonzero) data = np.ones(z*num_nonzero) # row/column indices of identity matrix for lifting im = np.arange(z) idx = 0 for r in range(bm.shape[0]): for c in range(bm.shape[1]): if bm[r,c]==-1: # -1 is used as all-zero matrix placeholder pass #do nothing (sparse) else: # roll matrix by bm[r,c] c_roll = np.mod(im+bm[r,c], z) # append rolled identity matrix to pcm r_idx[idx*z:(idx+1)*z] = r*z + im c_idx[idx*z:(idx+1)*z] = c*z + c_roll idx += 1 # generate lifted sparse matrix from indices pcm = sp.sparse.csr_matrix((data,(r_idx, c_idx)), shape=(z*bm.shape[0], z*bm.shape[1])) return pcm def _sel_lifting(self, k, bg): """Select lifting as defined in Sec. 5.2.2 in [3GPPTS38212_LDPC]_. We assume B < K_cb, thus B'= B and C = 1, i.e., no additional CRC is appended. Thus, K' = B'/C = B and B is our K. Z is the lifting factor. i_ls is the set index ranging from 0...7 (specifying the exact bg selection). k_b is the number of information bit columns in the basegraph. """ # lifting set according to 38.212 Tab 5.3.2-1 s_val = [[2, 4, 8, 16, 32, 64, 128, 256], [3, 6, 12, 24, 48, 96, 192, 384], [5, 10, 20, 40, 80, 160, 320], [7, 14, 28, 56, 112, 224], [9, 18, 36, 72, 144, 288], [11, 22, 44, 88, 176, 352], [13, 26, 52, 104, 208], [15, 30, 60, 120, 240]] if bg == "bg1": k_b = 22 else: if k > 640: k_b = 10 elif k > 560: k_b = 9 elif k > 192: k_b = 8 else: k_b = 6 # find the min of Z from Tab. 5.3.2-1 s.t. k_b*Z>=K' min_val = 100000 z = 0 i_ls = 0 i = -1 for s in s_val: i += 1 for s1 in s: x = k_b *s1 if x >= k: # valid solution if x < min_val: min_val = x z = s1 i_ls = i # and set K=22*Z for bg1 and K=10Z for bg2 if bg == "bg1": k_b = 22 else: k_b = 10 return z, i_ls, k_b def _gen_submat(self, bm, k_b, z, bg): """Split the basegraph into multiple sub-matrices such that efficient encoding is possible. """ g = 4 # code property (always fixed for 5G) mb = bm.shape[0] # number of CN rows in basegraph (BG property) bm_a = bm[0:g, 0:k_b] bm_b = bm[0:g, k_b:(k_b+g)] bm_c1 = bm[g:mb, 0:k_b] bm_c2 = bm[g:mb, k_b:(k_b+g)] # H could be sliced immediately (but easier to implement if based on B) hm_a = self._lift_basegraph(bm_a, z) # not required for encoding, but helpful for debugging # hm_b = self._lift_basegraph(bm_b, z) hm_c1 = self._lift_basegraph(bm_c1, z) hm_c2 = self._lift_basegraph(bm_c2, z) hm_b_inv = self._find_hm_b_inv(bm_b, z, bg) return hm_a, hm_b_inv, hm_c1, hm_c2 def _find_hm_b_inv(self, bm_b, z, bg): """ For encoding we need to find the inverse of `hm_b` such that `hm_b^-1 * hm_b = I`. Could be done sparse For BG1 the structure of hm_b is given as (for all values of i_ls) hm_b = [P_A I 0 0 P_B I I 0 0 0 I I P_A 0 0 I] where P_B and P_A are shifted identities. The inverse can be found by solving a linear system of equations hm_b_inv = [P_B^-1, P_B^-1, P_B^-1, P_B^-1, I + P_A*P_B^-1, P_A*P_B^-1, P_A*P_B^-1, P_A*P_B^-1, P_A*P_B^-1, P_A*P_B^-1, I+P_A*P_B^-1, I+P_A*P_B^-1, P_A*P_B^-1, P_A*P_B^-1, P_A*P_B^-1, I+P_A*P_B^-1]. For bg2 the structure of hm_b is given as (for all values of i_ls) hm_b = [P_A I 0 0 0 I I 0 P_B 0 I I P_A 0 0 I] where P_B and P_A are shifted identities The inverse can be found by solving a linear system of equations hm_b_inv = [P_B^-1, P_B^-1, P_B^-1, P_B^-1, I + P_A*P_B^-1, P_A*P_B^-1, P_A*P_B^-1, P_A*P_B^-1, I+P_A*P_B^-1, I+P_A*P_B^-1, P_A*P_B^-1, P_A*P_B^-1, P_A*P_B^-1, P_A*P_B^-1, P_A*P_B^-1, I+P_A*P_B^-1] Note: the inverse of B is simply a shifted identity matrix with negative shift direction. """ # permutation indices pm_a= int(bm_b[0,0]) if bg=="bg1": pm_b_inv = int(-bm_b[1, 0]) else: # structure of B is slightly different for bg2 pm_b_inv = int(-bm_b[2, 0]) hm_b_inv = np.zeros([4*z, 4*z]) im = np.eye(z) am = np.roll(im, pm_a, axis=1) b_inv = np.roll(im, pm_b_inv, axis=1) ab_inv = np.matmul(am, b_inv) # row 0 hm_b_inv[0:z, 0:z] = b_inv hm_b_inv[0:z, z:2*z] = b_inv hm_b_inv[0:z, 2*z:3*z] = b_inv hm_b_inv[0:z, 3*z:4*z] = b_inv # row 1 hm_b_inv[z:2*z, 0:z] = im + ab_inv hm_b_inv[z:2*z, z:2*z] = ab_inv hm_b_inv[z:2*z, 2*z:3*z] = ab_inv hm_b_inv[z:2*z, 3*z:4*z] = ab_inv # row 2 if bg=="bg1": hm_b_inv[2*z:3*z, 0:z] = ab_inv hm_b_inv[2*z:3*z, z:2*z] = ab_inv hm_b_inv[2*z:3*z, 2*z:3*z] = im + ab_inv hm_b_inv[2*z:3*z, 3*z:4*z] = im + ab_inv else: # for bg2 the structure is slightly different hm_b_inv[2*z:3*z, 0:z] = im + ab_inv hm_b_inv[2*z:3*z, z:2*z] = im + ab_inv hm_b_inv[2*z:3*z, 2*z:3*z] = ab_inv hm_b_inv[2*z:3*z, 3*z:4*z] = ab_inv # row 3 hm_b_inv[3*z:4*z, 0:z] = ab_inv hm_b_inv[3*z:4*z, z:2*z] = ab_inv hm_b_inv[3*z:4*z, 2*z:3*z] = ab_inv hm_b_inv[3*z:4*z, 3*z:4*z] = im + ab_inv # return results as sparse matrix return sp.sparse.csr_matrix(hm_b_inv) def _mat_to_ind(self, mat): """Helper to transform matrix into index representation for tf.gather. An index pointing to the `last_ind+1` is used for non-existing edges due to irregular degrees.""" m = mat.shape[0] n = mat.shape[1] # transpose mat for sorted column format c_idx, r_idx, _ = sp.sparse.find(mat.transpose()) # sort indices explicitly, as scipy.sparse.find changed from column to # row sorting in scipy>=1.11 idx = np.argsort(r_idx) c_idx = c_idx[idx] r_idx = r_idx[idx] # find max number of no-zero entries n_max = np.max(mat.getnnz(axis=1)) # init index array with n (pointer to last_ind+1, will be a default # value) gat_idx = np.zeros([m, n_max]) + n r_val = -1 c_val = 0 for idx in range(len(c_idx)): # check if same row or if a new row starts if r_idx[idx] != r_val: r_val = r_idx[idx] c_val = 0 gat_idx[r_val, c_val] = c_idx[idx] c_val += 1 gat_idx = tf.cast(tf.constant(gat_idx), tf.int32) return gat_idx def _matmul_gather(self, mat, vec): """Implements a fast sparse matmul via gather function.""" # add 0 entry for gather-reduce_sum operation # (otherwise ragged Tensors are required) bs = tf.shape(vec)[0] vec = tf.concat([vec, tf.zeros([bs, 1], dtype=self.rdtype)], 1) retval = tf.gather(vec, mat, batch_dims=0, axis=1) retval = tf.reduce_sum(retval, axis=-1) return retval def _encode_fast(self, s): """Main encoding function based on gathering function.""" p_a = self._matmul_gather(self._pcm_a_ind, s) p_a = self._matmul_gather(self._pcm_b_inv_ind, p_a) # calc second part of parity bits p_b # second parities are given by C_1*s' + C_2*p_a' + p_b' = 0 p_b_1 = self._matmul_gather(self._pcm_c1_ind, s) p_b_2 = self._matmul_gather(self._pcm_c2_ind, p_a) p_b = p_b_1 + p_b_2 c = tf.concat([s, p_a, p_b], 1) # faster implementation of mod-2 operation c = tf.math.mod(c, 2) c_uint8 = tf.cast(c, tf.uint8) c_bin = tf.bitwise.bitwise_and(c_uint8, tf.constant(1, tf.uint8)) c = tf.cast(c_bin, self.rdtype) c = tf.expand_dims(c, axis=-1) # returns nx1 vector return c def build(self, input_shape): """"Build block.""" # check if k and input shape match if input_shape[-1]!=self._k: raise ValueError("Last dimension must be of length k.") def call(self, bits): """5G LDPC encoding function including rate-matching. This function returns the encoded codewords as specified by the 3GPP NR Initiative [3GPPTS38212_LDPC]_ including puncturing and shortening. Args: bits (tf.float): Tensor of shape `[...,k]` containing the information bits to be encoded. Returns: `tf.float`: Tensor of shape `[...,n]`. """ # Reshape inputs to [...,k] input_shape = bits.get_shape().as_list() new_shape = [-1, input_shape[-1]] u = tf.reshape(bits, new_shape) # assert if bits are non-binary if self._check_input: tf.debugging.assert_equal( tf.reduce_min( tf.cast( tf.logical_or( tf.equal(u, tf.constant(0, self.rdtype)), tf.equal(u, tf.constant(1, self.rdtype)), ), self.rdtype)), tf.constant(1, self.rdtype), "Input must be binary.") # input datatype consistency should be only evaluated once self._check_input = False batch_size = tf.shape(u)[0] # add "filler" bits to last positions to match info bit length k_ldpc u_fill = tf.concat([u, tf.zeros([batch_size, self._k_ldpc-self._k], self.rdtype)],axis=1) # use optimized encoding based on tf.gather c = self._encode_fast(u_fill) c = tf.reshape(c, [batch_size, self._n_ldpc]) # remove last dim # remove filler bits at pos (k, k_ldpc) c_no_filler1 = tf.slice(c, [0, 0], [batch_size, self._k]) c_no_filler2 = tf.slice(c, [0, self._k_ldpc], [batch_size, self._n_ldpc - self._k_ldpc]) c_no_filler = tf.concat([c_no_filler1, c_no_filler2], 1) # shorten the first 2*Z positions and end after n bits # (remaining parity bits can be used for HARQ) c_short = tf.slice(c_no_filler, [0, 2*self._z], [batch_size, self.n]) # incremental redundancy could be generated by accessing the last bits # if num_bits_per_symbol is provided, apply output interleaver as # specified in Sec. 5.4.2.2 in 38.212 if self._num_bits_per_symbol is not None: c_short = tf.gather(c_short, self._out_int, axis=-1) # Reshape c_short so that it matches the original input dimensions output_shape = input_shape[0:-1] + [self.n] output_shape[0] = -1 c_reshaped = tf.reshape(c_short, output_shape) return c_reshaped