Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/bandwidth.py: 30%
151 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import threading
14import time
class RequestExceededException(Exception):
    def __init__(self, requested_amt, retry_time):
        """Error when requested amount exceeds what is allowed

        The request that raised this error should be retried after waiting
        the time specified by ``retry_time``.

        :type requested_amt: int
        :param requested_amt: The originally requested byte amount

        :type retry_time: float
        :param retry_time: The length in time to wait to retry for the
            requested amount
        """
        msg = (
            f'Request amount {requested_amt} exceeded the amount '
            f'available. Retry in {retry_time}'
        )
        super().__init__(msg)
        # Keep the request details around so the caller can honor the
        # suggested backoff before retrying.
        self.requested_amt = requested_amt
        self.retry_time = retry_time
class RequestToken:
    """A token to pass as an identifier when consuming from the LeakyBucket"""
class TimeUtils:
    """Thin indirection over the ``time`` module so tests can stub time."""

    def time(self):
        """Get the current time back

        :rtype: float
        :returns: The current time in seconds
        """
        return time.time()

    def sleep(self, value):
        """Sleep for a designated time

        :type value: float
        :param value: The time to sleep for in seconds
        """
        return time.sleep(value)
class BandwidthLimiter:
    def __init__(self, leaky_bucket, time_utils=None):
        """Limits bandwidth for shared S3 transfers

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket to use to limit bandwidth

        :type time_utils: TimeUtils
        :param time_utils: Time utility to use for interacting with time.
            Defaults to a real-clock ``TimeUtils`` when not provided.
        """
        self._leaky_bucket = leaky_bucket
        self._time_utils = time_utils
        if time_utils is None:
            self._time_utils = TimeUtils()

    def get_bandwith_limited_stream(
        self, fileobj, transfer_coordinator, enabled=True
    ):
        """Wraps a fileobj in a bandwidth limited stream wrapper

        :type fileobj: file-like obj
        :param fileobj: The file-like obj to wrap

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The coordinator for the general transfer
            that the wrapped stream is a part of

        :type enabled: boolean
        :param enabled: Whether bandwidth limiting should be enabled to start

        :rtype: BandwidthLimitedStream
        :returns: The wrapped stream
        """
        stream = BandwidthLimitedStream(
            fileobj, self._leaky_bucket, transfer_coordinator, self._time_utils
        )
        if not enabled:
            stream.disable_bandwidth_limiting()
        return stream

    # Correctly spelled, backward-compatible alias for the historically
    # misspelled method name above ("bandwith"). Both names are bound to
    # the same implementation, so existing callers keep working.
    get_bandwidth_limited_stream = get_bandwith_limited_stream
class BandwidthLimitedStream:
    def __init__(
        self,
        fileobj,
        leaky_bucket,
        transfer_coordinator,
        time_utils=None,
        bytes_threshold=256 * 1024,
    ):
        """Limits bandwidth for reads on a wrapped stream

        :type fileobj: file-like object
        :param fileobj: The file like object to wrap

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket to use to throttle reads on
            the stream

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The coordinator for the general transfer
            that the wrapped stream is a part of

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with time
        """
        self._fileobj = fileobj
        self._leaky_bucket = leaky_bucket
        self._transfer_coordinator = transfer_coordinator
        self._time_utils = TimeUtils() if time_utils is None else time_utils
        self._bandwidth_limiting_enabled = True
        self._request_token = RequestToken()
        self._bytes_seen = 0
        self._bytes_threshold = bytes_threshold

    def enable_bandwidth_limiting(self):
        """Enable bandwidth limiting on reads to the stream"""
        self._bandwidth_limiting_enabled = True

    def disable_bandwidth_limiting(self):
        """Disable bandwidth limiting on reads to the stream"""
        self._bandwidth_limiting_enabled = False

    def read(self, amount):
        """Read a specified amount

        Reads will only be throttled if bandwidth limiting is enabled.
        """
        if self._bandwidth_limiting_enabled:
            # Calling consume() on every read would make the leaky bucket's
            # lock a noticeable overhead for small reads. Instead, tally the
            # bytes seen and only go through the bucket once a threshold of
            # accumulated bytes has been crossed.
            self._bytes_seen += amount
            if self._bytes_seen >= self._bytes_threshold:
                self._consume_through_leaky_bucket()
        return self._fileobj.read(amount)

    def _consume_through_leaky_bucket(self):
        # NOTE: High read amounts would produce large bursty behavior since
        # there is no interface for partial reads. However, reads on this
        # abstraction are at most 256KB (via downloads), which keeps the
        # burstiness down to small KB bursts at worst.
        while not self._transfer_coordinator.exception:
            try:
                self._leaky_bucket.consume(
                    self._bytes_seen, self._request_token
                )
            except RequestExceededException as e:
                # Not enough bandwidth available yet; back off for the
                # amount of time the bucket scheduled for this request.
                self._time_utils.sleep(e.retry_time)
            else:
                self._bytes_seen = 0
                return
        # The transfer hit an error elsewhere; stop retrying and surface it.
        raise self._transfer_coordinator.exception

    def signal_transferring(self):
        """Signal that data being read is being transferred to S3"""
        self.enable_bandwidth_limiting()

    def signal_not_transferring(self):
        """Signal that data being read is not being transferred to S3"""
        self.disable_bandwidth_limiting()

    def seek(self, where, whence=0):
        self._fileobj.seek(where, whence)

    def tell(self):
        return self._fileobj.tell()

    def close(self):
        if self._bandwidth_limiting_enabled and self._bytes_seen:
            # A file small enough to never cross the threshold is never put
            # through the leaky bucket on read() (this specifically happens
            # for small uploads), so account for those remaining bytes here
            # when the file gets closed.
            self._consume_through_leaky_bucket()
        self._fileobj.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
