1# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import threading
14import time
15
16
class RequestExceededException(Exception):
    def __init__(self, requested_amt, retry_time):
        """Raised when a consumption request exceeds what is available

        The request that triggered this error should be retried after
        waiting for the duration given by ``retry_time``.

        :type requested_amt: int
        :param requested_amt: The byte amount that was originally requested

        :type retry_time: float
        :param retry_time: How long, in seconds, to wait before retrying
            the request for the same amount
        """
        self.requested_amt = requested_amt
        self.retry_time = retry_time
        super().__init__(
            f'Request amount {requested_amt} exceeded the amount '
            f'available. Retry in {retry_time}'
        )
35
36
class RequestToken:
    """Opaque identifier passed when consuming from the LeakyBucket

    Each stream holds its own token so the bucket can recognize retries
    of a previously rejected consumption request.
    """

    pass
41
42
class TimeUtils:
    """Thin wrapper over the ``time`` module to simplify test doubles"""

    def time(self):
        """Return the current time

        :rtype: float
        :returns: The current time in seconds since the epoch
        """
        return time.time()

    def sleep(self, value):
        """Block the calling thread for a designated duration

        :type value: float
        :param value: Number of seconds to sleep for
        """
        return time.sleep(value)
59
60
class BandwidthLimiter:
    def __init__(self, leaky_bucket, time_utils=None):
        """Limits bandwidth for shared S3 transfers

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket used to limit bandwidth

        :type time_utils: TimeUtils
        :param time_utils: Time utility to use for interacting with time.
            Defaults to a new ``TimeUtils`` instance when not provided.
        """
        self._leaky_bucket = leaky_bucket
        if time_utils is None:
            time_utils = TimeUtils()
        self._time_utils = time_utils

    def get_bandwith_limited_stream(
        self, fileobj, transfer_coordinator, enabled=True
    ):
        """Wraps a fileobj in a bandwidth limited stream wrapper

        :type fileobj: file-like obj
        :param fileobj: The file-like obj to wrap

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The coordinator for the general
            transfer that the wrapped stream is a part of

        :type enabled: boolean
        :param enabled: Whether bandwidth limiting should be enabled to
            start
        """
        wrapped = BandwidthLimitedStream(
            fileobj, self._leaky_bucket, transfer_coordinator, self._time_utils
        )
        if not enabled:
            wrapped.disable_bandwidth_limiting()
        return wrapped
97
98
class BandwidthLimitedStream:
    def __init__(
        self,
        fileobj,
        leaky_bucket,
        transfer_coordinator,
        time_utils=None,
        bytes_threshold=256 * 1024,
    ):
        """Limits bandwidth for reads on a wrapped stream

        :type fileobj: file-like object
        :param fileobj: The file-like object whose reads get throttled

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket used to throttle reads on
            the stream

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The coordinator for the general
            transfer that the wrapped stream is a part of

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with
            time. Defaults to a new ``TimeUtils`` instance.

        :type bytes_threshold: int
        :param bytes_threshold: Number of bytes to accumulate locally
            before consulting the leaky bucket
        """
        self._fileobj = fileobj
        self._leaky_bucket = leaky_bucket
        self._transfer_coordinator = transfer_coordinator
        if time_utils is None:
            time_utils = TimeUtils()
        self._time_utils = time_utils
        self._bandwidth_limiting_enabled = True
        self._request_token = RequestToken()
        self._bytes_seen = 0
        self._bytes_threshold = bytes_threshold

    def enable_bandwidth_limiting(self):
        """Enable bandwidth limiting on reads to the stream"""
        self._bandwidth_limiting_enabled = True

    def disable_bandwidth_limiting(self):
        """Disable bandwidth limiting on reads to the stream"""
        self._bandwidth_limiting_enabled = False

    def read(self, amount):
        """Read a specified amount

        Reads will only be throttled if bandwidth limiting is enabled.
        """
        if not self._bandwidth_limiting_enabled:
            return self._fileobj.read(amount)

        # Calling consume() on every read would add noticeable lock
        # contention on the leaky bucket for small read sizes, so bytes
        # are tallied locally and the bucket is only consulted once the
        # tally crosses the threshold.
        self._bytes_seen += amount
        if self._bytes_seen >= self._bytes_threshold:
            self._consume_through_leaky_bucket()
        return self._fileobj.read(amount)

    def _consume_through_leaky_bucket(self):
        # NOTE: If the read amounts on the stream are high, this results
        # in large bursty behavior as there is no interface for partial
        # reads. However, given reads through this abstraction are at most
        # 256KB (via downloads), burstiness stays limited to small KB
        # bursts at worst.
        while True:
            # Bail out (and surface the failure) if the overall transfer
            # has already errored — no point throttling a dead transfer.
            coordinator_exception = self._transfer_coordinator.exception
            if coordinator_exception:
                raise coordinator_exception
            try:
                self._leaky_bucket.consume(
                    self._bytes_seen, self._request_token
                )
                self._bytes_seen = 0
                return
            except RequestExceededException as e:
                # Bucket is saturated; wait the suggested time and retry
                # with the same token.
                self._time_utils.sleep(e.retry_time)

    def signal_transferring(self):
        """Signal that data being read is being transferred to S3"""
        self.enable_bandwidth_limiting()

    def signal_not_transferring(self):
        """Signal that data being read is not being transferred to S3"""
        self.disable_bandwidth_limiting()

    def seek(self, where, whence=0):
        """Seek the underlying stream; delegates directly to the fileobj"""
        self._fileobj.seek(where, whence)

    def tell(self):
        """Return the underlying stream's current position"""
        return self._fileobj.tell()

    def close(self):
        """Close the underlying stream, accounting for untracked bytes"""
        if self._bandwidth_limiting_enabled and self._bytes_seen:
            # A file small enough to never cross the threshold (common
            # for small uploads) would otherwise bypass the leaky bucket
            # entirely on read(), so its bytes are pushed through the
            # bucket here when the file gets closed.
            self._consume_through_leaky_bucket()
        self._fileobj.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
