Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/bitarray/util.py: 23%

213 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:15 +0000

1# Copyright (c) 2019 - 2023, Ilan Schnell; All Rights Reserved 

2# bitarray is published under the PSF license. 

3# 

4# Author: Ilan Schnell 

5""" 

6Useful utilities for working with bitarrays. 

7""" 

8from __future__ import absolute_import 

9 

10import os 

11import sys 

12 

13from bitarray import bitarray, bits2bytes 

14 

15from bitarray._util import ( 

16 zeros, count_n, rindex, parity, 

17 count_and, count_or, count_xor, any_and, subset, 

18 _correspond_all, 

19 serialize, deserialize, 

20 ba2hex, hex2ba, 

21 ba2base, base2ba, 

22 sc_encode, sc_decode, 

23 vl_encode, vl_decode, 

24 canonical_decode, 

25) 

26 

# Names exported by `from bitarray.util import *`.  They are either defined
# in this module or imported above from the `bitarray._util` module.
__all__ = [
    'zeros', 'urandom', 'pprint', 'make_endian', 'rindex', 'strip',
    'count_n', 'parity',
    'count_and', 'count_or', 'count_xor', 'any_and', 'subset',
    'intervals',
    'ba2hex', 'hex2ba',
    'ba2base', 'base2ba',
    'ba2int', 'int2ba',
    'serialize', 'deserialize',
    'sc_encode', 'sc_decode',
    'vl_encode', 'vl_decode',
    'huffman_code', 'canonical_huffman', 'canonical_decode',
]


# True when the interpreter's major version is 2; used below to select
# between the Python 2 and Python 3 integer conversion code paths.
_is_py2 = sys.version_info[0] == 2

42 

43 

def urandom(__length, endian=None):
    """urandom(length, /, endian=None) -> bitarray

Return a bitarray holding `length` random bits.  The random data is
obtained from `os.urandom`, and `endian` is passed on to the bitarray
constructor.
"""
    result = bitarray(0, endian)
    nbytes = bits2bytes(__length)
    random_bytes = os.urandom(nbytes)
    result.frombytes(random_bytes)
    # whole bytes were appended, so drop whatever exceeds the requested length
    del result[__length:]
    return result

53 

54 

def pprint(__a, stream=None, group=8, indent=4, width=80):
    """pprint(bitarray, /, stream=None, group=8, indent=4, width=80)

Prints the formatted representation of object on `stream` (which defaults
to `sys.stdout`).  By default, elements are grouped in bytes (8 elements),
and 8 bytes (64 elements) per line.
Non-bitarray objects are printed by the standard library
function `pprint.pprint()`.

`group` is the number of elements per group (must be >= 1), `indent` is
the number of spaces put in front of each line of multi-line output (must
be >= 0), and `width` is the line width used to decide how many groups
are written per line (must be > `indent`).  A `ValueError` is raised when
one of these constraints is violated.
"""
    if stream is None:
        stream = sys.stdout

    if not isinstance(__a, bitarray):
        # delegate everything that is not a bitarray to the standard library
        import pprint as _pprint
        _pprint.pprint(__a, stream=stream, indent=indent, width=width)
        return

    group = int(group)
    if group < 1:
        raise ValueError('group must be >= 1')
    indent = int(indent)
    if indent < 0:
        raise ValueError('indent must be >= 0')
    width = int(width)
    if width <= indent:
        raise ValueError('width must be > %d (indent)' % indent)

    gpl = (width - indent) // (group + 1)  # groups per line
    epl = group * gpl                      # elements per line
    if epl == 0:
        # Not even one full group fits on a line.  `epl` is used as a
        # modulus below, so it must never be zero: previously
        # width - indent == 2 resulted in a ZeroDivisionError.
        epl = max(1, width - indent - 2)
    type_name = type(__a).__name__
    # here 4 is len("'()'")
    multiline = len(type_name) + 4 + len(__a) + len(__a) // group >= width
    if multiline:
        quotes = "'''"
    elif __a:
        quotes = "'"
    else:
        # an empty bitarray is written without any quotes: "bitarray()"
        quotes = ""

    stream.write("%s(%s" % (type_name, quotes))
    for i, b in enumerate(__a):
        if multiline and i % epl == 0:
            # start a new (indented) line
            stream.write('\n%s' % (indent * ' '))
        if i % group == 0 and i % epl != 0:
            # separate groups, except at the beginning of a line
            stream.write(' ')
        stream.write(str(b))

    if multiline:
        stream.write('\n')

    stream.write("%s)\n" % quotes)
    stream.flush()

109 

110 

def make_endian(__a, endian):
    """make_endian(bitarray, /, endian) -> bitarray

Return a bitarray with the elements of the given bitarray and
endianness `endian`.  A new bitarray is only created when the endianness
of the given bitarray differs from `endian`; otherwise the very same
object is returned.  A `TypeError` is raised for non-bitarray arguments.
"""
    if isinstance(__a, bitarray):
        already_matching = __a.endian() == endian
        return __a if already_matching else bitarray(__a, endian)

    raise TypeError("bitarray expected, got '%s'" % type(__a).__name__)

