-
Notifications
You must be signed in to change notification settings - Fork 0
/
scdc.py
202 lines (172 loc) · 5.48 KB
/
scdc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
"""
S,C-dense coding in Python 3
Please refer to readme for theoretical information.
Roman Kotenko, 2017
"""
from collections import defaultdict
from hashlib import md5
from os import makedirs
from os.path import isdir, isfile, join
from re import findall
from timeit import default_timer
# Tokenizer: a "word" (word chars, apostrophe, double quote, hyphen) or a
# single separator character (space, newline, common punctuation).
# Raw string avoids the deprecated bare "\w" escape in a plain literal;
# the pattern's value is byte-identical to before.
PATTERN = r'[\w\'"-]+|[ \n.,!?:;]'
CODE_FILE = '%d.txt'      # per-s output filename template
DECODE_DIR = 'decode'     # decoded texts are written here
ENCODE_DIR = 'encode'     # encoded byte files are written here
VOCAB_FILE = 'vocab.txt'  # cached vocabulary; first line is MD5 of the text
TEXT_FILE = 'text.txt'    # input text to compress

# Module-level state shared by scdc_prepare/scdc_encode/scdc_decode.
text = None   # raw input text
split = None  # token list produced by PATTERN
vocab = None  # vocabulary items, most frequent first
def scdc_encode(s):
    """
    SCDC encode the prepared text and write the result to encode/<s>.txt.

    Uses module-level ``split`` (token list) and ``vocab`` (tokens sorted by
    descending frequency) set up by scdc_prepare().

    :param s: s parameter for scdc (number of stopper byte values,
              1 <= s <= 255); c = 256 - s values act as continuers
    :return: wall execution time in seconds, encoded bytes size
    """
    start = default_timer()
    c = 256 - s
    global split, vocab
    # Precompute token -> index once: O(1) lookups instead of the former
    # O(len(vocab)) vocab.index() call per token (quadratic overall).
    index = {word: i for i, word in enumerate(vocab)}
    # Whitespace tokens are stored in the vocabulary under placeholders.
    placeholder = {' ': '%space%', '\n': '%newline%'}
    # bytearray avoids quadratic bytes-concatenation of the former `out +=`.
    out = bytearray()
    for item in split:
        i = index[placeholder.get(item, item)]
        # Stopper byte (value >= c) carries i % s.
        cur = bytearray([i % s + c])
        # Remaining quotient is emitted as continuer bytes (< c) in
        # "bijective base c": decrement before extracting each digit.
        x = i // s
        while x > 0:
            x -= 1
            cur.append(x % c)
            x //= c
        # Digits were produced least-significant first; the codeword is
        # written most-significant continuers first, stopper last.
        out += cur[::-1]
    with open(join(ENCODE_DIR, CODE_FILE % s), 'wb') as file_out:
        file_out.write(out)
    return default_timer() - start, len(out)