210
211
class LeakyBucket:
    def __init__(
        self,
        max_rate,
        time_utils=None,
        rate_tracker=None,
        consumption_scheduler=None,
    ):
        """A leaky bucket abstraction to limit bandwidth consumption

        :type max_rate: int
        :param max_rate: The maximum rate to allow, in bytes per second

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with
            time. Defaults to a new ``TimeUtils`` instance.

        :type rate_tracker: BandwidthRateTracker
        :param rate_tracker: Tracks bandwidth consumption. Defaults to a
            new ``BandwidthRateTracker`` instance.

        :type consumption_scheduler: ConsumptionScheduler
        :param consumption_scheduler: Schedules consumption retries when
            necessary. Defaults to a new ``ConsumptionScheduler`` instance.
        """
        self._max_rate = float(max_rate)
        if time_utils is None:
            time_utils = TimeUtils()
        self._time_utils = time_utils
        # Serializes all consume() calls; the tracker and scheduler are
        # not thread-safe on their own.
        self._lock = threading.Lock()
        if rate_tracker is None:
            rate_tracker = BandwidthRateTracker()
        self._rate_tracker = rate_tracker
        if consumption_scheduler is None:
            consumption_scheduler = ConsumptionScheduler()
        self._consumption_scheduler = consumption_scheduler

    def consume(self, amt, request_token):
        """Consume a requested amount

        :type amt: int
        :param amt: The amount of bytes to request to consume

        :type request_token: RequestToken
        :param request_token: The token associated to the consumption
            request that is used to identify the request. So if a
            RequestExceededException is raised the token should be used
            in subsequent retry consume() request.

        :raises RequestExceededException: If the consumption amount would
            exceed the maximum allocated bandwidth

        :rtype: int
        :returns: The amount consumed
        """
        with self._lock:
            now = self._time_utils.time()
            # A request that was previously deferred is assumed to have
            # waited out its scheduled delay, so it is always released.
            if self._consumption_scheduler.is_scheduled(request_token):
                return self._release_requested_amt_for_scheduled_request(
                    amt, request_token, now
                )
            if self._projected_to_exceed_max_rate(amt, now):
                self._raise_request_exceeded_exception(
                    amt, request_token, now
                )
            return self._release_requested_amt(amt, now)

    def _projected_to_exceed_max_rate(self, amt, time_now):
        # Would consuming amt right now push the moving average past the
        # configured maximum rate?
        projected = self._rate_tracker.get_projected_rate(amt, time_now)
        return projected > self._max_rate

    def _release_requested_amt_for_scheduled_request(
        self, amt, request_token, time_now
    ):
        # Clear the request from the scheduler's bookkeeping, then release.
        self._consumption_scheduler.process_scheduled_consumption(
            request_token
        )
        return self._release_requested_amt(amt, time_now)

    def _raise_request_exceeded_exception(self, amt, request_token, time_now):
        # At max rate, amt bytes would ideally take amt / max_rate seconds.
        allocated_time = amt / float(self._max_rate)
        retry_time = self._consumption_scheduler.schedule_consumption(
            amt, request_token, allocated_time
        )
        raise RequestExceededException(
            requested_amt=amt, retry_time=retry_time
        )

    def _release_requested_amt(self, amt, time_now):
        # Record the consumption so future projections account for it.
        self._rate_tracker.record_consumption_rate(amt, time_now)
        return amt
