Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/Crypto/Random/Fortuna/FortunaAccumulator.py: 91%
75 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:03 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:03 +0000
1# -*- coding: ascii -*-
2#
3# FortunaAccumulator.py : Fortuna's internal accumulator
4#
5# Written in 2008 by Dwayne C. Litzenberger <dlitz@dlitz.net>
6#
7# ===================================================================
8# The contents of this file are dedicated to the public domain. To
9# the extent that dedication to the public domain is not available,
10# everyone is granted a worldwide, perpetual, royalty-free,
11# non-exclusive license to exercise all rights associated with the
12# contents of this file for any purpose whatsoever.
13# No rights are reserved.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22# SOFTWARE.
23# ===================================================================
25__revision__ = "$Id$"
27import sys
28if sys.version_info[0] == 2 and sys.version_info[1] == 1:
29 from Crypto.Util.py21compat import *
30from Crypto.Util.py3compat import *
32from binascii import b2a_hex
33import time
34import warnings
36from Crypto.pct_warnings import ClockRewindWarning
37from . import SHAd256
39# If the system has monotonic time, we'll use it.
40from Crypto.Util._time import maybe_monotonic_time
42from . import FortunaGenerator
44class FortunaPool(object):
45 """Fortuna pool type
47 This object acts like a hash object, with the following differences:
49 - It keeps a count (the .length attribute) of the number of bytes that
50 have been added to the pool
51 - It supports a .reset() method for in-place reinitialization
52 - The method to add bytes to the pool is .append(), not .update().
53 """
55 digest_size = SHAd256.digest_size
57 def __init__(self):
58 self.reset()
60 def append(self, data):
61 self._h.update(data)
62 self.length += len(data)
64 def digest(self):
65 return self._h.digest()
67 def hexdigest(self):
68 if sys.version_info[0] == 2:
69 return b2a_hex(self.digest())
70 else:
71 return b2a_hex(self.digest()).decode()
73 def reset(self):
74 self._h = SHAd256.new()
75 self.length = 0
77def which_pools(r):
78 """Return a list of pools indexes (in range(32)) that are to be included during reseed number r.
80 According to _Practical Cryptography_, chapter 10.5.2 "Pools":
82 "Pool P_i is included if 2**i is a divisor of r. Thus P_0 is used
83 every reseed, P_1 every other reseed, P_2 every fourth reseed, etc."
84 """
85 # This is a separate function so that it can be unit-tested.
86 assert r >= 1
87 retval = []
88 mask = 0
89 for i in range(32):
90 # "Pool P_i is included if 2**i is a divisor of [reseed_count]"
91 if (r & mask) == 0:
92 retval.append(i)
93 else:
94 break # optimization. once this fails, it always fails
95 mask = (mask << 1) | 1
96 return retval
98class FortunaAccumulator(object):
100 # An estimate of how many bytes we must append to pool 0 before it will
101 # contain 128 bits of entropy (with respect to an attack). We reseed the
102 # generator only after pool 0 contains `min_pool_size` bytes. Note that
103 # unlike with some other PRNGs, Fortuna's security does not rely on the
104 # accuracy of this estimate---we can accord to be optimistic here.
105 min_pool_size = 64 # size in bytes
107 # If an attacker can predict some (but not all) of our entropy sources, the
108 # `min_pool_size` check may not be sufficient to prevent a successful state
109 # compromise extension attack. To resist this attack, Fortuna spreads the
110 # input across 32 pools, which are then consumed (to reseed the output
111 # generator) with exponentially decreasing frequency.
112 #
113 # In order to prevent an attacker from gaining knowledge of all 32 pools
114 # before we have a chance to fill them with enough information that the
115 # attacker cannot predict, we impose a rate limit of 10 reseeds/second (one
116 # per 100 ms). This ensures that a hypothetical 33rd pool would only be
117 # needed after a minimum of 13 years of sustained attack.
118 reseed_interval = 0.100 # time in seconds
120 def __init__(self):
121 self.reseed_count = 0
122 self.generator = FortunaGenerator.AESGenerator()
123 self.last_reseed = None
125 # Initialize 32 FortunaPool instances.
126 # NB: This is _not_ equivalent to [FortunaPool()]*32, which would give
127 # us 32 references to the _same_ FortunaPool instance (and cause the
128 # assertion below to fail).
129 self.pools = [FortunaPool() for i in range(32)] # 32 pools
130 assert(self.pools[0] is not self.pools[1])
132 def _forget_last_reseed(self):
133 # This is not part of the standard Fortuna definition, and using this
134 # function frequently can weaken Fortuna's ability to resist a state
135 # compromise extension attack, but we need this in order to properly
136 # implement Crypto.Random.atfork(). Otherwise, forked child processes
137 # might continue to use their parent's PRNG state for up to 100ms in
138 # some cases. (e.g. CVE-2013-1445)
139 self.last_reseed = None
141 def random_data(self, bytes):
142 current_time = maybe_monotonic_time()
143 if (self.last_reseed is not None and self.last_reseed > current_time): # Avoid float comparison to None to make Py3k happy
144 warnings.warn("Clock rewind detected. Resetting last_reseed.", ClockRewindWarning)
145 self.last_reseed = None
146 if (self.pools[0].length >= self.min_pool_size and
147 (self.last_reseed is None or
148 current_time > self.last_reseed + self.reseed_interval)):
149 self._reseed(current_time)
150 # The following should fail if we haven't seeded the pool yet.
151 return self.generator.pseudo_random_data(bytes)
153 def _reseed(self, current_time=None):
154 if current_time is None:
155 current_time = maybe_monotonic_time()
156 seed = []
157 self.reseed_count += 1
158 self.last_reseed = current_time
159 for i in which_pools(self.reseed_count):
160 seed.append(self.pools[i].digest())
161 self.pools[i].reset()
163 seed = b("").join(seed)
164 self.generator.reseed(seed)
166 def add_random_event(self, source_number, pool_number, data):
167 assert 1 <= len(data) <= 32
168 assert 0 <= source_number <= 255
169 assert 0 <= pool_number <= 31
170 self.pools[pool_number].append(bchr(source_number))
171 self.pools[pool_number].append(bchr(len(data)))
172 self.pools[pool_number].append(data)
174# vim:set ts=4 sw=4 sts=4 expandtab: