Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/s3transfer/bandwidth.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

152 statements  

1# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import threading 

14import time 

15 

16 

class RequestExceededException(Exception):
    """Raised when a consumption request exceeds what is currently allowed.

    The request that triggered this error should be retried after waiting
    the number of seconds given by ``retry_time``.
    """

    def __init__(self, requested_amt, retry_time):
        """Error when requested amount exceeds what is allowed

        :type requested_amt: int
        :param requested_amt: The originally requested byte amount

        :type retry_time: float
        :param retry_time: The length in time to wait to retry for the
            requested amount
        """
        self.requested_amt = requested_amt
        self.retry_time = retry_time
        super().__init__(
            f'Request amount {requested_amt} exceeded the amount '
            f'available. Retry in {retry_time}'
        )

35 

36 

class RequestToken:
    """An opaque identifier passed when consuming from the LeakyBucket.

    Instances carry no state; identity alone associates retried consume()
    calls with their originally scheduled request.
    """

41 

42 

class TimeUtils:
    """Thin wrapper over the ``time`` module so time can be stubbed in tests."""

    def time(self):
        """Return the current time.

        :rtype: float
        :returns: The current time in seconds since the epoch
        """
        return time.time()

    def sleep(self, value):
        """Block the calling thread for a designated duration.

        :type value: float
        :param value: The number of seconds to sleep for
        """
        return time.sleep(value)

59 

60 

class BandwidthLimiter:
    """Produces bandwidth limited streams that share a single leaky bucket."""

    def __init__(self, leaky_bucket, time_utils=None):
        """Limits bandwidth for shared S3 transfers

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket used to limit bandwidth

        :type time_utils: TimeUtils
        :param time_utils: Time utility to use for interacting with time.
            Defaults to a new ``TimeUtils`` instance when not provided.
        """
        self._leaky_bucket = leaky_bucket
        self._time_utils = TimeUtils() if time_utils is None else time_utils

    # NOTE: "bandwith" is a historical misspelling kept because it is the
    # public method name callers depend on.
    def get_bandwith_limited_stream(
        self, fileobj, transfer_coordinator, enabled=True
    ):
        """Wraps a fileobj in a bandwidth limited stream wrapper

        :type fileobj: file-like obj
        :param fileobj: The file-like obj to wrap

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The coordinator for the general transfer
            that the wrapped stream is a part of

        :type enabled: boolean
        :param enabled: Whether bandwidth limiting should be enabled to start
        """
        stream = BandwidthLimitedStream(
            fileobj, self._leaky_bucket, transfer_coordinator, self._time_utils
        )
        if not enabled:
            stream.disable_bandwidth_limiting()
        return stream

97 

98 

class BandwidthLimitedStream:
    """A read stream wrapper that throttles reads through a leaky bucket."""

    def __init__(
        self,
        fileobj,
        leaky_bucket,
        transfer_coordinator,
        time_utils=None,
        bytes_threshold=256 * 1024,
    ):
        """Limits bandwidth for reads on a wrapped stream

        :type fileobj: file-like object
        :param fileobj: The file like object to wrap

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket used to throttle reads on
            the stream

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The coordinator for the general transfer
            that the wrapped stream is a part of

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with time

        :type bytes_threshold: int
        :param bytes_threshold: The number of bytes to accumulate before
            consuming from the leaky bucket (reduces lock contention).
        """
        self._fileobj = fileobj
        self._leaky_bucket = leaky_bucket
        self._transfer_coordinator = transfer_coordinator
        self._time_utils = TimeUtils() if time_utils is None else time_utils
        self._bandwidth_limiting_enabled = True
        self._request_token = RequestToken()
        self._bytes_seen = 0
        self._bytes_threshold = bytes_threshold

    def enable_bandwidth_limiting(self):
        """Enable bandwidth limiting on reads to the stream"""
        self._bandwidth_limiting_enabled = True

    def disable_bandwidth_limiting(self):
        """Disable bandwidth limiting on reads to the stream"""
        self._bandwidth_limiting_enabled = False

    def read(self, amount):
        """Read a specified amount

        Reads will only be throttled if bandwidth limiting is enabled.
        """
        if self._bandwidth_limiting_enabled:
            # Calling consume() for every read would make the leaky bucket's
            # lock a noticeable overhead given how small individual read
            # amounts can be. So bytes are tallied locally and consume() is
            # only invoked once a threshold's worth has accumulated.
            self._bytes_seen += amount
            if self._bytes_seen >= self._bytes_threshold:
                self._consume_through_leaky_bucket()
        return self._fileobj.read(amount)

    def _consume_through_leaky_bucket(self):
        # NOTE: If the read amounts on the stream are high, it will result
        # in large bursty behavior as there is not an interface for partial
        # reads. However given the reads on this abstraction are at most
        # 256KB (via downloads), it reduces the burstiness to be small KB
        # bursts at worst.
        while not self._transfer_coordinator.exception:
            try:
                self._leaky_bucket.consume(
                    self._bytes_seen, self._request_token
                )
                self._bytes_seen = 0
                return
            except RequestExceededException as e:
                self._time_utils.sleep(e.retry_time)
        # The loop only falls through when the coordinator has recorded an
        # exception; surface it rather than continuing to throttle.
        raise self._transfer_coordinator.exception

    def signal_transferring(self):
        """Signal that data being read is being transferred to S3"""
        self.enable_bandwidth_limiting()

    def signal_not_transferring(self):
        """Signal that data being read is not being transferred to S3"""
        self.disable_bandwidth_limiting()

    def seek(self, where, whence=0):
        self._fileobj.seek(where, whence)

    def tell(self):
        return self._fileobj.tell()

    def close(self):
        if self._bandwidth_limiting_enabled and self._bytes_seen:
            # A file small enough to never cross the threshold is never
            # subjected to the leaky bucket during read() (this happens for
            # small uploads), so account for those bytes here on close.
            self._consume_through_leaky_bucket()
        self._fileobj.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

