Source code for digcommpy.channels

from abc import ABC, abstractmethod

import numpy as np

from .checks import is_binary_message
from . import information_theory as it


[docs]class Channel(ABC): """Abstract channel class.""" def __init__(self, *args, **kwargs): self.name = "channel"
[docs] @abstractmethod def transmit_data(self, messages): pass
[docs] @abstractmethod def capacity(): pass
[docs] @abstractmethod def set_params(self, new_params): pass
[docs] @abstractmethod def get_channelstate(self): pass
[docs]class BinaryInputChannel(Channel, ABC): """Abstract channel class for binary input channels"""
[docs] @abstractmethod def transmit_data(self, messages): if not is_binary_message(messages): raise TypeError("Messages for binary input channels have to be " "binary (only 0 and 1).")
[docs]class AwgnChannel(Channel): """Additive white Gaussian noise channel. Parameters ---------- snr_db : float Signal-to-noise ratio in dB. rate : float, optional Rate of the information bits to transmitted symbols (includes code and modulation rate) input_power : float, optional Input power of the symbols. If None, the power is estimated. """ def __init__(self, snr_db, rate=1., input_power=None): self.name = "AWGN" self.snr_db = snr_db if 0 < rate <= 1.: self.rate = rate else: raise ValueError("The rate has to be in (0, 1]") self.input_power = input_power if input_power is not None: snr_lin = 10**(snr_db/10.) self.noise_var = input_power/(2.*rate*snr_lin)
[docs] def transmit_data(self, messages): return self._awgn(messages, self.snr_db, self.rate, self.input_power)
@staticmethod def _awgn(messages, snr=0., rate=1., input_power=None): """Additive White Gaussian Noise channel. Parameters ---------- messages : array Array of shape (num_messages, num_uses) which holds the channel inputs. snr : float, optional Signal-to-noise-ratio in dB. rate : float, optional Float between [0, 1] which indicates the coding and modulation rate. input_power : float, optional Specify the input power. If `None`, the power is estimated as variance of the messages. Returns ------- output : array Channel output with the same dimensions as the input. """ if input_power is None: #input_power = np.mean(np.var(messages, axis=1)) input_power = np.var(messages) snr_lin = 10**(snr/10.) noise_power = input_power/(2.*rate*snr_lin) noise = np.random.normal(loc=0, scale=np.sqrt(noise_power), size=np.shape(messages)) output = messages + noise return output
[docs] def capacity(self): capacity = np.log2(1.+self.snr_db) return capacity
[docs] def set_params(self, snr_db): self.snr_db = snr_db
[docs] def get_channelstate(self): return self.snr_db
[docs]class BawgnChannel(AwgnChannel): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.name = "BAWGN"
[docs] def capacity(self): capacity = it._capacity_bawgn(self.snr_db) return capacity
[docs]class BecChannel(BinaryInputChannel): """Binary erasure channel.""" def __init__(self, epsilon): self.name = "BEC" if 0 <= epsilon <= 1: self.epsilon = epsilon else: raise ValueError("Erasure probability needs to be in [0, 1].")
[docs] def get_channelstate(self): return self.epsilon
[docs] def transmit_data(self, messages): super().transmit_data(messages) return self._bec(messages, self.epsilon)
@staticmethod def _bec(messages, epsilon): """Binary erasure channel. Binary messages will be mapped to (0, 1, -1) where -1 denotes the erasure symbol. Parameters ---------- messages : array Array of shape (num_messages, num_uses) which holds the channel inputs. epsilon : float Erasure probability (channel parameters) Returns ------- output: array Channel output with the same dimensions as the input. -1 denotes the erasure symbol. """ idx_noise = np.random.choice([True, False], p=[epsilon, 1.-epsilon], size=np.shape(messages)) output = np.array(messages) output[idx_noise] = -1 return output
[docs] def capacity(self): return 1.-self.epsilon
[docs] def set_params(self, epsilon): self.epsilon = epsilon
[docs]class BscChannel(BinaryInputChannel): """Binary symmetric channel""" def __init__(self, prob): self.name = "BSC" if 0 <= prob <= 1: self.prob = prob else: raise ValueError("Bit flip probability needs to be in [0, 1].")
[docs] def get_channelstate(self): return self.prob
[docs] def transmit_data(self, messages): super().transmit_data(messages) return self._bsc(messages, self.prob)
@staticmethod def _bsc(messages, prob): flips = np.random.choice([1, 0], p=[prob, 1.-prob], size=np.shape(messages)) output = np.array(messages) + flips output = np.mod(output, 2) return output
[docs] def capacity(self): return 1.-it.binary_entropy(self.prob)
[docs] def set_params(self, prob): if 0 <= prob <= 1: self.prob = prob else: raise ValueError("Bit flip probability needs to be in [0, 1].")