from abc import ABC, abstractmethod
import numpy as np
from joblib import Parallel, delayed, cpu_count
from .information_theory import get_info_length
from .channels import Channel
from .messages import generate_data, pack_to_dec
from .parsers import read_codebook_file
def _logdomain_diff(x, y):
#Copied from http://polarcodes.com Matlab package
return x + np.log1p(-np.exp(y-x))
class Encoder(ABC):
    """Abstract base class for all encoders.

    Parameters
    ----------
    code_length : int
        Length of the codewords.
    info_length : int
        Number of information bits.
    random_length : int, optional
        Number of random bits.
    base : int, optional
        Base of the field. Default is binary (2).
    parallel : bool, optional
        Flag that subclasses may consult to enable parallel encoding.
    **kwargs
        Accepted for signature compatibility; not used by this class.
    """

    def __init__(self, code_length, info_length, random_length=0, base=2,
                 parallel=True, **kwargs):
        # Plain attribute storage; no validation is performed here.
        self.parallel = parallel
        self.base = base
        self.random_length = random_length
        self.info_length = info_length
        self.code_length = code_length

    @abstractmethod
    def encode_messages(self, messages):
        """Encode `messages`; must be implemented by every subclass."""

    def generate_codebook(self):
        """Build the mapping between generated messages and their codewords.

        Returns
        -------
        tuple
            ``(messages, codewords)`` where `messages` is the output of
            ``generate_data(self.info_length, binary=True)`` (presumably all
            binary messages of that length -- verify against `generate_data`)
            and `codewords` is the result of encoding them.
        """
        messages = generate_data(self.info_length, binary=True)
        return messages, self.encode_messages(messages)
class IdentityEncoder(Encoder):
    """Encoder whose codewords are the messages themselves."""

    @staticmethod
    def encode_messages(messages):
        """Return `messages` unchanged (the very same object)."""
        codewords = messages
        return codewords
class CodebookEncoder(Encoder):
    """Generic Encoder class for arbitrary codebooks.

    This encoder allows encoding messages using a codebook as lookup table.
    Any mapping between messages and codewords may be used.

    Parameters
    ----------
    code_length : int
        Length of the codewords. Overridden if the codebook source provides
        a ``'code_length'`` entry.
    info_length : int
        Number of information bits. Overridden if the codebook source
        provides an ``'info_length'`` entry.
    codebook : tuple or dict or str
        The mapping between messages and codewords. Either as a tuple of arrays
        (messages, codewords) where the rows correspond to each other, or as
        dict where the keys are the messages as integers, or as a path to a file
        containing a codebook.
    wiretap : bool, optional
        Set True if the code is a wiretap code.
    random_length : int, optional
        Number of random bits, if the code is a wiretap code. This is ignored,
        if wiretap is False.
    """

    def __init__(self, code_length, info_length, codebook, wiretap=False,
                 random_length=0, **kwargs):
        self.wiretap = wiretap
        self.codebook, _code_info = self._create_codebook(codebook, wiretap)
        # Metadata delivered with the codebook takes precedence over the
        # values passed by the caller.
        code_length = (_code_info['code_length']
                       if 'code_length' in _code_info else code_length)
        info_length = (_code_info['info_length']
                       if 'info_length' in _code_info else info_length)
        if wiretap and 'random_length' in _code_info:
            random_length = _code_info['random_length']
        super().__init__(code_length, info_length,
                         random_length=random_length, **kwargs)

    @staticmethod
    def _create_codebook(codebook, wiretap=False):
        """Normalise the supported codebook inputs to ``(dict, info)``.

        Returns
        -------
        tuple
            The lookup table and a mapping with optional code parameters.
            The latter is empty unless the codebook was read from a file.

        Raises
        ------
        TypeError
            If `codebook` is neither a dict, a tuple/list, nor a str.
        """
        if isinstance(codebook, dict):
            return codebook, {}
        if isinstance(codebook, (tuple, list)):
            messages, codewords = codebook
            keys = pack_to_dec(messages)
            # The first element of every packed row is used as the dict key.
            lookup = {key[0]: word for key, word in zip(keys, codewords)}
            return lookup, {}
        if isinstance(codebook, str):
            lookup, file_info = read_codebook_file(codebook, wiretap)
            return lookup, file_info
        raise TypeError("The codebook has to be a dict, or list or tuple, "
                        "or path. Got type: {}".format(type(codebook)))

    def generate_codebook(self):
        """Return ``(messages, codewords)`` for all generated inputs.

        For wiretap codes the encoder inputs have
        ``info_length + random_length`` columns, but only the leading
        ``info_length`` columns are returned as messages.
        """
        if self.wiretap:
            total_length = self.info_length + self.random_length
            all_inputs = generate_data(total_length, binary=True)
            all_codewords = self.encode_messages(all_inputs)
            all_messages = all_inputs[:, :self.info_length]
        else:
            all_messages, all_codewords = super().generate_codebook()
        return all_messages, all_codewords

    def encode_messages(self, messages):
        """Look up the codeword of every message in the codebook.

        The messages are converted with `pack_to_dec` and flattened before
        they are used as dictionary keys.
        """
        keys = np.ravel(pack_to_dec(messages))
        if not self.parallel:
            code_words = np.zeros((len(keys), self.code_length))
            for row, key in enumerate(keys):
                code_words[row] = self.codebook.get(key)
            return code_words
        # One joblib task per message, distributed over all available cores.
        lookups = (delayed(self.codebook.get)(key) for key in keys)
        return np.array(Parallel(n_jobs=cpu_count())(lookups))