210 

211 

class LeakyBucket:
    """A leaky bucket abstraction used to cap bandwidth consumption."""

    def __init__(
        self,
        max_rate,
        time_utils=None,
        rate_tracker=None,
        consumption_scheduler=None,
    ):
        """A leaky bucket abstraction to limit bandwidth consumption

        :type max_rate: int
        :param max_rate: The maximum rate to allow, in bytes per second.

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with time

        :type rate_tracker: BandwidthRateTracker
        :param rate_tracker: Tracks bandwidth consumption

        :type consumption_scheduler: ConsumptionScheduler
        :param consumption_scheduler: Schedules consumption retries when
            necessary
        """
        self._max_rate = float(max_rate)
        self._time_utils = TimeUtils() if time_utils is None else time_utils
        # Serializes all consume() calls; shared across streams.
        self._lock = threading.Lock()
        if rate_tracker is None:
            rate_tracker = BandwidthRateTracker()
        self._rate_tracker = rate_tracker
        if consumption_scheduler is None:
            consumption_scheduler = ConsumptionScheduler()
        self._consumption_scheduler = consumption_scheduler

    def consume(self, amt, request_token):
        """Consume a requested amount

        :type amt: int
        :param amt: The amount of bytes to request to consume

        :type request_token: RequestToken
        :param request_token: The token associated to the consumption
            request that is used to identify the request. So if a
            RequestExceededException is raised the token should be used
            in subsequent retry consume() request.

        :raises RequestExceededException: If the consumption amount would
            exceed the maximum allocated bandwidth

        :rtype: int
        :returns: The amount consumed
        """
        with self._lock:
            now = self._time_utils.time()
            # A previously deferred request gets released once it retries.
            if self._consumption_scheduler.is_scheduled(request_token):
                return self._release_requested_amt_for_scheduled_request(
                    amt, request_token, now
                )
            if self._projected_to_exceed_max_rate(amt, now):
                self._raise_request_exceeded_exception(
                    amt, request_token, now
                )
            return self._release_requested_amt(amt, now)

    def _projected_to_exceed_max_rate(self, amt, time_now):
        # Would recording this consumption push the moving average past
        # the configured cap?
        projected = self._rate_tracker.get_projected_rate(amt, time_now)
        return projected > self._max_rate

    def _release_requested_amt_for_scheduled_request(
        self, amt, request_token, time_now
    ):
        # Clear the scheduled entry before releasing so its wait time no
        # longer counts against future requests.
        self._consumption_scheduler.process_scheduled_consumption(
            request_token
        )
        return self._release_requested_amt(amt, time_now)

    def _raise_request_exceeded_exception(self, amt, request_token, time_now):
        # Time this amount would need at the maximum rate.
        allocated_time = amt / float(self._max_rate)
        retry_time = self._consumption_scheduler.schedule_consumption(
            amt, request_token, allocated_time
        )
        raise RequestExceededException(
            requested_amt=amt, retry_time=retry_time
        )

    def _release_requested_amt(self, amt, time_now):
        self._rate_tracker.record_consumption_rate(amt, time_now)
        return amt

