Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/bandwidth.py: 30%

151 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import threading 

14import time 

15 

16 

class RequestExceededException(Exception):
    """Raised when a consumption request exceeds the available amount.

    The request that triggered this error should be retried after waiting
    the number of seconds given by ``retry_time``.
    """

    def __init__(self, requested_amt, retry_time):
        """
        :type requested_amt: int
        :param requested_amt: The originally requested byte amount

        :type retry_time: float
        :param retry_time: The length in time to wait to retry for the
            requested amount
        """
        self.requested_amt = requested_amt
        self.retry_time = retry_time
        super().__init__(
            f'Request amount {requested_amt} exceeded the amount '
            f'available. Retry in {retry_time}'
        )

37 

38 

class RequestToken:
    """A token to pass as an identifier when consuming from the LeakyBucket"""

43 

44 

class TimeUtils:
    """Thin indirection over the ``time`` module.

    Exists so callers can inject a fake clock/sleeper in tests instead of
    patching the ``time`` module directly.
    """

    def time(self):
        """Return the current time.

        :rtype: float
        :returns: The current time in seconds
        """
        return time.time()

    def sleep(self, value):
        """Block the calling thread for a designated time.

        :type value: float
        :param value: The time to sleep for in seconds
        """
        return time.sleep(value)

61 

62 

class BandwidthLimiter:
    def __init__(self, leaky_bucket, time_utils=None):
        """Limits bandwidth for shared S3 transfers

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket to use to limit bandwidth

        :type time_utils: TimeUtils
        :param time_utils: Time utility to use for interacting with time.
            Defaults to a new ``TimeUtils`` instance when not provided.
        """
        self._leaky_bucket = leaky_bucket
        self._time_utils = time_utils
        if time_utils is None:
            self._time_utils = TimeUtils()

    # NOTE: the method name preserves a historical misspelling
    # ("bandwith") for backward compatibility with existing callers.
    def get_bandwith_limited_stream(
        self, fileobj, transfer_coordinator, enabled=True
    ):
        """Wraps a fileobj in a bandwidth limited stream wrapper

        :type fileobj: file-like obj
        :param fileobj: The file-like obj to wrap

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The coordinator for the general transfer
            that the wrapped stream is a part of

        :type enabled: boolean
        :param enabled: Whether bandwidth limiting should be enabled to start

        :rtype: BandwidthLimitedStream
        :returns: The wrapped stream sharing this limiter's leaky bucket
        """
        stream = BandwidthLimitedStream(
            fileobj, self._leaky_bucket, transfer_coordinator, self._time_utils
        )
        if not enabled:
            stream.disable_bandwidth_limiting()
        return stream

99 

100 

class BandwidthLimitedStream:
    def __init__(
        self,
        fileobj,
        leaky_bucket,
        transfer_coordinator,
        time_utils=None,
        bytes_threshold=256 * 1024,
    ):
        """Limits bandwidth for reads on a wrapped stream

        :type fileobj: file-like object
        :param fileobj: The file like object to wrap

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket to use to throttle reads on
            the stream

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The coordinator for the general transfer
            that the wrapped stream is a part of

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with time

        :type bytes_threshold: int
        :param bytes_threshold: The number of bytes to accumulate before
            consuming from the leaky bucket. Batching reads this way avoids
            taking the bucket's lock on every small read.
        """
        self._fileobj = fileobj
        self._leaky_bucket = leaky_bucket
        self._transfer_coordinator = transfer_coordinator
        self._time_utils = time_utils
        if time_utils is None:
            self._time_utils = TimeUtils()
        self._bandwidth_limiting_enabled = True
        self._request_token = RequestToken()
        self._bytes_seen = 0
        self._bytes_threshold = bytes_threshold

    def enable_bandwidth_limiting(self):
        """Enable bandwidth limiting on reads to the stream"""
        self._bandwidth_limiting_enabled = True

    def disable_bandwidth_limiting(self):
        """Disable bandwidth limiting on reads to the stream"""
        self._bandwidth_limiting_enabled = False

    def read(self, amount):
        """Read a specified amount

        Reads will only be throttled if bandwidth limiting is enabled.
        """
        if not self._bandwidth_limiting_enabled:
            return self._fileobj.read(amount)

        # We do not want to be calling consume on every read as the read
        # amounts can be small causing the lock of the leaky bucket to
        # introduce noticeable overhead. So instead we keep track of
        # how many bytes we have seen and only call consume once we pass a
        # certain threshold.
        self._bytes_seen += amount
        if self._bytes_seen < self._bytes_threshold:
            return self._fileobj.read(amount)

        self._consume_through_leaky_bucket()
        return self._fileobj.read(amount)

    def _consume_through_leaky_bucket(self):
        # NOTE: If the read amounts on the stream are high, it will result
        # in large bursty behavior as there is not an interface for partial
        # reads. However given the reads on this abstraction are at most
        # 256KB (via downloads), it reduces the burstiness to be small KB
        # bursts at worst.
        while not self._transfer_coordinator.exception:
            try:
                self._leaky_bucket.consume(
                    self._bytes_seen, self._request_token
                )
                self._bytes_seen = 0
                return
            except RequestExceededException as e:
                self._time_utils.sleep(e.retry_time)
        else:
            # The while/else only runs when the loop condition goes false,
            # i.e. the coordinated transfer has hit an exception; surface
            # it instead of continuing to wait for bandwidth.
            raise self._transfer_coordinator.exception

    def signal_transferring(self):
        """Signal that data being read is being transferred to S3"""
        self.enable_bandwidth_limiting()

    def signal_not_transferring(self):
        """Signal that data being read is not being transferred to S3"""
        self.disable_bandwidth_limiting()

    def seek(self, where, whence=0):
        # Delegates straight to the wrapped stream; throttling state is
        # unaffected by repositioning.
        self._fileobj.seek(where, whence)

    def tell(self):
        # Position of the underlying stream.
        return self._fileobj.tell()

    def close(self):
        if self._bandwidth_limiting_enabled and self._bytes_seen:
            # This handles the case where the file is small enough to never
            # trigger the threshold and thus is never subjugated to the
            # leaky bucket on read(). This specifically happens for small
            # uploads. So instead to account for those bytes, have
            # it go through the leaky bucket when the file gets closed.
            self._consume_through_leaky_bucket()
        self._fileobj.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