class RepetitionEncoder(Encoder):
    """Repetition code: every message symbol is repeated `code_length` times.

    Parameters
    ----------
    code_length : int
        Length of the codewords, i.e., the number of repetitions.
    **kwargs
        Accepted for signature compatibility; not forwarded to the base
        class.
    """

    def __init__(self, code_length, **kwargs):
        # A repetition code carries exactly one information symbol.
        super().__init__(code_length, 1)

    def encode_messages(self, messages):
        """Return an array with one row per message symbol.

        The input is flattened to a single column, so every element of
        `messages` becomes its own codeword of length ``self.code_length``.
        """
        column = np.reshape(messages, (-1, 1))
        return np.repeat(column, self.code_length, axis=1)
class LinearEncoder(Encoder):
    """Linear block encoder.

    Parameters
    ----------
    code_matrix : array
        Code generation matrix. Dimension is (info_bits, code_length).
    base : int, optional
        Base of the field. Default is binary (2).
    **kwargs
        Accepted for signature compatibility; not forwarded to the base
        class.
    """

    def __init__(self, code_matrix, base=2, **kwargs):
        # Both code parameters follow directly from the matrix shape.
        num_info, num_code = np.shape(code_matrix)
        super().__init__(num_code, num_info, base=base)
        self.code_matrix = code_matrix

    def encode_messages(self, messages):
        """Multiply `messages` by the code matrix and reduce modulo `base`."""
        product = np.matmul(messages, self.code_matrix)
        return np.mod(product, self.base)