class LeakyBucket:
    def __init__(
        self,
        max_rate,
        time_utils=None,
        rate_tracker=None,
        consumption_scheduler=None,
    ):
        """A leaky bucket abstraction to limit bandwidth consumption

        :type max_rate: int
        :param max_rate: The maximum rate to allow. This rate is in terms of
            bytes per second.

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with time

        :type rate_tracker: BandwidthRateTracker
        :param rate_tracker: Tracks bandwidth consumption

        :type consumption_scheduler: ConsumptionScheduler
        :param consumption_scheduler: Schedules consumption retries when
            necessary
        """
        self._max_rate = float(max_rate)
        self._time_utils = TimeUtils() if time_utils is None else time_utils
        # Serializes consumption across all streams sharing this bucket.
        self._lock = threading.Lock()
        self._rate_tracker = (
            BandwidthRateTracker() if rate_tracker is None else rate_tracker
        )
        self._consumption_scheduler = (
            ConsumptionScheduler()
            if consumption_scheduler is None
            else consumption_scheduler
        )

    def consume(self, amt, request_token):
        """Consume a requested amount

        :type amt: int
        :param amt: The amount of bytes to request to consume

        :type request_token: RequestToken
        :param request_token: The token associated to the consumption
            request that is used to identify the request. So if a
            RequestExceededException is raised the token should be used
            in subsequent retry consume() request.

        :raises RequestExceededException: If the consumption amount would
            exceed the maximum allocated bandwidth

        :rtype: int
        :returns: The amount consumed
        """
        with self._lock:
            now = self._time_utils.time()
            # A previously scheduled retry gets released unconditionally;
            # its wait time was already served by the caller.
            if self._consumption_scheduler.is_scheduled(request_token):
                return self._release_requested_amt_for_scheduled_request(
                    amt, request_token, now
                )
            if self._projected_to_exceed_max_rate(amt, now):
                self._raise_request_exceeded_exception(
                    amt, request_token, now
                )
            return self._release_requested_amt(amt, now)

    def _projected_to_exceed_max_rate(self, amt, time_now):
        # Would consuming ``amt`` now push the moving-average rate past
        # the configured cap?
        projected = self._rate_tracker.get_projected_rate(amt, time_now)
        return projected > self._max_rate

    def _release_requested_amt_for_scheduled_request(
        self, amt, request_token, time_now
    ):
        self._consumption_scheduler.process_scheduled_consumption(
            request_token
        )
        return self._release_requested_amt(amt, time_now)

    def _raise_request_exceeded_exception(self, amt, request_token, time_now):
        # The time this amount "deserves" at the maximum rate determines
        # how long the caller should back off before retrying.
        allocated_time = amt / float(self._max_rate)
        retry_time = self._consumption_scheduler.schedule_consumption(
            amt, request_token, allocated_time
        )
        raise RequestExceededException(
            requested_amt=amt, retry_time=retry_time
        )

    def _release_requested_amt(self, amt, time_now):
        self._rate_tracker.record_consumption_rate(amt, time_now)
        return amt