212 

213 

class LeakyBucket:
    def __init__(
        self,
        max_rate,
        time_utils=None,
        rate_tracker=None,
        consumption_scheduler=None,
    ):
        """A leaky bucket abstraction to limit bandwidth consumption

        :type max_rate: int
        :param max_rate: The maximum rate to allow. This rate is in terms of
            bytes per second.

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with time

        :type rate_tracker: BandwidthRateTracker
        :param rate_tracker: Tracks bandwidth consumption

        :type consumption_scheduler: ConsumptionScheduler
        :param consumption_scheduler: Schedules consumption retries when
            necessary
        """
        self._max_rate = float(max_rate)
        self._time_utils = time_utils
        if time_utils is None:
            self._time_utils = TimeUtils()
        # Serializes all consumption decisions across threads sharing this
        # bucket.
        self._lock = threading.Lock()
        self._rate_tracker = rate_tracker
        if rate_tracker is None:
            self._rate_tracker = BandwidthRateTracker()
        self._consumption_scheduler = consumption_scheduler
        if consumption_scheduler is None:
            self._consumption_scheduler = ConsumptionScheduler()

    def consume(self, amt, request_token):
        """Consume a requested amount

        :type amt: int
        :param amt: The amount of bytes to request to consume

        :type request_token: RequestToken
        :param request_token: The token associated to the consumption
            request that is used to identify the request. So if a
            RequestExceededException is raised the token should be used
            in subsequent retry consume() request.

        :raises RequestExceededException: If the consumption amount would
            exceed the maximum allocated bandwidth

        :rtype: int
        :returns: The amount consumed
        """
        with self._lock:
            time_now = self._time_utils.time()
            if self._consumption_scheduler.is_scheduled(request_token):
                # A retry of a previously-rejected request is always let
                # through; its wait was already served.
                return self._release_requested_amt_for_scheduled_request(
                    amt, request_token, time_now
                )
            elif self._projected_to_exceed_max_rate(amt, time_now):
                self._raise_request_exceeded_exception(
                    amt, request_token, time_now
                )
            else:
                return self._release_requested_amt(amt, time_now)

    def _projected_to_exceed_max_rate(self, amt, time_now):
        # Would admitting this amount push the moving-average rate over
        # the configured maximum?
        projected_rate = self._rate_tracker.get_projected_rate(amt, time_now)
        return projected_rate > self._max_rate

    def _release_requested_amt_for_scheduled_request(
        self, amt, request_token, time_now
    ):
        self._consumption_scheduler.process_scheduled_consumption(
            request_token
        )
        return self._release_requested_amt(amt, time_now)

    def _raise_request_exceeded_exception(self, amt, request_token, time_now):
        # The ideal time for this amount alone at the maximum rate; the
        # scheduler stacks it on top of other pending waits.
        allocated_time = amt / float(self._max_rate)
        retry_time = self._consumption_scheduler.schedule_consumption(
            amt, request_token, allocated_time
        )
        raise RequestExceededException(
            requested_amt=amt, retry_time=retry_time
        )

    def _release_requested_amt(self, amt, time_now):
        self._rate_tracker.record_consumption_rate(amt, time_now)
        return amt