127 

128 

def strip(__a, mode='right'):
    """strip(bitarray, /, mode='right') -> bitarray

Return a new bitarray with zeros stripped from left, right or both ends.
Allowed values for mode are the strings: `left`, `right`, `both`

A `TypeError` is raised when `mode` is not a string, and a `ValueError`
when it is not one of the allowed values.  When the bitarray contains
no 1 bit, an empty bitarray is returned.
"""
    if not isinstance(mode, str):
        # report the type of the offending argument, i.e. `mode`
        raise TypeError("str expected for mode, got '%s'" %
                        type(mode).__name__)
    if mode not in ('left', 'right', 'both'):
        raise ValueError("mode must be 'left', 'right' or 'both', got %r" %
                         mode)
    if mode == 'right':
        start = None
    else:
        try:
            # position of the first 1 bit
            start = __a.index(1)
        except ValueError:
            # no 1 bit at all - nothing remains after stripping
            return __a[:0]

    if mode == 'left':
        stop = None
    else:
        try:
            # one past the position of the last 1 bit
            stop = rindex(__a) + 1
        except ValueError:
            return __a[:0]

    return __a[start:stop]

157 

158 

def intervals(__a):
    """intervals(bitarray, /) -> iterator

Iterate over the uninterrupted runs of equal bits, yielding one tuple
`(value, start, stop)` per run.  The runs are yielded in order, and
each of them covers at least one element (`stop - start > 0`).
Nothing is yielded for an empty bitarray.
"""
    try:
        current = __a[0]          # bit value of the run about to be emitted
    except IndexError:
        return
    end = len(__a)
    begin = 0                     # start of the run about to be emitted

    while begin < end:
        # the run ends where the opposite value occurs next (if at all)
        try:
            boundary = __a.index(not current, begin)
        except ValueError:
            boundary = end
        yield int(current), begin, boundary
        # the following run has the opposite value, and starts right here
        current = not current
        begin = boundary

182 

183 

def ba2int(__a, signed=False):
    """ba2int(bitarray, /, signed=False) -> int

Return the integer represented by the given bitarray, taking its
bit-endianness into account.
When `signed` is true, the bits are interpreted as two's complement.
A `TypeError` is raised for non-bitarray arguments, and a `ValueError`
for an empty bitarray.
"""
    if not isinstance(__a, bitarray):
        raise TypeError("bitarray expected, got '%s'" % type(__a).__name__)
    nbits = len(__a)
    if nbits == 0:
        raise ValueError("non-empty bitarray expected")

    endianness = __a.endian()
    little = bool(endianness == 'little')
    npad = __a.padbits
    if npad:
        # fill up to a whole number of bytes with zeros on the most
        # significant side, such that the value is not changed
        filler = zeros(npad, endianness)
        if little:
            __a = __a + filler
        else:
            __a = filler + __a

    if _is_py2:
        big = bitarray(__a, 'big')
        if little:
            big.reverse()
        value = int(ba2hex(big), 16)
    else:  # py3
        value = int.from_bytes(__a.tobytes(), byteorder=endianness)

    # two's complement: values with the top bit set are negative
    if signed and value >= 1 << (nbits - 1):
        value -= 1 << nbits
    return value

213 

214 

def int2ba(__i, length=None, endian=None, signed=False):
    """int2ba(int, /, length=None, endian=None, signed=False) -> bitarray

Return a bitarray (with given endianness) representing the given integer.
Unless `length` is provided, the result has no leading (big-endian) /
trailing (little-endian) zeros.  When `length` is provided, the result
has exactly that many bits, and an `OverflowError` is raised if the
integer is not representable with this number of bits.
`signed` selects the two's complement representation, and may only be
used together with `length`.
"""
    int_types = (int, long) if _is_py2 else int
    if not isinstance(__i, int_types):
        raise TypeError("int expected, got '%s'" % type(__i).__name__)
    if length is None:
        if signed:
            raise TypeError("signed requires length")
    else:
        if not isinstance(length, int):
            raise TypeError("int expected for length")
        if length <= 0:
            raise ValueError("length must be > 0")

    if __i == 0:
        # zero is handled here, as the code below does not cover it
        return zeros(length or 1, endian)

    if signed:
        half = 1 << (length - 1)
        if __i < -half or __i >= half:
            raise OverflowError("signed integer not in range(%d, %d), "
                                "got %d" % (-half, half, __i))
        if __i < 0:
            # map negative numbers to their two's complement bit pattern
            __i += 1 << length
    else:  # unsigned
        if __i < 0:
            raise OverflowError("unsigned integer not positive, got %d" % __i)
        if length and __i >= (1 << length):
            raise OverflowError("unsigned integer not in range(0, %d), "
                                "got %d" % (1 << length, __i))

    result = bitarray(0, endian)
    little = bool(result.endian() == 'little')
    if _is_py2:
        digits = hex(__i)[2:].rstrip('L')
        result.extend(hex2ba(digits, 'big'))
        if little:
            result.reverse()
    else:  # py3
        nbytes = bits2bytes(__i.bit_length())
        result.frombytes(__i.to_bytes(nbytes, byteorder=result.endian()))

    if length is None:
        # remove the zeros on the most significant side
        return strip(result, 'right' if little else 'left')

    nbits = len(result)
    if nbits > length:
        # cut off surplus bits on the most significant side
        result = result[:length] if little else result[-length:]
    elif nbits < length:
        # fill up with zeros on the most significant side
        filler = zeros(length - nbits, result.endian())
        result = result + filler if little else filler + result
    assert len(result) == length
    return result