class ConsumptionScheduler:
    def __init__(self):
        """Schedules when to consume a desired amount"""
        self._tokens_to_scheduled_consumption = {}
        self._total_wait = 0

    def is_scheduled(self, token):
        """Indicates if a consumption request has been scheduled

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.
        """
        return token in self._tokens_to_scheduled_consumption

    def schedule_consumption(self, amt, token, time_to_consume):
        """Schedules a wait time to be able to consume an amount

        :type amt: int
        :param amt: The amount of bytes scheduled to be consumed

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.

        :type time_to_consume: float
        :param time_to_consume: The desired time it should take for that
            specific request amount to be consumed in regardless of previously
            scheduled consumption requests

        :rtype: float
        :returns: The amount of time to wait for the specific request before
            actually consuming the specified amount.
        """
        # Waits stack up: each new request queues behind the total time
        # already promised to earlier, still-pending requests.
        total_wait = self._total_wait + time_to_consume
        self._total_wait = total_wait
        self._tokens_to_scheduled_consumption[token] = {
            'wait_duration': total_wait,
            'time_to_consume': time_to_consume,
        }
        return total_wait

    def process_scheduled_consumption(self, token):
        """Processes a scheduled consumption request that has completed

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.
        """
        entry = self._tokens_to_scheduled_consumption.pop(token)
        # Clamp at zero so rounding never drives the outstanding wait negative.
        self._total_wait = max(self._total_wait - entry['time_to_consume'], 0)
class BandwidthRateTracker:
    def __init__(self, alpha=0.8):
        """Tracks the rate of bandwidth consumption

        :type alpha: float
        :param alpha: The constant to use in calculating the exponential
            moving average of the bandwidth rate. Specifically it is used
            in the following calculation:

                current_rate = alpha * new_rate + (1 - alpha) * current_rate

            The value of this constant should be between 0 and 1.
        """
        self._alpha = alpha
        # No samples yet; both are populated on the first recorded
        # consumption.
        self._last_time = None
        self._current_rate = None

    @property
    def current_rate(self):
        """The current transfer rate

        :rtype: float
        :returns: The current tracked transfer rate
        """
        if self._last_time is None:
            # Nothing has been consumed yet.
            return 0.0
        return self._current_rate

    def get_projected_rate(self, amt, time_at_consumption):
        """Get the projected rate using a provided amount and time

        :type amt: int
        :param amt: The proposed amount to consume

        :type time_at_consumption: float
        :param time_at_consumption: The proposed time to consume at

        :rtype: float
        :returns: The consumption rate if that amt and time were consumed
        """
        if self._last_time is None:
            return 0.0
        return self._calculate_exponential_moving_average_rate(
            amt, time_at_consumption
        )

    def record_consumption_rate(self, amt, time_at_consumption):
        """Record the consumption rate based off amount and time point

        :type amt: int
        :param amt: The amount that got consumed

        :type time_at_consumption: float
        :param time_at_consumption: The time at which the amount was consumed
        """
        if self._last_time is None:
            # The first sample only establishes the reference time point;
            # a rate cannot be computed from a single sample.
            self._last_time = time_at_consumption
            self._current_rate = 0.0
            return
        self._current_rate = self._calculate_exponential_moving_average_rate(
            amt, time_at_consumption
        )
        self._last_time = time_at_consumption

    def _calculate_rate(self, amt, time_at_consumption):
        time_delta = time_at_consumption - self._last_time
        if time_delta <= 0:
            # While it is really unlikely to see this in an actual transfer,
            # we do not want to return a negative rate or divide by zero, so
            # treat an infinitesimally small time delta as an infinite rate.
            return float('inf')
        return amt / time_delta

    def _calculate_exponential_moving_average_rate(
        self, amt, time_at_consumption
    ):
        new_rate = self._calculate_rate(amt, time_at_consumption)
        return self._alpha * new_rate + (1 - self._alpha) * self._current_rate