305 

306 

class ConsumptionScheduler:
    """Tracks rejected consumption requests and when each may proceed."""

    def __init__(self):
        """Schedules when to consume a desired amount"""
        # Maps each pending RequestToken to its scheduled wait metadata.
        self._tokens_to_scheduled_consumption = {}
        # Running sum of wait time across all pending requests; new
        # requests queue up behind it.
        self._total_wait = 0

    def is_scheduled(self, token):
        """Indicate whether a consumption request has been scheduled.

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.
        """
        return token in self._tokens_to_scheduled_consumption

    def schedule_consumption(self, amt, token, time_to_consume):
        """Schedule a wait time to be able to consume an amount.

        :type amt: int
        :param amt: The amount of bytes scheduled to be consumed

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.

        :type time_to_consume: float
        :param time_to_consume: The desired time it should take for that
            specific request amount to be consumed in regardless of previously
            scheduled consumption requests

        :rtype: float
        :returns: The amount of time to wait for the specific request before
            actually consuming the specified amount.
        """
        cumulative_wait = self._total_wait + time_to_consume
        self._total_wait = cumulative_wait
        self._tokens_to_scheduled_consumption[token] = {
            'wait_duration': cumulative_wait,
            'time_to_consume': time_to_consume,
        }
        return cumulative_wait

    def process_scheduled_consumption(self, token):
        """Process a scheduled consumption request that has completed.

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.
        """
        completed = self._tokens_to_scheduled_consumption.pop(token)
        remaining_wait = self._total_wait - completed['time_to_consume']
        # Never let the cumulative wait go negative.
        self._total_wait = max(remaining_wait, 0)

359 

360 

class BandwidthRateTracker:
    def __init__(self, alpha=0.8):
        """Tracks the rate of bandwidth consumption

        :type alpha: float
        :param alpha: The constant to use in calculating the exponential
            moving average of the bandwidth rate. Specifically it is used
            in the following calculation:

            current_rate = alpha * new_rate + (1 - alpha) * current_rate

            The value of this constant should be between 0 and 1.
        """
        self._alpha = alpha
        # No samples yet; both are initialized on the first recording.
        self._last_time = None
        self._current_rate = None

    @property
    def current_rate(self):
        """The current transfer rate

        :rtype: float
        :returns: The current tracked transfer rate
        """
        if self._last_time is None:
            return 0.0
        return self._current_rate

    def get_projected_rate(self, amt, time_at_consumption):
        """Get the projected rate using a provided amount and time

        :type amt: int
        :param amt: The proposed amount to consume

        :type time_at_consumption: float
        :param time_at_consumption: The proposed time to consume at

        :rtype: float
        :returns: The consumption rate if that amt and time were consumed
        """
        if self._last_time is None:
            return 0.0
        return self._calculate_exponential_moving_average_rate(
            amt, time_at_consumption
        )

    def record_consumption_rate(self, amt, time_at_consumption):
        """Record the consumption rate based off amount and time point

        :type amt: int
        :param amt: The amount that got consumed

        :type time_at_consumption: float
        :param time_at_consumption: The time at which the amount was consumed
        """
        if self._last_time is None:
            # First sample establishes the baseline; there is no interval
            # to derive a rate from yet.
            self._last_time = time_at_consumption
            self._current_rate = 0.0
            return
        self._current_rate = self._calculate_exponential_moving_average_rate(
            amt, time_at_consumption
        )
        self._last_time = time_at_consumption

    def _calculate_rate(self, amt, time_at_consumption):
        time_delta = time_at_consumption - self._last_time
        if time_delta <= 0:
            # While it is really unlikely to see this in an actual transfer,
            # we do not want to be returning back a negative rate or try to
            # divide the amount by zero. So instead return back an infinite
            # rate as the time delta is infinitesimally small.
            return float('inf')
        return amt / (time_delta)

    def _calculate_exponential_moving_average_rate(
        self, amt, time_at_consumption
    ):
        new_rate = self._calculate_rate(amt, time_at_consumption)
        return self._alpha * new_rate + (1 - self._alpha) * self._current_rate