275 

276# ------------------------------ Huffman coding ----------------------------- 

277 

def _huffman_tree(__freq_map):
    """_huffman_tree(dict, /) -> Node

Build the Huffman tree for the given dict, which maps symbols to their
frequency, and return the root node of the tree.
"""
    from heapq import heappush, heappop

    class Node(object):
        """
        Tree node.  Leaf nodes carry a 'symbol' attribute, whereas inner
        nodes carry a 'child' attribute (a tuple of the two children).
        Every node has a 'freq' attribute.
        """
        def __lt__(self, other):
            # ordering by frequency - required for the nodes to be usable
            # with heapq
            return self.freq < other.freq

    heap = []
    # each symbol becomes a leaf node on the priority queue
    for symbol, frequency in __freq_map.items():
        node = Node()
        node.symbol = symbol
        node.freq = frequency
        heappush(heap, node)

    # keep merging the two least frequent nodes until one node is left
    while len(heap) >= 2:
        lowest = heappop(heap)
        second = heappop(heap)
        merged = Node()
        merged.child = lowest, second
        merged.freq = lowest.freq + second.freq
        heappush(heap, merged)

    # the remaining node is the root of the tree
    return heap[0]

315 

316 

def huffman_code(__freq_map, endian=None):
    """huffman_code(dict, /, endian=None) -> dict

Calculate the Huffman code for the given frequency map (a dictionary
mapping symbols to their frequency).  The result is a dict mapping each
symbol to a bitarray (with given endianness).  Symbols are not limited
to being strings; they may be any hashable object (such as `None`).
"""
    if not isinstance(__freq_map, dict):
        raise TypeError("dict expected, got '%s'" % type(__freq_map).__name__)

    zero = bitarray('0', endian)
    one = bitarray('1', endian)

    nsymbols = len(__freq_map)
    if nsymbols == 0:
        raise ValueError("cannot create Huffman code with no symbols")
    if nsymbols == 1:
        # A single symbol could in principle be encoded with zero bits.
        # However, the .encode() and .decode() methods need codes of at
        # least one bit, so the symbol is assigned a single 0 bit.  The
        # resulting code is incomplete: a 1 bit has no meaning and will
        # result in an error.
        return {next(iter(__freq_map)): zero}

    codes = {}
    root_prefix = bitarray(0, endian)

    def walk(node, prefix):
        if hasattr(node, 'symbol'):
            # leaf: the path taken from the root is the code of the symbol
            codes[node.symbol] = prefix
            return
        # inner node: a 0 bit leads to the first child, a 1 bit to the second
        first, second = node.child
        walk(first, prefix + zero)
        walk(second, prefix + one)

    walk(_huffman_tree(__freq_map), root_prefix)
    return codes

353 

354 

def canonical_huffman(__freq_map):
    """canonical_huffman(dict, /) -> tuple

Calculate the canonical Huffman code for the given frequency map (a
dictionary mapping symbols to their frequency).  Returns a tuple containing:

0. the canonical Huffman code as a dict mapping symbols to bitarrays
1. a list containing the number of symbols of each code length
2. a list of symbols in canonical order

Note: the two lists may be used as input for `canonical_decode()`.
"""
    if not isinstance(__freq_map, dict):
        raise TypeError("dict expected, got '%s'" % type(__freq_map).__name__)

    nsymbols = len(__freq_map)
    if nsymbols == 0:
        raise ValueError("cannot create Huffman code with no symbols")
    if nsymbols == 1:
        # a single symbol is assigned a code consisting of one 0 bit
        only = next(iter(__freq_map))
        return {only: bitarray('0', 'big')}, [0, 1], [only]

    depth = {}  # maps each symbol to the length of its code

    def walk(node, level):
        # only the depth at which each symbol is found in the Huffman tree
        # matters here, not the path leading to it
        if hasattr(node, 'symbol'):
            depth[node.symbol] = level
            return
        first, second = node.child
        walk(first, level + 1)
        walk(second, level + 1)

    walk(_huffman_tree(__freq_map), 0)

    # canonical order: by code length first, and by symbol second
    ordered = sorted(depth.items(), key=lambda item: (item[1], item[0]))

    maxbits = max(length for _, length in ordered)
    codedict = {}
    count = [0] * (maxbits + 1)

    code = 0
    previous = None  # code length of the preceding symbol
    for sym, length in ordered:
        if previous is not None:
            # next code: increment, and append zeros when the length grows
            code = (code + 1) << (length - previous)
        codedict[sym] = int2ba(code, length, 'big')
        count[length] += 1
        previous = length

    return codedict, count, [sym for sym, _ in ordered]