def scdc_decode(s):
    """
    Decode the SCDC-encoded file encode/<s>.txt and write the plain text
    to decode/<s>.txt.

    :param s: s parameter for scdc (must match the value used to encode)
    :return: wall execution time in seconds, or None when the encoded file
             does not exist
    """
    def generate_base():
        """
        Generate base values for decoding.
        base[0] = 0, base[1] = s, base[2] = s + sc, base[3] = s + sc + sc^2 ...
        Please refer to http://vios.dc.fi.udc.es/codes/semistatic.html
        :return: generator yielding successive base values
        """
        yield 0
        yield s
        cc = 256 - s  # current power of c; multiplied by c each step
        prev = s
        while True:
            prev = prev + s * cc
            yield prev
            # BUG FIX: was `cc *= cc`, which squares the multiplier each
            # step and yields wrong bases from base[4] on (the docstring
            # series needs successive powers of c). Multiply by c once.
            cc *= 256 - s

    def write_decode(text):
        """
        Write the decode result to decode/<s>.txt, expanding whitespace
        placeholders back into literal characters.
        :param text: iterable of decoded vocabulary items
        :return: None
        """
        with open(join(DECODE_DIR, CODE_FILE % s), 'w', encoding='utf-8') as file_out:
            for item in text:
                if item == '%space%':
                    file_out.write(' ')
                elif item == '%newline%':
                    file_out.write('\n')
                else:
                    file_out.write(item)

    start = default_timer()
    filename = join(ENCODE_DIR, CODE_FILE % s)
    if not isfile(filename):
        print("file '%s' not found for decoding" % filename)
        return None
    with open(filename, 'rb') as file_in:
        code = file_in.read()
    c = 256 - s
    global vocab
    out = []  # decoded text
    cur = 0   # index in vocabulary of item represented by current byte sequence
    base = generate_base()
    for x in code:
        if x < c:
            # Non-stopping (continuer) byte: accumulate one base-c digit.
            cur = cur * c + x
            # Advance the base generator with each continuer so the right
            # offset is ready when the stopping byte is hit.
            next(base)
        else:
            # Stopping byte ends the codeword: combine accumulated digits,
            # the stopper's payload, and the length-dependent base offset.
            cur = cur * s + x - c + next(base)
            out.append(vocab[cur])
            # Reset per-codeword state before the next byte sequence.
            cur = 0
            base = generate_base()
    write_decode(out)
    return default_timer() - start
def scdc_prepare():
    """
    Create required directories, read the input text, tokenize it, and load
    or build the vocabulary (unique tokens, most frequent first).

    The vocabulary is cached in VOCAB_FILE; its first line stores the MD5 of
    the text it was built from, so a stale or corrupt cache is rebuilt.
    :return: None
    """
    def md5calc(filename):
        """
        Calculate a file's MD5 hashsum, reading in chunks to bound memory.
        :param filename: path of the file to hash
        :return: MD5 hex digest string
        """
        hashsum = md5()
        with open(filename, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b''):
                hashsum.update(chunk)
        return hashsum.hexdigest()

    def generate_vocab():
        """
        Generate vocabulary from input text and write it to file.
        First line will be MD5 hash of original text file, following lines --
        one vocabulary item each.
        :return: None
        """
        global split, vocab
        vocab_dict = defaultdict(int)
        for item in split:
            # Whitespace tokens are stored under printable placeholders.
            if item == ' ':
                vocab_dict['%space%'] += 1
            elif item == '\n':
                vocab_dict['%newline%'] += 1
            else:
                vocab_dict[item] += 1
        # Most frequent first, so frequent tokens get the shortest codewords.
        vocab = [item[0] for item in
                 sorted(vocab_dict.items(), key=lambda a: a[1], reverse=True)]
        with open(VOCAB_FILE, 'w', encoding='utf-8') as file_out:
            file_out.write(md5hash + '\n')
            for item in vocab:
                file_out.write(item + '\n')

    global text, split, vocab
    with open(TEXT_FILE, 'r', encoding='utf-8') as file_in:
        text = file_in.read()
    split = findall(PATTERN, text)
    md5hash = md5calc(TEXT_FILE)
    if isfile(VOCAB_FILE):
        with open(VOCAB_FILE, 'r', encoding='utf-8') as file_in:
            vocab = [line.rstrip() for line in file_in]
        # Compare md5 of TEXT_FILE and the file vocab was compiled for.
        # Guard against an empty cache file, which previously raised
        # IndexError on vocab[0]; rebuild in that case too.
        if vocab and vocab[0] == md5hash:
            del vocab[0]
        else:
            generate_vocab()
    else:
        generate_vocab()
    # exist_ok makes this safe to call repeatedly; also avoids shadowing
    # the `dir` builtin as the old loop variable did.
    for directory in (ENCODE_DIR, DECODE_DIR):
        makedirs(directory, exist_ok=True)
if __name__ == '__main__':
    # Build the vocabulary and output directories once, then run every
    # valid s value (1..255) through both the encoder and the decoder,
    # printing each call's timing result.
    scdc_prepare()
    for operation in (scdc_encode, scdc_decode):
        for s_value in range(1, 256):
            print(operation(s_value))