class PolarEncoder(Encoder):
    """Polar code encoder.

    The implementation is copied from the Matlab implementation from
    http://www.polarcodes.com

    Parameters
    ----------
    code_length : int
        Length of the codewords.
    info_length : int
        Number of information bits.
    design_channel : str or Channel object
        Design channel name or channel object. Supported are: `BAWGN`
        (`AWGN` is accepted as an alias), `BEC` and `BSC`.
    design_channelstate: float (optional)
        State of the design channel. AWGN: SNR in dB, BEC: epsilon, BSC: p.
        It is ignored if `design_channel` is a Channel object.
    frozenbits : array (optional)
        Array of frozen bits. If not given, all zeros will be used.
    parallel : bool (optional)
        Stored on the instance. The vectorised :meth:`encode_messages` does
        not consult it.
    """

    def __init__(self, code_length, info_length, design_channel,
                 design_channelstate=0., frozenbits=None, parallel=True,
                 **kwargs):
        if isinstance(design_channel, Channel):
            channel_name = design_channel.name
            design_channelstate = design_channel.get_channelstate()
        else:
            channel_name = design_channel
        self.design_channel = channel_name
        self.design_channelstate = design_channelstate
        self.frozenbits = frozenbits
        # Lookup vector: -1 marks information positions, every other entry
        # holds the (fixed) frozen bit of that position.
        self.pos_lookup = self.construct_polar_code(
            code_length, info_length, design_channel, design_channelstate,
            frozenbits)
        super().__init__(code_length, info_length, parallel=parallel)

    @staticmethod
    def construct_polar_code(code_length, info_length, design_channel,
                             design_channelstate, frozenbits=None):
        """Determine the information and frozen positions of the code.

        Parameters
        ----------
        code_length : int
            Length of the codewords.
        info_length : int
            Number of information bits.
        design_channel : str or Channel object
            Design channel. Strings are matched case-insensitively.
        design_channelstate : float
            State of the design channel. Replaced by
            ``design_channel.get_channelstate()`` for Channel objects.
        frozenbits : array, optional
            Values written to the frozen positions (in ascending position
            order). All zeros if not given.

        Returns
        -------
        array
            Integer array of length `code_length` with -1 at the information
            positions and the frozen bits everywhere else.

        Raises
        ------
        ValueError
            If the design channel is not one of the supported channels.
        """
        if isinstance(design_channel, Channel):
            channel_name = design_channel.name
            design_channelstate = design_channel.get_channelstate()
        else:
            channel_name = design_channel.upper()

        # z0 holds the log-domain reliability values of the synthetic
        # channels; only the first entry is initialised from the design
        # channel, the remaining ones are derived by the recursion below.
        z0 = np.zeros(code_length)
        if channel_name in ("BAWGN", "AWGN"):
            # Channel state is given in dB; scale by the code rate.
            param = 10**(design_channelstate/10.)
            z0[0] = -param*(info_length/code_length)
        elif channel_name == "BSC":
            # log(2*sqrt(p*(1-p)))
            z0[0] = (np.log(2) + .5*np.log(design_channelstate) +
                     .5*np.log(1-design_channelstate))
        elif channel_name == "BEC":
            z0[0] = np.log(design_channelstate)
        else:
            # Previously an unknown name silently left z0 at zero, which
            # yields a meaningless selection of positions.
            raise ValueError(
                "Unsupported design channel: {!r}. Supported channels are "
                "'BAWGN' (alias 'AWGN'), 'BEC' and 'BSC'.".format(
                    channel_name))

        for j in range(1, int(np.log2(code_length))+1):
            half = 2**j // 2
            for t in range(half):
                T = z0[t]
                # log(2*exp(T) - exp(2*T)) and log(exp(T)**2), respectively.
                z0[t] = _logdomain_diff(np.log(2)+T, 2*T)
                z0[half+t] = 2*T

        # The positions with the smallest values carry the information bits.
        idx = np.argsort(z0)
        idx_good = np.sort(idx[:info_length])
        idx_bad = np.sort(idx[info_length:])
        A = np.zeros(code_length, dtype=int)
        A[idx_good] = -1  # information bits
        if frozenbits is None:
            frozenbits = np.zeros(code_length-info_length)
        A[idx_bad] = frozenbits
        return A

    def encode_messages(self, messages):
        """Encode a batch of messages.

        Parameters
        ----------
        messages : array
            Two-dimensional array with one message per row (presumably with
            `info_length` columns -- verify against callers).

        Returns
        -------
        array
            Float array with one codeword of length `code_length` per row.
        """
        code_words = self._fill_codewords(messages, self.pos_lookup)
        return self._transform_codewords(code_words)

    # The following methods are copied from pencode.m file (polarcodes.com)
    def _encode_single_message(self, message, pos_lookup):
        """Encode one message using the given position lookup vector."""
        code_word = self._fill_single_codeword(message, pos_lookup)
        return self._transform_single_codeword(code_word)

    @staticmethod
    def _fill_single_codeword(message, pos_lookup):
        """Return a copy of `pos_lookup` with `message` at the -1 entries."""
        code_word = np.array(pos_lookup)
        code_word[code_word == -1] = message
        return code_word

    @staticmethod
    def _transform_single_codeword(code_word):
        """Apply the polar transform to one codeword in place (modulo 2)."""
        code_length = len(code_word)
        n = int(np.ceil(np.log2(code_length)))
        for i in range(n):
            B = 2**(n-i)
            half = B // 2
            for j in range(2**i):
                base = j*B
                # Element-wise loop kept so that any indexable sequence
                # (not only arrays) is supported.
                for l in range(half):
                    code_word[base+l] = np.mod(
                        code_word[base+l] + code_word[base+half+l], 2)
        return code_word

    @staticmethod
    def _fill_codewords(message, pos_lookup):
        """Return one float row per message with the -1 entries filled.

        The values of `message` are consumed in row-major order, matching
        the row-major order of the -1 entries in the tiled lookup.
        """
        pos_lookup = np.array(pos_lookup, dtype=float)
        code_words = np.tile(pos_lookup, (len(message), 1))
        np.place(code_words, code_words == -1, message)
        return code_words

    @staticmethod
    def _transform_codewords(code_words):
        """Apply the polar transform to all rows in place (modulo 2)."""
        code_length = np.shape(code_words)[1]
        n = int(np.ceil(np.log2(code_length)))
        for i in range(n):
            B = 2**(n-i)
            half = B // 2
            for j in range(2**i):
                base = j*B
                # The upper half of a block is only read in this stage, so
                # the whole lower half can be updated with one slice
                # operation instead of a column-by-column loop.
                code_words[:, base:base+half] = np.mod(
                    code_words[:, base:base+half] +
                    code_words[:, base+half:base+B], 2)
        return code_words
