Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/entropy.py: 41%
80 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3# Copyright (C) 2009-2017 Nominum, Inc.
4#
5# Permission to use, copy, modify, and distribute this software and its
6# documentation for any purpose with or without fee is hereby granted,
7# provided that the above copyright notice and this permission notice
8# appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18import hashlib
19import os
20import random
21import threading
22import time
23from typing import Any, Optional
26class EntropyPool:
27 # This is an entropy pool for Python implementations that do not
28 # have a working SystemRandom. I'm not sure there are any, but
29 # leaving this code doesn't hurt anything as the library code
30 # is used if present.
32 def __init__(self, seed: Optional[bytes] = None):
33 self.pool_index = 0
34 self.digest: Optional[bytearray] = None
35 self.next_byte = 0
36 self.lock = threading.Lock()
37 self.hash = hashlib.sha1()
38 self.hash_len = 20
39 self.pool = bytearray(b"\0" * self.hash_len)
40 if seed is not None:
41 self._stir(seed)
42 self.seeded = True
43 self.seed_pid = os.getpid()
44 else:
45 self.seeded = False
46 self.seed_pid = 0
48 def _stir(self, entropy: bytes) -> None:
49 for c in entropy:
50 if self.pool_index == self.hash_len:
51 self.pool_index = 0
52 b = c & 0xFF
53 self.pool[self.pool_index] ^= b
54 self.pool_index += 1
56 def stir(self, entropy: bytes) -> None:
57 with self.lock:
58 self._stir(entropy)
60 def _maybe_seed(self) -> None:
61 if not self.seeded or self.seed_pid != os.getpid():
62 try:
63 seed = os.urandom(16)
64 except Exception: # pragma: no cover
65 try:
66 with open("/dev/urandom", "rb", 0) as r:
67 seed = r.read(16)
68 except Exception:
69 seed = str(time.time()).encode()
70 self.seeded = True
71 self.seed_pid = os.getpid()
72 self.digest = None
73 seed = bytearray(seed)
74 self._stir(seed)
76 def random_8(self) -> int:
77 with self.lock:
78 self._maybe_seed()
79 if self.digest is None or self.next_byte == self.hash_len:
80 self.hash.update(bytes(self.pool))
81 self.digest = bytearray(self.hash.digest())
82 self._stir(self.digest)
83 self.next_byte = 0
84 value = self.digest[self.next_byte]
85 self.next_byte += 1
86 return value
88 def random_16(self) -> int:
89 return self.random_8() * 256 + self.random_8()
91 def random_32(self) -> int:
92 return self.random_16() * 65536 + self.random_16()
94 def random_between(self, first: int, last: int) -> int:
95 size = last - first + 1
96 if size > 4294967296:
97 raise ValueError("too big")
98 if size > 65536:
99 rand = self.random_32
100 max = 4294967295
101 elif size > 256:
102 rand = self.random_16
103 max = 65535
104 else:
105 rand = self.random_8
106 max = 255
107 return first + size * rand() // (max + 1)
110pool = EntropyPool()
112system_random: Optional[Any]
113try:
114 system_random = random.SystemRandom()
115except Exception: # pragma: no cover
116 system_random = None
119def random_16() -> int:
120 if system_random is not None:
121 return system_random.randrange(0, 65536)
122 else:
123 return pool.random_16()
126def between(first: int, last: int) -> int:
127 if system_random is not None:
128 return system_random.randrange(first, last + 1)
129 else:
130 return pool.random_between(first, last)