Source code for digcommpy.decoders

from abc import ABC, abstractmethod
import inspect
import pickle

import numpy as np
from joblib import Parallel, delayed, cpu_count
from sklearn import svm
from hpelm import ELM
from keras import layers
from keras.models import Sequential
from keras.utils.np_utils import to_categorical

from .channels import Channel
from .messages import unpack_to_bits, pack_to_dec, generate_data
from .encoders import PolarEncoder, Encoder, PolarWiretapEncoder
from .modulators import Modulator


def _logdomain_sum(x, y):
    if x < y:
        z = y + np.log1p(np.exp(x-y))
    else:
        z = x + np.log1p(np.exp(y-x))
    return z

def _logdomain_sum_multiple(x, y):
    _logpart = np.log1p(np.exp(-np.abs(x-y)))
    z = np.maximum(x, y) + _logpart
    return z


[docs]class Decoder(ABC): """Abstract decoder class.""" def __init__(self, code_length, info_length, base=2, parallel=True): self.code_length = code_length self.info_length = info_length self.base = base self.parallel = parallel
[docs] @abstractmethod def decode_messages(self, messages, channel=None): pass
[docs]class IdentityDecoder(Decoder): """Identity decoder. Simply returns the input."""
[docs] @staticmethod def decode_messages(messages, channel=None): return messages
[docs]class RepetitionDecoder(Decoder): def __init__(self, *args, **kwargs): pass
[docs] @staticmethod def decode_messages(messages, channel=None): decoded = np.zeros((len(messages), 1)) for idx, message in enumerate(messages): val, counts = np.unique(message, return_counts=True) _decision = np.argmax(counts) decoded[idx] = val[_decision] return decoded
[docs]class LinearDecoder(Decoder): """Linear block decoder. Parameters ---------- TODO """
[docs] def decode_messages(self, messages, channel=None): raise NotImplementedError()
[docs]class PolarDecoder(Decoder): """Polar code decoder. Taken from **polarcodes.com** The decoder for BAWGN channels expects a channel output of noisy codewords which are modulated to +1 and -1. Parameters ---------- code_length : int Length of the code. info_length : int Length of the messages. design_channel : str or Channel Name of the used channel. Valid choices are currently "BAWGN" and "BSC". design_channelstate : float, optional State of the design channel. For "BAWGN" channels, this corresponds to the SNR value in dB. For "BSC" channels, this corresponds to the bit-flip probability. pos_lookup : array, optional Position lookup of the polar code, where -1 indicates message bits, while 0 and 1 denote the frozenbits. frozenbits : array, optional Bits used for the frozen bit positions. This is ignored, if `pos_lookup` is provided. parallel : bool, optional If True, parallel processing is used. This might not be available on all machines and causes higher use of system resources. """ def __init__(self, code_length, info_length, design_channel, design_channelstate=0., pos_lookup=None, frozenbits=None, parallel=True, **kwargs): if isinstance(design_channel, Channel): channel_name = design_channel.name design_channelstate = design_channel.get_channelstate() else: channel_name = design_channel self.design_channel = channel_name self.design_channelstate = design_channelstate if pos_lookup is None: self.pos_lookup = PolarEncoder.construct_polar_code( code_length, info_length, design_channel, design_channelstate, frozenbits) else: self.pos_lookup = np.array(pos_lookup) self.rev_index = self._reverse_index(code_length) self.idx_first_one = self._index_first_num_from_msb(code_length, 1) self.idx_first_zero = self._index_first_num_from_msb(code_length, 0) super().__init__(code_length, info_length, parallel=parallel) @staticmethod def _reverse_index(code_length): _n = int(np.ceil(np.log2(code_length))) rev_idx = [pack_to_dec(np.flip(unpack_to_bits([idx], _n), axis=1))[0][0] for idx in range(code_length)] return rev_idx @staticmethod def _index_first_num_from_msb(code_length, number): _n = int(np.ceil(np.log2(code_length))) idx_list = np.zeros(code_length) for idx in range(code_length): idx_bin = unpack_to_bits([idx], _n)[0] try: last_level = np.where(idx_bin == number)[0][0] except IndexError: last_level = _n-1 idx_list[idx] = last_level return idx_list
[docs] def decode_messages(self, messages, channel=None): """Decode polar encoded messages. Parameters ---------- messages : array Array of received (noisy) codewords which were created by polar encoding messages. Each row represents one received word. channel : float or Channel, optional This can either be a channel state, e.g., SNR in an AWGN channel, of the channel model used for constructing the decoder or a `channels.Channel` object. If None, the design parameters are used. Returns ------- decoded_messages : array Array containing the estimated messages after decoding the channel output. """ #decoded = np.zeros((len(messages), self.info_length)) decoded = np.zeros((len(messages), self.code_length)) channel_name = self.design_channel if channel is None: channel_state = self.design_channelstate elif isinstance(channel, Channel): channel_name = channel.name if channel_name != self.design_channel: Warning("The channel you passed for decoding ('{}') is different " "to the one you used for constructing the decoder ('{}')!" .format(channel_name, self.design_channel)) channel_state = channel.get_channelstate() else: channel_state = channel if channel_name == "BAWGN": snr = 10**(channel_state/10.) initial_llr = -2*np.sqrt(2*(self.info_length/self.code_length)*snr)*messages #if self.parallel: # num_cores = cpu_count() # decoded = Parallel(n_jobs=num_cores)( # delayed(self._polar_llr_decode)(k) for k in initial_llr) # decoded = np.array(decoded) #else: # for idx, _llr_codeword in enumerate(initial_llr): # decoded[idx] = self._polar_llr_decode(_llr_codeword) decoded = self._polar_llr_decode_multiple(initial_llr) elif channel_name == "BSC": llr = np.log(channel_state) - np.log(1-channel_state) initial_llr = (2*messages - 1) * llr if self.parallel: num_cores = cpu_count() decoded = Parallel(n_jobs=num_cores)( delayed(self._polar_llr_decode)(k) for k in initial_llr) decoded = np.array(decoded) else: for idx, _llr_codeword in enumerate(initial_llr): decoded[idx] = self._polar_llr_decode(_llr_codeword) decoded = self._get_info_bit_positions(decoded) return decoded
def _get_info_bit_positions(self, decoded): return decoded[:, self.pos_lookup == -1] def _polar_llr_decode(self, initial_llr): llr = np.zeros(2*self.code_length-1) llr[self.code_length-1:] = initial_llr bit_branch = np.zeros((2, self.code_length-1)) decoded = np.zeros(self.code_length) for j in range(self.code_length): rev_idx = self.rev_index[j] llr = self._update_llr(llr, bit_branch, rev_idx) if self.pos_lookup[rev_idx] <= -1: if llr[0] > 0: decoded[rev_idx] = 0 else: decoded[rev_idx] = 1 else: decoded[rev_idx] = self.pos_lookup[rev_idx] bit_branch = self._update_bit_branch(decoded[rev_idx], rev_idx, bit_branch) #return decoded[self.pos_lookup == -1] return decoded def _update_llr(self, llr, bit_branch, rev_idx): _n = int(np.ceil(np.log2(self.code_length))) if rev_idx == 0: next_level = _n else: last_level = int(self.idx_first_one[rev_idx]+1) st = int(2**(last_level-1)) ed = int(2**(last_level)-1) for idx in range(st-1, ed): llr[idx] = self._lowerconv( bit_branch[0, idx], llr[ed+2*(idx+1-st)], llr[ed+2*(idx+1-st)+1]) next_level = last_level - 1 for level in np.arange(next_level, 0, -1): st = int(2**(level-1)) ed = int(2**(level) - 1) for idx in range(st-1, ed): llr[idx] = self._upperconv(llr[ed+2*(idx+1-st)], llr[ed+2*(idx+1-st)+1]) return llr @staticmethod def _lowerconv(upper_decision, upper_llr, lower_llr): if upper_decision == 0: llr = lower_llr + upper_llr else: llr = lower_llr - upper_llr return llr @staticmethod def _upperconv(llr1, llr2): llr = _logdomain_sum(llr1+llr2, 0) - _logdomain_sum(llr1, llr2) return llr def _update_bit_branch(self, bit, rev_idx, bit_branch): _n = int(np.ceil(np.log2(self.code_length))) if rev_idx == self.code_length-1: return elif rev_idx < self.code_length/2: bit_branch[0, 0] = bit else: last_level = int(self.idx_first_zero[rev_idx]+1) bit_branch[1, 0] = bit for level in range(1, last_level-2+1): st = int(2**(level-1)) ed = int(2**(level)-1) for idx in range(st-1, ed): bit_branch[1, ed+2*(idx+1-st)] = np.mod(bit_branch[0, idx]+bit_branch[1, idx], 2) bit_branch[1, ed+2*(idx+1-st)+1] = bit_branch[1, idx] level = last_level-1 st = int(2**(level-1)) ed = int(2**(level)-1) for idx in range(st-1, ed): bit_branch[0, ed+2*(idx+1-st)] = np.mod(bit_branch[0, idx]+bit_branch[1, idx], 2) bit_branch[0, ed+2*(idx+1-st)+1] = bit_branch[1, idx] return bit_branch ##### def _polar_llr_decode_multiple(self, initial_llr): llr = np.zeros((len(initial_llr), 2*self.code_length-1)) llr[:, self.code_length-1:] = initial_llr bit_branch = np.zeros((len(initial_llr), 2, self.code_length-1)) decoded = np.zeros((len(initial_llr), self.code_length)) for j in range(self.code_length): rev_idx = self.rev_index[j] llr = self._update_llr_multiple(llr, bit_branch, rev_idx) if self.pos_lookup[rev_idx] <= -1: #decoded[:, rev_idx] = 0 _idx = np.where(llr[:, 0] <= 0)[0] decoded[_idx, rev_idx] = 1 else: decoded[:, rev_idx] = self.pos_lookup[rev_idx] bit_branch = self._update_bit_branch_multiple( decoded[:, rev_idx], rev_idx, bit_branch) #return decoded[self.pos_lookup == -1] return decoded def _update_llr_multiple(self, llr, bit_branch, rev_idx): _n = int(np.ceil(np.log2(self.code_length))) if rev_idx == 0: next_level = _n else: last_level = int(self.idx_first_one[rev_idx]+1) st = int(2**(last_level-1)) ed = int(2**(last_level)-1) for idx in range(st-1, ed): llr[:, idx] = self._lowerconv_multiple( bit_branch[:, 0, idx], llr[:, ed+2*(idx+1-st)], llr[:, ed+2*(idx+1-st)+1]) next_level = last_level - 1 for level in np.arange(next_level, 0, -1): st = int(2**(level-1)) ed = int(2**(level) - 1) for idx in range(st-1, ed): llr[:, idx] = self._upperconv_multiple( llr[:, ed+2*(idx+1-st)], llr[:, ed+2*(idx+1-st)+1]) return llr def _update_bit_branch_multiple(self, bit, rev_idx, bit_branch): _n = int(np.ceil(np.log2(self.code_length))) if rev_idx == self.code_length-1: return elif rev_idx < self.code_length/2: bit_branch[:, 0, 0] = bit else: last_level = int(self.idx_first_zero[rev_idx]+1) bit_branch[:, 1, 0] = bit for level in range(1, last_level-2+1): st = int(2**(level-1)) ed = int(2**(level)-1) for idx in range(st-1, ed): bit_branch[:, 1, ed+2*(idx+1-st)] = np.mod(bit_branch[:, 0, idx]+bit_branch[:, 1, idx], 2) bit_branch[:, 1, ed+2*(idx+1-st)+1] = bit_branch[:, 1, idx] level = last_level-1 st = int(2**(level-1)) ed = int(2**(level)-1) for idx in range(st-1, ed): bit_branch[:, 0, ed+2*(idx+1-st)] = np.mod(bit_branch[:, 0, idx]+bit_branch[:, 1, idx], 2) bit_branch[:, 0, ed+2*(idx+1-st)+1] = bit_branch[:, 1, idx] return bit_branch @staticmethod def _lowerconv_multiple(upper_decision, upper_llr, lower_llr): llr = lower_llr - upper_llr idx = np.where(upper_decision == 0) llr[idx] = lower_llr[idx] + upper_llr[idx] return llr @staticmethod def _upperconv_multiple(llr1, llr2): llr = _logdomain_sum_multiple(llr1+llr2, 0) - _logdomain_sum_multiple(llr1, llr2) return llr
####
[docs]class PolarWiretapDecoder(PolarDecoder): """Decoder class for decoding polar wiretap codes. You can either provide both channels (to Bob and Eve) or provide the main channel to Bob and the position lookup of the already constructed code. Parameters ---------- code_length : int Length of the codewords. design_channel_bob : str Channel name of the main channel to Bob. Valid choices are the channel models which are supported by the PolarDecoder. design_channel_eve : str, optional Channel name of the side channel to Eve. Valid choices are the channel models which are supported by the PolarEncoder. design_channelstate_bob : float, optional Channelstate of the main channel. design_channelstate_eve : float, optional Channelstate of the side channel. pos_lookup : array, optional Position lookup of the constructed wiretap code. If this is provided, no additional code is constructed and the values of Eve's channel are ignored. """ def __init__(self, code_length, design_channel_bob, design_channel_eve=None, design_channelstate_bob=0, design_channelstate_eve=0., pos_lookup=None, frozenbits=None, parallel=True, info_length_bob=None, random_length=None, **kwargs): if pos_lookup is None: pos_lookup = PolarWiretapEncoder.construct_polar_wiretap_code( code_length, design_channel_bob, design_channel_eve, design_channelstate_bob, design_channelstate_eve, frozenbits, info_length_bob, random_length) info_length = np.count_nonzero(pos_lookup == -1) info_length_bob = np.count_nonzero(pos_lookup < 0) super().__init__(code_length, info_length, design_channel_bob, design_channelstate=design_channelstate_bob, pos_lookup=pos_lookup, frozenbits=frozenbits, parallel=parallel, **kwargs)
[docs]class MachineLearningDecoder(Decoder): """Decoder class using Machine Learning algorithms as decoder. Parameters ---------- code_length : int Length of the block code. info_length : int Number of information bits. training_data : list or tuple (arrays or [Encoder, Modulator]) Two arrays containing the codewords and corresponding messages (binary) or an Encoder class where the whole codebook is used as training data. """ def __init__(self, code_length, info_length, **kwargs): super().__init__(code_length, info_length, parallel=False)#, **kwargs) self.decoder = self._create_decoder(**kwargs) def _create_decoder(**kwargs): raise NotImplementedError("The general MachineLearning decoder is not " "implemented.") # def __init__(self, code_length, info_length, training_data, **kwargs): # if isinstance(training_data[0], Encoder): # training_info = generate_data(info_length) # training_info_bit = unpack_to_bits(training_info, info_length) # training_code = training_data[0].encode_messages(training_info_bit) # if isinstance(training_data[1], Modulator): # training_code = training_data[1].modulate_symbols(training_code) # else: # training_code, training_info_bit = training_data # training_info = pack_to_dec(training_info_bit) # # self.decoder = None # self._train_system(training_code, training_info, training_info_bit, # **kwargs) # super().__init__(code_length, info_length, **kwargs)
[docs] def train_system(self, training_data, **kwargs): """Train the ML system using training data. Parameters ---------- training_data : list List of objects to train the systems. Possible options are: `numpy arrays` in order: X, y or `Encoder`, `Modulator`. If the `Encoder` is used, all possible information words are generated and encoded. kwargs : keyword arguments All arguments that are accepted by the training method of the used algorithm. """ if isinstance(training_data[0], Encoder): training_info_bit, training_code = training_data[0].generate_codebook() training_info = pack_to_dec(training_info_bit) if isinstance(training_data[1], Modulator): training_code = training_data[1].modulate_symbols(training_code) else: training_code, training_info_bit = training_data training_info = pack_to_dec(training_info_bit) self.decoder = self._train_system(training_code, training_info, training_info_bit, **kwargs)
def _train_system(self, training_code, training_info, training_info_bit, **kwargs): """Train the ML algorithm""" raise NotImplementedError("The general ML decoder is not implemented") @staticmethod def _create_decoder(model, **kwargs): _arguments = inspect.getargspec(model) _options = {k: v for k, v in kwargs.items() if k in _arguments.args} _decoder = model(**_options) return _decoder
[docs]class SvmDecoder(MachineLearningDecoder): """ Decoder class which uses Support Vector Machines (SVM) for decoding. The sklearn.svm implementation is used. All parameters which are accepted for creating a SVM can be used here. Parameters ---------- C : float, optional Penalty score. kernel : str, optional Kernel function. See the sklearn.svm documentation for implemented kernel functions. gamma : float, optional Kernel parameter. This might be ignored, depending on the kernel. """ # def __init__(self, code_length, info_length, C=1, kernel='rbf', gamma=1, # **kwargs): # pass def _create_decoder(self, **kwargs): return svm.SVC(**kwargs) def _train_system(self, training_code, training_info, training_info_bit, **kwargs): _decoder = self.decoder _decoder.fit(training_code, np.ravel(training_info)) return _decoder
[docs] def decode_messages(self, messages, channel=None): _pred_info = self.decoder.predict(messages) _pred_info_bit = unpack_to_bits(_pred_info, self.info_length) return _pred_info_bit
def __str__(self): return "SvmDecoder" def __repr__(self): return repr(self.decoder)
[docs]class ElmDecoder(MachineLearningDecoder): """ Decoder class which uses Extreme Learning Machines (ELM) for decoding. The hpelm implementation is used. Parameters ---------- neurons : list of tuples List of tuples, where each tuple looks like the following: (num_neuron, activation). The activation function can be either a string accepted by the ELM class or a numpy function. To be precise: the tuple is passed to the ELM.add_neurons method. Note that the output layer is linear. """ def __init__(self, code_length, info_length, neurons, **kwargs): #TODO:Add some logging and storing of history... super().__init__(code_length, info_length, neurons=neurons, **kwargs) def _create_decoder(self, neurons, **kwargs): _decoder = ELM(self.code_length, self.info_length, **kwargs) for _neurons in neurons: _decoder.add_neurons(*_neurons) return _decoder def _train_system(self, training_code, training_info, training_info_bit, **kwargs): _decoder = self.decoder _decoder.train(training_code, training_info_bit, **kwargs) return _decoder
[docs] def decode_messages(self, messages, channel=None): _pred_info = self.decoder.predict(messages) _pred_info_bit = np.clip(_pred_info, 0, 1) _pred_info_bit = np.round(_pred_info_bit) return _pred_info_bit
def __str__(self): return "ElmDecoder" def __repr__(self): return repr(self.decoder)
[docs]class NeuralNetDecoder(MachineLearningDecoder): """Decoder class to decode channel codes using feed forward neural networks Parameters ---------- layer : list (int) Number of nodes in each layer. train_snr : float Training SNR in dB. activation : str Activation function used for all hidden layers. See the Keras documentation for details. loss : str, optional Name of loss function. Default: 'binary_crossentropy'. kwargs : keyword arguments All arguements that are accepted by the Keras `keras.models.compile` method. """ def __init__(self, code_length, info_length, layer, train_snr=2., activation='relu', one_hot=False, optimizer='adam', loss='binary_crossentropy', **kwargs): #TODO:Add some logging and storing of history... self.one_hot = one_hot self.train_snr = train_snr super().__init__(code_length, info_length, activation=activation, layer=layer, optimizer=optimizer, loss=loss, **kwargs) # Copied from https://github.com/gruberto/D-ChannelDecoding (1701.07738v1) @staticmethod def __compose_model(layers): model = Sequential() for layer in layers: model.add(layer) return model def _create_decoder(self, layer, activation, **kwargs): if not layer: _layers = [layers.InputLayer(input_shape=(self.code_length,))] else: _layers = [layers.Dense(layer[0], input_shape=(self.code_length,), activation=activation)] _layers.extend([layers.Dense(k, activation=activation) for k in layer[1:]]) if self.one_hot: kwargs['loss'] = 'categorical_crossentropy' _layers.append(layers.Dense(2**self.info_length, activation='softmax')) else: _layers.append(layers.Dense(self.info_length, activation='sigmoid')) decoder = self.__compose_model(_layers) decoder.compile(**kwargs) #train_snr_lin = 10**(train_snr/10.) #TODO:This is only for BPSK with +-1 modulation #noise_layer = layers.GaussianNoise( # np.sqrt(1./(2*(self.info_length/self.code_length)*train_snr_lin)), # input_shape=(self.code_length,)) #train_model = self.__compose_model([noise_layer]+_layers) #train_model.compile(**kwargs) #return decoder#, train_model return decoder, kwargs def _train_system(self, training_code, training_info, training_info_bit, train_snr='self', store_history=False, **kwargs): """Accept all kwargs from `keras.models.fit`""" if train_snr == 'self': train_snr = self.train_snr decoder, compile_options = self.decoder train_snr_lin = 10**(train_snr/10.) input_power = np.var(training_code) #_train_noise_power = input_power/(2*(self.info_length/self.code_length)*train_snr_lin) _train_noise_power = input_power/(2*train_snr_lin) noise_layer = layers.GaussianNoise(np.sqrt(_train_noise_power), input_shape=(self.code_length,)) train_model = self.__compose_model([noise_layer]+decoder.layers) train_model.compile(**compile_options) if self.one_hot: _train_info = to_categorical(training_info, 2**self.info_length) else: _train_info = training_info_bit history = train_model.fit(training_code, _train_info, verbose=0, **kwargs) if store_history: _str_layers = "_".join([str(k.units) for k in decoder.layers]) history_file = "L{}-T{}-OH{}.hist".format(_str_layers, train_snr, int(self.one_hot)) with open(history_file, 'wb') as _history_file: pickle.dump(history.history, _history_file) return decoder
[docs] def decode_messages(self, messages, channel=None): pred = self.decoder.predict(messages) if self.one_hot: pred_info = np.argmax(pred, axis=1) pred_info = unpack_to_bits(pred_info, self.info_length) else: pred_info = np.round(pred) return pred_info