303
304
class ConsumptionScheduler:
    def __init__(self):
        """Schedules when to consume a desired amount"""
        # Maps token -> {'wait_duration': ..., 'time_to_consume': ...}
        self._tokens_to_scheduled_consumption = {}
        # Running total of outstanding scheduled wait time; new requests
        # queue up behind it.
        self._total_wait = 0

    def is_scheduled(self, token):
        """Indicates if a consumption request has been scheduled

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.
        """
        return token in self._tokens_to_scheduled_consumption

    def schedule_consumption(self, amt, token, time_to_consume):
        """Schedules a wait time to be able to consume an amount

        :type amt: int
        :param amt: The amount of bytes scheduled to be consumed

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.

        :type time_to_consume: float
        :param time_to_consume: The desired time it should take for that
            specific request amount to be consumed in regardless of
            previously scheduled consumption requests

        :rtype: float
        :returns: The amount of time to wait for the specific request
            before actually consuming the specified amount.
        """
        entry = {
            'wait_duration': self._total_wait + time_to_consume,
            'time_to_consume': time_to_consume,
        }
        self._total_wait = entry['wait_duration']
        self._tokens_to_scheduled_consumption[token] = entry
        return self._total_wait

    def process_scheduled_consumption(self, token):
        """Processes a scheduled consumption request that has completed

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.
        """
        finished = self._tokens_to_scheduled_consumption.pop(token)
        remaining = self._total_wait - finished['time_to_consume']
        # Never let accumulated wait go negative.
        self._total_wait = max(remaining, 0)
357
358
class BandwidthRateTracker:
    def __init__(self, alpha=0.8):
        """Tracks the rate of bandwidth consumption

        :type alpha: float
        :param alpha: Smoothing constant used for the exponential moving
            average of the bandwidth rate. Specifically it is used in the
            following calculation:

                current_rate = alpha * new_rate + (1 - alpha) * current_rate

            This value should be between 0 and 1.
        """
        self._alpha = alpha
        self._last_time = None
        self._current_rate = None

    @property
    def current_rate(self):
        """The current transfer rate

        :rtype: float
        :returns: The current tracked transfer rate
        """
        # Before any consumption has been recorded, report zero.
        if self._last_time is None:
            return 0.0
        return self._current_rate

    def get_projected_rate(self, amt, time_at_consumption):
        """Get the projected rate using a provided amount and time

        :type amt: int
        :param amt: The proposed amount to consume

        :type time_at_consumption: float
        :param time_at_consumption: The proposed time to consume at

        :rtype: float
        :returns: The consumption rate if that amt and time were consumed
        """
        if self._last_time is None:
            return 0.0
        return self._calculate_exponential_moving_average_rate(
            amt, time_at_consumption
        )

    def record_consumption_rate(self, amt, time_at_consumption):
        """Record the consumption rate based off amount and time point

        :type amt: int
        :param amt: The amount that got consumed

        :type time_at_consumption: float
        :param time_at_consumption: The time at which the amount was
            consumed
        """
        if self._last_time is not None:
            self._current_rate = (
                self._calculate_exponential_moving_average_rate(
                    amt, time_at_consumption
                )
            )
            self._last_time = time_at_consumption
            return
        # First observation: establish the baseline timestamp and rate.
        self._last_time = time_at_consumption
        self._current_rate = 0.0

    def _calculate_rate(self, amt, time_at_consumption):
        delta = time_at_consumption - self._last_time
        if delta > 0:
            return amt / delta
        # While very unlikely during a real transfer, a non-positive time
        # delta must not produce a negative rate or a division by zero.
        # Treat the delta as infinitesimally small and report an infinite
        # rate instead.
        return float('inf')

    def _calculate_exponential_moving_average_rate(
        self, amt, time_at_consumption
    ):
        sampled_rate = self._calculate_rate(amt, time_at_consumption)
        return (
            self._alpha * sampled_rate
            + (1 - self._alpha) * self._current_rate
        )