303 

304 

class ConsumptionScheduler:
    """Tracks deferred consumption requests and their cumulative wait."""

    def __init__(self):
        """Schedules when to consume a desired amount"""
        self._tokens_to_scheduled_consumption = {}
        # Running total of wait time across all outstanding requests.
        self._total_wait = 0

    def is_scheduled(self, token):
        """Indicates if a consumption request has been scheduled

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.
        """
        return token in self._tokens_to_scheduled_consumption

    def schedule_consumption(self, amt, token, time_to_consume):
        """Schedules a wait time to be able to consume an amount

        :type amt: int
        :param amt: The amount of bytes scheduled to be consumed

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.

        :type time_to_consume: float
        :param time_to_consume: The desired time it should take for that
            specific request amount to be consumed in regardless of previously
            scheduled consumption requests

        :rtype: float
        :returns: The amount of time to wait for the specific request before
            actually consuming the specified amount.
        """
        # New requests queue behind every outstanding wait, so each
        # request's wait duration is the running total after adding its own
        # consumption time.
        self._total_wait += time_to_consume
        entry = {
            'wait_duration': self._total_wait,
            'time_to_consume': time_to_consume,
        }
        self._tokens_to_scheduled_consumption[token] = entry
        return self._total_wait

    def process_scheduled_consumption(self, token):
        """Processes a scheduled consumption request that has completed

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.
        """
        entry = self._tokens_to_scheduled_consumption.pop(token)
        # Clamp at zero so accumulated floating point drift can never make
        # the outstanding wait negative.
        self._total_wait = max(0, self._total_wait - entry['time_to_consume'])

358 

class BandwidthRateTracker:
    """Tracks bandwidth consumption as an exponential moving average."""

    def __init__(self, alpha=0.8):
        """Tracks the rate of bandwidth consumption

        :type alpha: float
        :param alpha: The smoothing constant, between 0 and 1, used in
            calculating the exponential moving average of the bandwidth
            rate:

                current_rate = alpha * new_rate + (1 - alpha) * current_rate
        """
        self._alpha = alpha
        # Both stay None until the first sample is recorded.
        self._last_time = None
        self._current_rate = None

    @property
    def current_rate(self):
        """The current transfer rate

        :rtype: float
        :returns: The current tracked transfer rate
        """
        return 0.0 if self._last_time is None else self._current_rate

    def get_projected_rate(self, amt, time_at_consumption):
        """Get the projected rate using a provided amount and time

        :type amt: int
        :param amt: The proposed amount to consume

        :type time_at_consumption: float
        :param time_at_consumption: The proposed time to consume at

        :rtype: float
        :returns: The consumption rate if that amt and time were consumed
        """
        if self._last_time is None:
            return 0.0
        return self._calculate_exponential_moving_average_rate(
            amt, time_at_consumption
        )

    def record_consumption_rate(self, amt, time_at_consumption):
        """Record the consumption rate based off amount and time point

        :type amt: int
        :param amt: The amount that got consumed

        :type time_at_consumption: float
        :param time_at_consumption: The time at which the amount was consumed
        """
        if self._last_time is None:
            # First sample just establishes the baseline timestamp.
            self._last_time = time_at_consumption
            self._current_rate = 0.0
            return
        self._current_rate = self._calculate_exponential_moving_average_rate(
            amt, time_at_consumption
        )
        self._last_time = time_at_consumption

    def _calculate_rate(self, amt, time_at_consumption):
        elapsed = time_at_consumption - self._last_time
        # Guard against a zero or negative time delta: rather than dividing
        # by zero or returning a negative rate, treat the delta as
        # infinitesimally small and report an infinite rate.
        if elapsed <= 0:
            return float('inf')
        return amt / elapsed

    def _calculate_exponential_moving_average_rate(
        self, amt, time_at_consumption
    ):
        sampled_rate = self._calculate_rate(amt, time_at_consumption)
        return self._alpha * sampled_rate + (1 - self._alpha) * self._current_rate