class PolarWiretapEncoder(PolarEncoder):
    """Encoder for polar wiretap codes.

    It uses [1] for the construction of the codes.

    [1] H. Mahdavifar and A. Vardy, "Achieving the Secrecy Capacity of Wiretap
    Channels Using Polar Codes" IEEE Trans. Inf. Theory, vol. 57, no. 10,
    pp. 6428–6443, Oct. 2011.

    Parameters
    ----------
    code_length : int
        Length of the code
    design_channel_bob : str or Channel object
        Name of the design channel to Bob (main channel)
    design_channel_eve : str or Channel object
        Name of the design channel to Eve (wiretap channel)
    design_channelstate_bob : float, optional
        Design channelstate of the main channel. It is ignored if the
        `design_channel_bob` argument is a Channel like object.
    design_channelstate_eve : float, optional
        Design channelstate of the wiretap channel. It is ignored if the
        `design_channel_eve` argument is a Channel like object.
    frozenbits : array, optional
        Array of frozen bits. If not given, all zeros will be used.
    info_length_bob : int, optional
        Number of information bits to Bob. If None, the number is calculated
        based on the main channel's capacity.
    random_length : int, optional
        Number of random bits. If None, the number is calculated based on the
        capacity of the eavesdropper's channel.
    """

    def __init__(self, code_length, design_channel_bob, design_channel_eve,
                 design_channelstate_bob=0, design_channelstate_eve=0,
                 frozenbits=None, info_length_bob=None, random_length=None,
                 parallel=True, **kwargs):
        self.pos_lookup = self.construct_polar_wiretap_code(
            code_length, design_channel_bob, design_channel_eve,
            design_channelstate_bob, design_channelstate_eve, frozenbits,
            info_length_bob, random_length)
        # -1 marks secure (information) positions, -2 marks random positions.
        info_length = np.count_nonzero(self.pos_lookup == -1)
        self.info_length_bob = np.count_nonzero(self.pos_lookup < 0)
        random_length = self.info_length_bob - info_length
        # Encoder.__init__ is called directly: PolarEncoder.__init__ would
        # rebuild a single-channel lookup and overwrite `pos_lookup`.
        Encoder.__init__(self, code_length, info_length,
                         random_length=random_length, parallel=parallel)

    @staticmethod
    def construct_polar_wiretap_code(code_length, design_channel_bob,
                                     design_channel_eve,
                                     design_channelstate_bob,
                                     design_channelstate_eve,
                                     frozenbits=None, info_length_bob=None,
                                     random_length=None):
        """Determine secure, random and frozen positions of the code.

        Returns
        -------
        array
            Float array of length `code_length` with -1 at the secure
            positions, -2 at the random positions and the frozen bits at all
            remaining positions.

        Raises
        ------
        ValueError
            If the positions selected for Eve are not a subset of the
            positions selected for Bob.
        """
        if info_length_bob is None:
            info_length_bob = get_info_length(code_length, design_channel_bob,
                                              design_channelstate_bob)
        if random_length is None:
            info_length_eve = get_info_length(code_length, design_channel_eve,
                                              design_channelstate_eve)
        else:
            info_length_eve = random_length

        # Only the positions are needed from the two single-channel
        # constructions. The frozen bits are deliberately not forwarded: the
        # two codes have a different number of frozen positions, so one
        # `frozenbits` array cannot fit both.
        lookup_bob = PolarEncoder.construct_polar_code(
            code_length, info_length_bob, design_channel_bob,
            design_channelstate_bob)
        lookup_eve = PolarEncoder.construct_polar_code(
            code_length, info_length_eve, design_channel_eve,
            design_channelstate_eve)
        good_bob = np.where(lookup_bob == -1)[0]
        good_eve = np.where(lookup_eve == -1)[0]
        if not set(good_eve).issubset(set(good_bob)):
            raise ValueError("There is something wrong with the sets of good "
                             "and bad positions.")
        sec_bits = np.setdiff1d(good_bob, good_eve)

        pos_lookup = np.zeros(code_length)
        if frozenbits is not None:
            # Frozen positions are all positions outside of Bob's good set;
            # the frozen bits are assigned in ascending position order.
            frozen_mask = np.ones(code_length, dtype=bool)
            frozen_mask[good_bob] = False
            pos_lookup[frozen_mask] = frozenbits
        pos_lookup[good_eve] = -2  # random bits
        pos_lookup[sec_bits] = -1  # secure bits
        return pos_lookup

    @staticmethod
    def _fill_single_codeword(message, pos_lookup):
        """Fill the secure and random positions for one message.

        If `message` has as many entries as there are negative entries in
        `pos_lookup`, its tail is used as the random bits. Otherwise the
        random bits are drawn with ``np.random.randint``.
        """
        _num_random = np.count_nonzero(pos_lookup == -2)
        _num_info = np.count_nonzero(pos_lookup == -1)
        if len(message) == np.count_nonzero(pos_lookup < 0):
            message, random_bits = message[:_num_info], message[_num_info:]
        else:
            random_bits = np.random.randint(0, 2, size=_num_random)
        code_word = PolarEncoder._fill_single_codeword(message, pos_lookup)
        code_word[code_word == -2] = random_bits
        return code_word

    @staticmethod
    def _fill_codewords(message, pos_lookup):
        """Fill the secure and random positions for a batch of messages.

        If the rows of `message` have as many columns as there are negative
        entries in `pos_lookup`, the trailing columns are used as the random
        bits. Otherwise the random bits are drawn with ``np.random.randint``.
        """
        _num_random = np.count_nonzero(pos_lookup == -2)
        _num_info = np.count_nonzero(pos_lookup == -1)
        if np.shape(message)[1] == np.count_nonzero(pos_lookup < 0):
            message, random_bits = (message[:, :_num_info],
                                    message[:, _num_info:])
        else:
            random_bits = np.random.randint(
                0, 2, size=(len(message), _num_random))
        code_words = PolarEncoder._fill_codewords(message, pos_lookup)
        np.place(code_words, code_words == -2, random_bits)
        return code_words

    def generate_codebook(self, return_random=False):
        """Encode every combination of message and random bits.

        Parameters
        ----------
        return_random : bool, optional
            If True, the random bits of every combination are returned as a
            third element.

        Returns
        -------
        tuple
            ``(messages, codewords)`` or ``(messages, codewords, random)``.
            `messages` are the leading `info_length` columns of the encoder
            inputs, `random` the remaining columns.
        """
        all_combinations = generate_data(self.info_length_bob, binary=True)
        all_codewords = self.encode_messages(all_combinations)
        all_messages = all_combinations[:, :self.info_length]
        if return_random:
            all_random = all_combinations[:, self.info_length:]
            return all_messages, all_codewords, all_random
        return all_messages, all_codewords