Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/smart_open/s3.py: 20%

600 statements  

1# -*- coding: utf-8 -*- 

2# 

3# Copyright (C) 2019 Radim Rehurek <me@radimrehurek.com> 

4# 

5# This code is distributed under the terms and conditions 

6# from the MIT License (MIT). 

7# 

8"""Implements file-like objects for reading and writing from/to AWS S3.""" 

9from __future__ import annotations 

10 

11import http 

12import io 

13import functools 

14import logging 

15import time 

16import warnings 

17 

18from typing import ( 

19 Callable, 

20 List, 

21 TYPE_CHECKING, 

22) 

23 

24try: 

25 import boto3 

26 import botocore.client 

27 import botocore.exceptions 

28 import urllib3.exceptions 

29except ImportError: 

30 MISSING_DEPS = True 

31 

32import smart_open.bytebuffer 

33import smart_open.concurrency 

34import smart_open.utils 

35 

36from smart_open import constants 

37 

38 

39if TYPE_CHECKING: 

40 from mypy_boto3_s3.client import S3Client 

41 from typing_extensions import Buffer 

42 

43logger = logging.getLogger(__name__) 

44 

45# 

46# AWS puts restrictions on the part size for multipart uploads. 

47 # Each part (except the last) must be at least 5MB, and no part may exceed 5GB. 

48# 

49# On top of that, our MultipartWriter has a min_part_size option. 

50# In retrospect, it's an unfortunate name, because it conflicts with the 

51# minimum allowable part size (5MB), but it's too late to change it, because 

52# people are using that parameter (unlike the MIN, DEFAULT, MAX constants). 

53# It really just means "part size": as soon as you have this many bytes, 

54# write a part to S3 (see the MultipartWriter.write method). 

55# 

56 

57MIN_PART_SIZE = 5 * 1024 ** 2 

58"""The absolute minimum permitted by Amazon.""" 

59 

60DEFAULT_PART_SIZE = 50 * 1024**2 

61"""The default part size for S3 multipart uploads, chosen carefully by smart_open""" 

62 

63MAX_PART_SIZE = 5 * 1024 ** 3 

64"""The absolute maximum permitted by Amazon.""" 

65 

66SCHEMES = ("s3", "s3n", "s3u", "s3a") 

67DEFAULT_PORT = 443 

68DEFAULT_HOST = 's3.amazonaws.com' 

69 

70DEFAULT_BUFFER_SIZE = 128 * 1024 

71 

72URI_EXAMPLES = ( 

73 's3://my_bucket/my_key', 

74 's3://my_key:my_secret@my_bucket/my_key', 

75 's3://my_key:my_secret@my_server:my_port@my_bucket/my_key', 

76) 

77 

78# Returned by AWS when we try to seek beyond EOF. 

79_OUT_OF_RANGE = 'InvalidRange' 

80 

81 

82class Retry: 

83 def __init__(self): 

84 self.attempts: int = 6 

85 self.sleep_seconds: int = 10 

86 self.exceptions: List[Exception] = [botocore.exceptions.EndpointConnectionError] 

87 self.client_error_codes: List[str] = ['NoSuchUpload'] 

88 

89 def _do(self, fn: Callable): 

90 for attempt in range(self.attempts): 

91 try: 

92 return fn() 

93 except tuple(self.exceptions) as err: 

94 logger.critical( 

95 'Caught non-fatal %s, retrying %d more times', 

96 err, 

97 self.attempts - attempt - 1, 

98 ) 

99 logger.exception(err) 

100 time.sleep(self.sleep_seconds) 

101 except botocore.exceptions.ClientError as err: 

102 error_code = err.response['Error'].get('Code') 

103 if error_code not in self.client_error_codes: 

104 raise 

105 logger.critical( 

106 'Caught non-fatal ClientError (%s), retrying %d more times', 

107 error_code, 

108 self.attempts - attempt - 1, 

109 ) 

110 logger.exception(err) 

111 time.sleep(self.sleep_seconds) 

112 else: 

113 logger.critical('encountered too many non-fatal errors, giving up') 

114 raise IOError('%s failed after %d attempts' % (fn.func, self.attempts)) 

115 

116 

117# 

118# The retry mechanism for this submodule. Client code may modify it, e.g. by 

119# updating RETRY.sleep_seconds and friends. 

120# 

121if 'MISSING_DEPS' not in locals(): 

122 RETRY = Retry() 
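#
# Example (an illustrative sketch): client code may tune the retry behaviour
# before opening any S3 objects, e.g.
#
#     smart_open.s3.RETRY.attempts = 3
#     smart_open.s3.RETRY.sleep_seconds = 1
#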

123 

124 

125class _ClientWrapper: 

126 """Wraps a client to inject the appropriate keyword args into each method call. 

127 

128 The keyword args are a dictionary keyed by the fully qualified method name. 

129 For example, S3.Client.create_multipart_upload. 

130 

131 See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client 

132 

133 This wrapper behaves identically to the client otherwise. 

134 """ 

135 def __init__(self, client, kwargs): 

136 self.client = client 

137 self.kwargs = kwargs 

138 

139 def __getattr__(self, method_name): 

140 method = getattr(self.client, method_name) 

141 kwargs = self.kwargs.get('S3.Client.%s' % method_name, {}) 

142 return functools.partial(method, **kwargs) 
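#
# Example (an illustrative sketch; the parameter value is hypothetical):
# the kwargs dictionary is keyed by fully qualified method names, so the
# following adds ServerSideEncryption to every create_multipart_upload call
# made through the wrapped client:
#
#     client_kwargs = {
#         'S3.Client.create_multipart_upload': {'ServerSideEncryption': 'aws:kms'},
#     }
#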

143 

144 

145def parse_uri(uri_as_string): 

146 # 

147 # Restrictions on bucket names and labels: 

148 # 

149 # - Bucket names must be at least 3 and no more than 63 characters long. 

150 # - Bucket names must be a series of one or more labels. 

151 # - Adjacent labels are separated by a single period (.). 

152 # - Bucket names can contain lowercase letters, numbers, and hyphens. 

153 # - Each label must start and end with a lowercase letter or a number. 

154 # 

155 # We use the above as a guide only, and do not perform any validation. We 

156 # let boto3 take care of that for us. 

157 # 

158 split_uri = smart_open.utils.safe_urlsplit(uri_as_string) 

159 assert split_uri.scheme in SCHEMES 

160 

161 port = DEFAULT_PORT 

162 host = DEFAULT_HOST 

163 ordinary_calling_format = False 

164 # 

165 # These defaults tell boto3 to look for credentials elsewhere 

166 # 

167 access_id, access_secret = None, None 

168 

169 # 

170 # Common URI template [secret:key@][host[:port]@]bucket/object 

171 # 

172 # The urlparse function doesn't handle the above schema, so we have to do 

173 # it ourselves. 

174 # 

175 uri = split_uri.netloc + split_uri.path 

176 

177 # 

178 # Attempt to extract edge-case authentication details from the URL. 

179 # 

180 # See: 

181 # 1. https://summitroute.com/blog/2018/06/20/aws_security_credential_formats/ 

182 # 2. test_s3_uri_with_credentials* in test_smart_open.py for example edge cases 

183 # 

184 if '@' in uri: 

185 maybe_auth, rest = uri.split('@', 1) 

186 if ':' in maybe_auth: 

187 maybe_id, maybe_secret = maybe_auth.split(':', 1) 

188 if '/' not in maybe_id: 

189 access_id, access_secret = maybe_id, maybe_secret 

190 uri = rest 

191 

192 head, key_id = uri.split('/', 1) 

193 if '@' in head and ':' in head: 

194 ordinary_calling_format = True 

195 host_port, bucket_id = head.split('@') 

196 host, port = host_port.split(':', 1) 

197 port = int(port) 

198 elif '@' in head: 

199 ordinary_calling_format = True 

200 host, bucket_id = head.split('@') 

201 else: 

202 bucket_id = head 

203 

204 return dict( 

205 scheme=split_uri.scheme, 

206 bucket_id=bucket_id, 

207 key_id=key_id, 

208 port=port, 

209 host=host, 

210 ordinary_calling_format=ordinary_calling_format, 

211 access_id=access_id, 

212 access_secret=access_secret, 

213 ) 
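#
# Example (an illustrative sketch, using the first URI_EXAMPLES entry):
#
#     >>> parsed = parse_uri('s3://my_bucket/my_key')
#     >>> parsed['bucket_id'], parsed['key_id'], parsed['host'], parsed['port']
#     ('my_bucket', 'my_key', 's3.amazonaws.com', 443)
#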

214 

215 

216def _consolidate_params(uri, transport_params): 

217 """Consolidates the parsed Uri with the additional parameters. 

218 

219 This is necessary because the user can pass some of the parameters in 

220 two different ways: 

221 

222 1) Via the URI itself 

223 2) Via the transport parameters 

224 

225 These are not mutually exclusive, but we have to pick one over the other 

226 in a sensible way in order to proceed. 

227 

228 """ 

229 transport_params = dict(transport_params) 

230 

231 def inject(**kwargs): 

232 try: 

233 client_kwargs = transport_params['client_kwargs'] 

234 except KeyError: 

235 client_kwargs = transport_params['client_kwargs'] = {} 

236 

237 try: 

238 init_kwargs = client_kwargs['S3.Client'] 

239 except KeyError: 

240 init_kwargs = client_kwargs['S3.Client'] = {} 

241 

242 init_kwargs.update(**kwargs) 

243 

244 client = transport_params.get('client') 

245 if client is not None and (uri['access_id'] or uri['access_secret']): 

246 logger.warning( 

247 'ignoring credentials parsed from URL because they conflict with ' 

248 'transport_params["client"]. Set transport_params["client"] to None ' 

249 'to suppress this warning.' 

250 ) 

251 uri.update(access_id=None, access_secret=None) 

252 elif (uri['access_id'] and uri['access_secret']): 

253 inject( 

254 aws_access_key_id=uri['access_id'], 

255 aws_secret_access_key=uri['access_secret'], 

256 ) 

257 uri.update(access_id=None, access_secret=None) 

258 

259 if client is not None and uri['host'] != DEFAULT_HOST: 

260 logger.warning( 

261 'ignoring endpoint_url parsed from URL because they conflict with ' 

262 'transport_params["client"]. Set transport_params["client"] to None ' 

263 'to suppress this warning.' 

264 ) 

265 uri.update(host=None) 

266 elif uri['host'] != DEFAULT_HOST: 

267 if uri['scheme'] == 's3u': 

268 scheme = 'http' 

269 else: 

270 scheme = 'https' 

271 inject(endpoint_url=scheme + '://%(host)s:%(port)d' % uri) 

272 uri.update(host=None) 

273 

274 return uri, transport_params 
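#
# Example (an illustrative sketch; the host and credentials are hypothetical):
# for a URI such as 's3u://my_key:my_secret@localhost:9000@my_bucket/my_key',
# _consolidate_params injects the credentials and an http endpoint into
# transport_params['client_kwargs']['S3.Client'], roughly:
#
#     {'aws_access_key_id': 'my_key',
#      'aws_secret_access_key': 'my_secret',
#      'endpoint_url': 'http://localhost:9000'}
#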

275 

276 

277def open_uri(uri, mode, transport_params): 

278 deprecated = ( 

279 'multipart_upload_kwargs', 

280 'object_kwargs', 

281 'resource', 

282 'resource_kwargs', 

283 'session', 

284 'singlepart_upload_kwargs', 

285 ) 

286 detected = [k for k in deprecated if k in transport_params] 

287 if detected: 

288 doc_url = ( 

289 'https://github.com/piskvorky/smart_open/blob/develop/' 

290 'MIGRATING_FROM_OLDER_VERSIONS.rst' 

291 ) 

292 # 

293 # We use warnings.warn with UserWarning instead of logger.warn here because 

294 # 

295 # 1) Not everyone has logging enabled; and 

296 # 2) check_kwargs (below) already uses logger.warn with a similar message 

297 # 

298 # https://github.com/piskvorky/smart_open/issues/614 

299 # 

300 message = ( 

301 'ignoring the following deprecated transport parameters: %r. ' 

302 'See <%s> for details' % (detected, doc_url) 

303 ) 

304 warnings.warn(message, UserWarning) 

305 parsed_uri = parse_uri(uri) 

306 parsed_uri, transport_params = _consolidate_params(parsed_uri, transport_params) 

307 kwargs = smart_open.utils.check_kwargs(open, transport_params) 

308 return open(parsed_uri['bucket_id'], parsed_uri['key_id'], mode, **kwargs) 

309 

310 

311def open( 

312 bucket_id, 

313 key_id, 

314 mode, 

315 version_id=None, 

316 buffer_size=DEFAULT_BUFFER_SIZE, 

317 min_part_size=DEFAULT_PART_SIZE, 

318 multipart_upload=True, 

319 defer_seek=False, 

320 client=None, 

321 client_kwargs=None, 

322 writebuffer=None, 

323): 

324 """Open an S3 object for reading or writing. 

325 

326 Parameters 

327 ---------- 

328 bucket_id: str 

329 The name of the bucket this object resides in. 

330 key_id: str 

331 The name of the key within the bucket. 

332 mode: str 

333 The mode for opening the object. Must be either "rb" or "wb". 

334 buffer_size: int, optional 

335 Default: 128KB 

336 The buffer size in bytes for reading. Controls memory usage. Data is streamed 

337 from an S3 network stream in buffer_size chunks. Forward seeks within 

338 the current buffer are satisfied without additional GET requests. Backward 

339 seeks always open a new GET request. For forward seek-intensive workloads, 

340 increase buffer_size to reduce GET requests at the cost of higher memory usage. 

341 min_part_size: int, optional 

342 The minimum part size for multipart uploads, in bytes. 

343 When the writebuffer contains this many bytes, smart_open will upload 

344 the bytes to S3 as a single part of a multi-part upload, freeing the 

345 buffer either partially or entirely. When you close the writer, it 

346 will assemble the parts together. 

347 The value determines the upper limit for the writebuffer. If buffer 

348 space is short (e.g. you are buffering to memory), then use a smaller 

349 value for min_part_size, or consider buffering to disk instead (see 

350 the writebuffer option). 

351 The value must be between 5MB and 5GB. If you specify a value outside 

352 of this range, smart_open will adjust it for you, because otherwise the 

353 upload _will_ fail. 

354 For writing only. Does not apply if you set multipart_upload=False. 

355 multipart_upload: bool, optional 

356 Default: `True` 

357 If set to `True`, will use multipart upload for writing to S3. If set 

358 to `False`, S3 upload will use the S3 Single-Part Upload API, which 

359 is better suited to small files. 

360 For writing only. 

361 version_id: str, optional 

362 Version of the object, used when reading object. 

363 If None, will fetch the most recent version. 

364 defer_seek: boolean, optional 

365 Default: `False` 

366 If set to `True` on a file opened for reading, GetObject will not be 

367 called until the first seek() or read(). 

368 Avoids redundant API queries when seeking before reading. 

369 client: object, optional 

370 The S3 client to use when working with boto3. 

371 If you don't specify this, then smart_open will create a new client for you. 

372 client_kwargs: dict, optional 

373 Additional parameters to pass to the relevant functions of the client. 

374 The keys are fully qualified method names, e.g. `S3.Client.create_multipart_upload`. 

375 The values are kwargs to pass to that method each time it is called. 

376 writebuffer: IO[bytes], optional 

377 By default, this module will buffer data in memory using io.BytesIO 

378 when writing. Pass another binary IO instance here to use it instead. 

379 For example, you may pass a file object to buffer to local disk instead 

380 of in RAM. Use this to keep RAM usage low at the expense of additional 

381 disk IO. If you pass in an open file, then you are responsible for 

382 cleaning it up after writing completes. 

383 """ 

384 logger.debug('%r', locals()) 

385 if mode not in constants.BINARY_MODES: 

386 raise NotImplementedError('bad mode: %r expected one of %r' % (mode, constants.BINARY_MODES)) 

387 

388 if (mode == constants.WRITE_BINARY) and (version_id is not None): 

389 raise ValueError("version_id must be None when writing") 

390 

391 if mode == constants.READ_BINARY: 

392 fileobj = Reader( 

393 bucket_id, 

394 key_id, 

395 version_id=version_id, 

396 buffer_size=buffer_size, 

397 defer_seek=defer_seek, 

398 client=client, 

399 client_kwargs=client_kwargs, 

400 ) 

401 elif mode == constants.WRITE_BINARY: 

402 if multipart_upload: 

403 fileobj = MultipartWriter( 

404 bucket_id, 

405 key_id, 

406 client=client, 

407 client_kwargs=client_kwargs, 

408 writebuffer=writebuffer, 

409 part_size=min_part_size, 

410 ) 

411 else: 

412 fileobj = SinglepartWriter( 

413 bucket_id, 

414 key_id, 

415 client=client, 

416 client_kwargs=client_kwargs, 

417 writebuffer=writebuffer, 

418 ) 

419 else: 

420 assert False, 'unexpected mode: %r' % mode 

421 

422 fileobj.name = key_id 

423 return fileobj 
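#
# Example usage (an illustrative sketch; the bucket and key names are hypothetical):
#
#     import tempfile
#
#     # Buffer multipart-upload parts on disk instead of in RAM.
#     with tempfile.TemporaryFile() as tmp:
#         with open('my_bucket', 'my_key.txt', 'wb', writebuffer=tmp) as fout:
#             fout.write(b'hello world!')
#
#     # Read with a larger buffer to reduce the number of GET requests
#     # for forward-seek-heavy workloads.
#     with open('my_bucket', 'my_key.txt', 'rb', buffer_size=1024 * 1024) as fin:
#         data = fin.read()
#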

424 

425 

426def _get(client, bucket, key, version, range_string): 

427 try: 

428 params = dict(Bucket=bucket, Key=key) 

429 if version: 

430 params["VersionId"] = version 

431 if range_string: 

432 params["Range"] = range_string 

433 

434 return client.get_object(**params) 

435 except botocore.client.ClientError as error: 

436 wrapped_error = IOError( 

437 'unable to access bucket: %r key: %r version: %r error: %s' % ( 

438 bucket, key, version, error 

439 ) 

440 ) 

441 wrapped_error.backend_error = error 

442 raise wrapped_error from error 

443 

444 

445def _unwrap_ioerror(ioe): 

446 """Given an IOError from _get, return the 'Error' dictionary from boto.""" 

447 try: 

448 return ioe.backend_error.response['Error'] 

449 except (AttributeError, KeyError): 

450 return None 

451 

452 

453class _SeekableRawReader(object): 

454 """Read an S3 object. 

455 

456 This class is internal to the S3 submodule. 

457 """ 

458 

459 def __init__( 

460 self, 

461 client, 

462 bucket, 

463 key, 

464 version_id=None, 

465 ): 

466 self._client = client 

467 self._bucket = bucket 

468 self._key = key 

469 self._version_id = version_id 

470 

471 self._content_length = None 

472 self._position = 0 

473 self._body = None 

474 

475 @property 

476 def closed(self): 

477 return self._body is None 

478 

479 def close(self): 

480 if not self.closed: 

481 self._body.close() 

482 self._body = None 

483 

484 def seek(self, offset, whence=constants.WHENCE_START): 

485 """Seek to the specified position. 

486 

487 :param int offset: The offset in bytes. 

488 :param int whence: Where the offset is from. 

489 

490 :returns: the position after seeking. 

491 :rtype: int 

492 """ 

493 if whence not in constants.WHENCE_CHOICES: 

494 raise ValueError('invalid whence, expected one of %r' % constants.WHENCE_CHOICES) 

495 

496 # 

497 # Close old body explicitly. 

498 # 

499 self.close() 

500 

501 start = None 

502 stop = None 

503 if whence == constants.WHENCE_START: 

504 start = max(0, offset) 

505 elif whence == constants.WHENCE_CURRENT: 

506 start = max(0, offset + self._position) 

507 else: 

508 stop = max(0, -offset) 

509 

510 # 

511 # If we can figure out that we've read past the EOF, then we can save 

512 # an extra API call. 

513 # 

514 if self._content_length is None: 

515 reached_eof = False 

516 elif start is not None and start >= self._content_length: 

517 reached_eof = True 

518 elif stop == 0: 

519 reached_eof = True 

520 else: 

521 reached_eof = False 

522 

523 if reached_eof: 

524 self._body = io.BytesIO() 

525 self._position = self._content_length 

526 else: 

527 self._open_body(start, stop) 

528 

529 return self._position 

530 

531 def _open_body(self, start=None, stop=None): 

532 """Open a connection to download the specified range of bytes. Store 

533 the open file handle in self._body. 

534 

535 If no range is specified, start defaults to self._position. 

536 start and stop follow the semantics of the http range header, 

537 so a stop without a start requests the last `stop` bytes of the object (an HTTP suffix range). 

538 

539 As a side effect, set self._content_length. Set self._position 

540 to self._content_length if start is past end of file. 

541 """ 

542 if start is None and stop is None: 

543 start = self._position 

544 range_string = smart_open.utils.make_range_string(start, stop) 

545 

546 try: 

547 # Optimistically try to fetch the requested content range. 

548 response = _get( 

549 self._client, 

550 self._bucket, 

551 self._key, 

552 self._version_id, 

553 range_string, 

554 ) 

555 except IOError as ioe: 

556 # Handle requested content range exceeding content size. 

557 error_response = _unwrap_ioerror(ioe) 

558 if error_response is None or error_response.get('Code') != _OUT_OF_RANGE: 

559 raise 

560 

561 actual_object_size = int(error_response.get('ActualObjectSize', 0)) 

562 if ( 

563 # empty file (==) or start is past end of file (>) 

564 (start is not None and start >= actual_object_size) 

565 # negative seek requested more bytes than file has 

566 or (start is None and stop is not None and stop >= actual_object_size) 

567 ): 

568 self._position = self._content_length = actual_object_size 

569 self._body = io.BytesIO() 

570 else: # stop is past end of file: request the correct remainder instead 

571 self._open_body(start=start, stop=actual_object_size - 1) 

572 return 

573 

574 # 

575 # Keep track of how many times boto3's built-in retry mechanism 

576 # activated. 

577 # 

578 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#checking-retry-attempts-in-an-aws-service-response 

579 # 

580 logger.debug( 

581 '%s: RetryAttempts: %d', 

582 self, 

583 response['ResponseMetadata']['RetryAttempts'], 

584 ) 

585 # 

586 # range request may not always return partial content, see: 

587 # https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests#partial_request_responses 

588 # 

589 status_code = response['ResponseMetadata']['HTTPStatusCode'] 

590 if status_code == http.HTTPStatus.PARTIAL_CONTENT: 

591 # 206 guarantees that the response body only contains the requested byte range 

592 _, resp_start, _, length = smart_open.utils.parse_content_range(response['ContentRange']) 

593 self._position = resp_start 

594 self._content_length = length 

595 self._body = response['Body'] 

596 elif status_code == http.HTTPStatus.OK: 

597 # 200 guarantees the response body contains the full file (server ignored range header) 

598 self._position = 0 

599 self._content_length = response["ContentLength"] 

600 self._body = response['Body'] 

601 # 

602 # If we got a full request when we were actually expecting a range, we need to 

603 # read some data to ensure that the body starts in the place that the caller expects 

604 # 

605 if start is not None: 

606 expected_position = min(self._content_length, start) 

607 elif start is None and stop is not None: 

608 expected_position = max(0, self._content_length - stop) 

609 else: 

610 expected_position = 0 

611 if expected_position > 0: 

612 logger.debug( 

613 '%s: discarding %d bytes to reach expected position', 

614 self, 

615 expected_position, 

616 ) 

617 self._position = len(self._body.read(expected_position)) 

618 else: 

619 raise ValueError("Unexpected status code %r" % status_code) 

620 

621 def read(self, size=-1): 

622 """Read from the continuous connection with the remote peer.""" 

623 if self.closed: 

624 # This is necessary for the very first read() after __init__(). 

625 self._open_body() 

626 if self._position >= self._content_length: 

627 return b'' 

628 

629 # 

630 # Boto3 has built-in error handling and retry mechanisms: 

631 # 

632 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html 

633 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html 

634 # 

635 # Unfortunately, it isn't always enough. There is still a non-zero 

636 # possibility that an exception will slip past these mechanisms and 

637 # terminate the read prematurely. Luckily, at this stage, it's very 

638 # simple to recover from the problem: wait a little bit, reopen the 

639 # HTTP connection and try again. Usually, a single retry attempt is 

640 # enough to recover, but we try multiple times "just in case". 

641 # 

642 for attempt, seconds in enumerate([1, 2, 4, 8, 16], 1): 

643 try: 

644 if size == -1: 

645 binary = self._body.read() 

646 else: 

647 binary = self._body.read(size) 

648 except ( 

649 ConnectionResetError, 

650 botocore.exceptions.BotoCoreError, 

651 urllib3.exceptions.HTTPError, 

652 ) as err: 

653 logger.warning( 

654 '%s: caught %r while reading %d bytes, sleeping %ds before retry', 

655 self, 

656 err, 

657 size, 

658 seconds, 

659 ) 

660 time.sleep(seconds) 

661 self._open_body() 

662 else: 

663 self._position += len(binary) 

664 return binary 

665 

666 raise IOError('%s: failed to read %d bytes after %d attempts' % (self, size, attempt)) 

667 

668 def __str__(self): 

669 return 'smart_open.s3._SeekableRawReader(%r, %r)' % (self._bucket, self._key) 

670 

671 

672def _initialize_boto3(rw, client, client_kwargs, bucket, key): 

673 """Created the required objects for accessing S3. Ideally, they have 

674 been already created for us and we can just reuse them.""" 

675 if client_kwargs is None: 

676 client_kwargs = {} 

677 

678 if client is None: 

679 init_kwargs = client_kwargs.get('S3.Client', {}) 

680 if 'config' not in init_kwargs: 

681 init_kwargs['config'] = botocore.client.Config( 

682 max_pool_connections=64, 

683 tcp_keepalive=True, 

684 retries={"max_attempts": 6, "mode": "adaptive"} 

685 ) 

686 # boto3.client re-uses the default session which is not thread-safe when this is called 

687 # from within a thread. when using smart_open with multithreading, create a thread-safe 

688 # client with the config above and share it between threads using transport_params 

689 # https://github.com/boto/boto3/blob/1.38.41/docs/source/guide/clients.rst?plain=1#L111 

690 client = boto3.client('s3', **init_kwargs) 

691 assert client 

692 

693 rw._client = _ClientWrapper(client, client_kwargs) 

694 rw._bucket = bucket 

695 rw._key = key 
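#
# Example (an illustrative sketch, following the multithreading note above):
# create one client from a dedicated session and share it between threads
# via transport_params, e.g.
#
#     import boto3
#     session = boto3.session.Session()
#     client = session.client('s3')
#     # smart_open.open('s3://bucket/key', transport_params={'client': client})
#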

696 

697 

698class Reader(io.BufferedIOBase): 

699 """Reads bytes from S3. 

700 

701 Implements the io.BufferedIOBase interface of the standard library.""" 

702 

703 def __init__( 

704 self, 

705 bucket, 

706 key, 

707 version_id=None, 

708 buffer_size=DEFAULT_BUFFER_SIZE, 

709 line_terminator=constants.BINARY_NEWLINE, 

710 defer_seek=False, 

711 client=None, 

712 client_kwargs=None, 

713 ): 

714 self._version_id = version_id 

715 self._buffer_size = buffer_size 

716 

717 _initialize_boto3(self, client, client_kwargs, bucket, key) 

718 

719 self._raw_reader = _SeekableRawReader( 

720 self._client, 

721 bucket, 

722 key, 

723 self._version_id, 

724 ) 

725 self._current_pos = 0 

726 self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size) 

727 self._eof = False 

728 self._line_terminator = line_terminator 

729 self._seek_initialized = False 

730 

731 # 

732 # This member is part of the io.BufferedIOBase interface. 

733 # 

734 self.raw = None 

735 

736 if not defer_seek: 

737 self.seek(0) 

738 

739 # 

740 # io.BufferedIOBase methods. 

741 # 

742 

743 def close(self): 

744 """Flush and close this stream.""" 

745 logger.debug("close: called") 

746 pass 

747 

748 def readable(self): 

749 """Return True if the stream can be read from.""" 

750 return True 

751 

752 def read(self, size=-1): 

753 """Read up to size bytes from the object and return them.""" 

754 if size == 0: 

755 return b'' 

756 elif size < 0: 

757 # call read() before setting _current_pos to make sure _content_length is set 

758 out = self._read_from_buffer() + self._raw_reader.read() 

759 self._current_pos = self._raw_reader._content_length 

760 return out 

761 

762 # 

763 # Return unused data first 

764 # 

765 if len(self._buffer) >= size: 

766 return self._read_from_buffer(size) 

767 

768 # 

769 # If the stream is finished, return what we have. 

770 # 

771 if self._eof: 

772 return self._read_from_buffer() 

773 

774 self._fill_buffer(size) 

775 return self._read_from_buffer(size) 

776 

777 def read1(self, size=-1): 

778 """This is the same as read().""" 

779 return self.read(size=size) 

780 

781 def readinto(self, b): 

782 """Read up to len(b) bytes into b, and return the number of bytes 

783 read.""" 

784 data = self.read(len(b)) 

785 if not data: 

786 return 0 

787 b[:len(data)] = data 

788 return len(data) 

789 

790 def readline(self, limit=-1): 

791 """Read up to and including the next newline. Returns the bytes read.""" 

792 if limit != -1: 

793 raise NotImplementedError('limits other than -1 not implemented yet') 

794 

795 # 

796 # A single line may span multiple buffers. 

797 # 

798 line = io.BytesIO() 

799 while not (self._eof and len(self._buffer) == 0): 

800 line_part = self._buffer.readline(self._line_terminator) 

801 line.write(line_part) 

802 self._current_pos += len(line_part) 

803 

804 if line_part.endswith(self._line_terminator): 

805 break 

806 else: 

807 self._fill_buffer() 

808 

809 return line.getvalue() 

810 

811 def seekable(self): 

812 """If False, seek(), tell() and truncate() will raise IOError. 

813 

814 We offer only seek support, and no truncate support.""" 

815 return True 

816 

817 def seek(self, offset, whence=constants.WHENCE_START): 

818 """Seek to the specified position. 

819 

820 :param int offset: The offset in bytes. 

821 :param int whence: Where the offset is from. 

822 

823 Returns the position after seeking.""" 

824 # Convert relative offset to absolute, since self._raw_reader 

825 # doesn't know our current position. 

826 if whence == constants.WHENCE_CURRENT: 

827 whence = constants.WHENCE_START 

828 offset += self._current_pos 

829 

830 # Check if we can satisfy seek from buffer 

831 if whence == constants.WHENCE_START and offset > self._current_pos: 

832 buffer_end = self._current_pos + len(self._buffer) 

833 if offset <= buffer_end: 

834 # Forward seek within buffered data - avoid S3 request 

835 self._buffer.read(offset - self._current_pos) 

836 self._current_pos = offset 

837 return self._current_pos 

838 

839 if not self._seek_initialized or not ( 

840 whence == constants.WHENCE_START and offset == self._current_pos 

841 ): 

842 self._current_pos = self._raw_reader.seek(offset, whence) 

843 self._buffer.empty() 

844 

845 self._eof = self._current_pos == self._raw_reader._content_length 

846 

847 self._seek_initialized = True 

848 return self._current_pos 

849 

850 def tell(self): 

851 """Return the current position within the file.""" 

852 return self._current_pos 

853 

854 def truncate(self, size=None): 

855 """Unsupported.""" 

856 raise io.UnsupportedOperation 

857 

858 def detach(self): 

859 """Unsupported.""" 

860 raise io.UnsupportedOperation 

861 

862 def terminate(self): 

863 """Do nothing.""" 

864 pass 

865 

866 def to_boto3(self, resource): 

867 """Create an **independent** `boto3.s3.Object` instance that points to 

868 the same S3 object as this instance. 

869 Changes to the returned object will not affect the current instance. 

870 """ 

871 assert resource, 'resource must be a boto3.resource instance' 

872 obj = resource.Object(self._bucket, self._key) 

873 if self._version_id is not None: 

874 return obj.Version(self._version_id) 

875 else: 

876 return obj 

877 

878 # 

879 # Internal methods. 

880 # 

881 def _read_from_buffer(self, size=-1): 

882 """Remove at most size bytes from our buffer and return them.""" 

883 size = size if size >= 0 else len(self._buffer) 

884 part = self._buffer.read(size) 

885 self._current_pos += len(part) 

886 return part 

887 

888 def _fill_buffer(self, size=-1): 

889 size = max(size, self._buffer._chunk_size) 

890 while len(self._buffer) < size and not self._eof: 

891 bytes_read = self._buffer.fill(self._raw_reader) 

892 if bytes_read == 0: 

893 logger.debug('%s: reached EOF while filling buffer', self) 

894 self._eof = True 

895 

896 def __str__(self): 

897 return "smart_open.s3.Reader(%r, %r)" % (self._bucket, self._key) 

898 

899 def __repr__(self): 

900 return ( 

901 "smart_open.s3.Reader(" 

902 "bucket=%r, " 

903 "key=%r, " 

904 "version_id=%r, " 

905 "buffer_size=%r, " 

906 "line_terminator=%r)" 

907 ) % ( 

908 self._bucket, 

909 self._key, 

910 self._version_id, 

911 self._buffer_size, 

912 self._line_terminator, 

913 ) 

914 

915 

916class MultipartWriter(io.BufferedIOBase): 

917 """Writes bytes to S3 using the multi part API. 

918 

919 Implements the io.BufferedIOBase interface of the standard library.""" 

920 _upload_id = None # so `closed` property works in case __init__ fails and __del__ is called 

921 

922 def __init__( 

923 self, 

924 bucket, 

925 key, 

926 part_size=DEFAULT_PART_SIZE, 

927 client=None, 

928 client_kwargs=None, 

929 writebuffer: io.BytesIO | None = None, 

930 ): 

931 adjusted_ps = smart_open.utils.clamp(part_size, MIN_PART_SIZE, MAX_PART_SIZE) 

932 if part_size != adjusted_ps: 

933 logger.warning(f"adjusting part_size from {part_size} to {adjusted_ps}") 

934 part_size = adjusted_ps 

935 self._part_size = part_size 

936 

937 _initialize_boto3(self, client, client_kwargs, bucket, key) 

938 self._client: S3Client 

939 self._bucket: str 

940 self._key: str 

941 

942 try: 

943 partial = functools.partial( 

944 self._client.create_multipart_upload, 

945 Bucket=bucket, 

946 Key=key, 

947 ) 

948 self._upload_id = RETRY._do(partial)['UploadId'] 

949 except botocore.client.ClientError as error: 

950 raise ValueError( 

951 'the bucket %r does not exist, or is forbidden for access (%r)' % ( 

952 bucket, error 

953 ) 

954 ) from error 

955 

956 if writebuffer is None: 

957 self._buf = io.BytesIO() 

958 else: 

959 self._buf = writebuffer 

960 

961 self._total_bytes = 0 

962 self._total_parts = 0 

963 self._parts: list[dict[str, object]] = [] 

964 

965 # 

966 # This member is part of the io.BufferedIOBase interface. 

967 # 

968 self.raw = None # type: ignore[assignment] 

969 

970 def flush(self): 

971 pass 

972 

973 # 

974 # Override some methods from io.IOBase. 

975 # 

976 def close(self): 

977 logger.debug("close: called") 

978 if self.closed: 

979 return 

980 

981 if self._buf.tell(): 

982 self._upload_next_part() 

983 

984 logger.debug('%s: completing multipart upload', self) 

985 if self._total_bytes and self._upload_id: 

986 partial = functools.partial( 

987 self._client.complete_multipart_upload, 

988 Bucket=self._bucket, 

989 Key=self._key, 

990 UploadId=self._upload_id, 

991 MultipartUpload={'Parts': self._parts}, 

992 ) 

993 RETRY._do(partial) 

994 logger.debug('%s: completed multipart upload', self) 

995 elif self._upload_id: 

996 # 

997 # AWS complains with "The XML you provided was not well-formed or 

998 # did not validate against our published schema" when the input is 

999 # completely empty => abort the upload, no file created. 

1000 # 

1001 # We work around this by creating an empty file explicitly. 

1002 # 

1003 self._client.abort_multipart_upload( 

1004 Bucket=self._bucket, 

1005 Key=self._key, 

1006 UploadId=self._upload_id, 

1007 ) 

1008 self._client.put_object( 

1009 Bucket=self._bucket, 

1010 Key=self._key, 

1011 Body=b'', 

1012 ) 

1013 logger.debug('%s: wrote 0 bytes to imitate multipart upload', self) 

1014 self._upload_id = None 

1015 

1016 @property 

1017 def closed(self): 

1018 return self._upload_id is None 

1019 

1020 def writable(self): 

1021 """Return True if the stream supports writing.""" 

1022 return True 

1023 

1024 def seekable(self): 

1025 """If False, seek(), tell() and truncate() will raise IOError. 

1026 

1027 We offer only tell support, and no seek or truncate support.""" 

1028 return True 

1029 

1030 def seek(self, offset, whence=constants.WHENCE_START): 

1031 """Unsupported.""" 

1032 raise io.UnsupportedOperation 

1033 

1034 def truncate(self, size=None): 

1035 """Unsupported.""" 

1036 raise io.UnsupportedOperation 

1037 

1038 def tell(self): 

1039 """Return the current stream position.""" 

1040 return self._total_bytes 

1041 

1042 # 

1043 # io.BufferedIOBase methods. 

1044 # 

1045 def detach(self): 

1046 raise io.UnsupportedOperation("detach() not supported") 

1047 

1048 def write(self, b: Buffer) -> int: 

1049 """Write the given buffer (bytes, bytearray, memoryview or any buffer 

1050 interface implementation) to the S3 file. 

1051 

1052 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html 

1053 

1054 There's buffering happening under the covers, so this may not actually 

1055 do any HTTP transfer right away.""" 

1056 offset = 0 

1057 mv = memoryview(b) 

1058 self._total_bytes += len(mv) 

1059 

1060 # 

1061 # botocore does not accept memoryview, otherwise we could've gotten 

1062 # away with not needing to write a copy to the buffer aside from cases 

1063 # where b is smaller than part_size 

1064 # 

1065 while offset < len(mv): 

1066 start = offset 

1067 end = offset + self._part_size - self._buf.tell() 

1068 self._buf.write(mv[start:end]) 

1069 if self._buf.tell() < self._part_size: 

1070 # 

1071 # Not enough data to write a new part just yet. The assert 

1072 # ensures that we've consumed all of the input buffer. 

1073 # 

1074 assert end >= len(mv) 

1075 return len(mv) 

1076 

1077 self._upload_next_part() 

1078 offset = end 

1079 return len(mv) 

1080 

1081 def terminate(self): 

1082 """Cancel the underlying multipart upload.""" 

1083 if self.closed: 

1084 return 

1085 logger.debug('%s: terminating multipart upload', self) 

1086 self._client.abort_multipart_upload( 

1087 Bucket=self._bucket, 

1088 Key=self._key, 

1089 UploadId=self._upload_id, 

1090 ) 

1091 self._upload_id = None 

1092 logger.debug('%s: terminated multipart upload', self) 

1093 

1094 def to_boto3(self, resource): 

1095 """Create an **independent** `boto3.s3.Object` instance that points to 

1096 the same S3 object as this instance. 

1097 Changes to the returned object will not affect the current instance. 

1098 """ 

1099 assert resource, 'resource must be a boto3.resource instance' 

1100 return resource.Object(self._bucket, self._key) 

1101 

1102 # 

1103 # Internal methods. 

1104 # 

1105 def _upload_next_part(self) -> None: 

1106 part_num = self._total_parts + 1 

1107 logger.info( 

1108 "%s: uploading part_num: %i, %i bytes (total %.3fGB)", 

1109 self, 

1110 part_num, 

1111 self._buf.tell(), 

1112 self._total_bytes / 1024.0 ** 3, 

1113 ) 

1114 self._buf.seek(0) 

1115 

1116 # 

1117 # Network problems in the middle of an upload are particularly 

1118 # troublesome. We don't want to abort the entire upload just because 

1119 # of a temporary connection problem, so this part needs to be 

1120 # especially robust. 

1121 # 

1122 upload = RETRY._do( 

1123 functools.partial( 

1124 self._client.upload_part, 

1125 Bucket=self._bucket, 

1126 Key=self._key, 

1127 UploadId=self._upload_id, 

1128 PartNumber=part_num, 

1129 Body=self._buf, 

1130 ) 

1131 ) 

1132 

1133 self._parts.append({'ETag': upload['ETag'], 'PartNumber': part_num}) 

1134 logger.debug("%s: upload of part_num #%i finished", self, part_num) 

1135 

1136 self._total_parts += 1 

1137 

1138 self._buf.seek(0) 

1139 self._buf.truncate(0) 

1140 

1141 def __enter__(self): 

1142 return self 

1143 

1144 def __exit__(self, exc_type, exc_val, exc_tb): 

1145 if exc_type is not None: 

1146 self.terminate() 

1147 else: 

1148 self.close() 

1149 

1150 def __str__(self): 

1151 return "smart_open.s3.MultipartWriter(%r, %r)" % (self._bucket, self._key) 

1152 

1153 def __repr__(self): 

1154 return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, part_size=%r)" % ( 

1155 self._bucket, 

1156 self._key, 

1157 self._part_size, 

1158 ) 

1159 

1160 

1161class SinglepartWriter(io.BufferedIOBase): 

1162 """Writes bytes to S3 using the single part API. 

1163 

1164 Implements the io.BufferedIOBase interface of the standard library. 

1165 

1166 This class buffers all of its input in memory until its `close` method is called. Only then will 

1167 the data be written to S3 and the buffer released.""" 

1168 _buf = None # so `closed` property works in case __init__ fails and __del__ is called 

1169 

1170 def __init__( 

1171 self, 

1172 bucket, 

1173 key, 

1174 client=None, 

1175 client_kwargs=None, 

1176 writebuffer=None, 

1177 ): 

1178 _initialize_boto3(self, client, client_kwargs, bucket, key) 

1179 

1180 if writebuffer is None: 

1181 self._buf = io.BytesIO() 

1182 elif not writebuffer.seekable(): 

1183 raise ValueError('writebuffer needs to be seekable') 

1184 else: 

1185 self._buf = writebuffer 

1186 

1187 def flush(self): 

1188 pass 

1189 

1190 # 

1191 # Override some methods from io.IOBase. 

1192 # 

1193 def close(self): 

1194 logger.debug("close: called") 

1195 if self.closed: 

1196 return 

1197 

1198 self.seek(0) 

1199 

1200 try: 

1201 self._client.put_object( 

1202 Bucket=self._bucket, 

1203 Key=self._key, 

1204 Body=self._buf, 

1205 ) 

1206 except botocore.client.ClientError as e: 

1207 raise ValueError( 

1208 'the bucket %r does not exist, or is forbidden for access' % self._bucket) from e 

1209 

1210 logger.debug("%s: direct upload finished", self) 

1211 self._buf.close() 

1212 

1213 @property 

1214 def closed(self): 

1215 return self._buf is None or self._buf.closed 

1216 

1217 def readable(self): 

1218 """Propagate.""" 

1219 return self._buf.readable() 

1220 

1221 def writable(self): 

1222 """Propagate.""" 

1223 return self._buf.writable() 

1224 

1225 def seekable(self): 

1226 """Propagate.""" 

1227 return self._buf.seekable() 

1228 

1229 def seek(self, offset, whence=constants.WHENCE_START): 

1230 """Propagate.""" 

1231 return self._buf.seek(offset, whence) 

1232 

1233 def truncate(self, size=None): 

1234 """Propagate.""" 

1235 return self._buf.truncate(size) 

1236 

1237 def tell(self): 

1238 """Propagate.""" 

1239 return self._buf.tell() 

1240 

1241 def write(self, b): 

1242 """Write the given buffer (bytes, bytearray, memoryview or any buffer 

1243 interface implementation) into the buffer. Content of the buffer will be 

1244 written to S3 on close as a single-part upload. 

1245 

1246 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html""" 

1247 return self._buf.write(b) 

1248 

1249 def read(self, size=-1): 

1250 """Propagate.""" 

1251 return self._buf.read(size) 

1252 

1253 def read1(self, size=-1): 

1254 """Propagate.""" 

1255 return self._buf.read1(size) 

1256 

1257 def terminate(self): 

1258 """Close buffer and skip upload.""" 

1259 self._buf.close() 

1260 logger.debug('%s: terminated singlepart upload', self) 

1261 

1262 # 

1263 # Internal methods. 

1264 # 

1265 def __enter__(self): 

1266 return self 

1267 

1268 def __exit__(self, exc_type, exc_val, exc_tb): 

1269 if exc_type is not None: 

1270 self.terminate() 

1271 else: 

1272 self.close() 

1273 

1274 def __str__(self): 

1275 return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._bucket, self._key) 

1276 

1277 def __repr__(self): 

1278 return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key) 

1279 

1280 

1281def _accept_all(key): 

1282 return True 

1283 

1284 

1285def iter_bucket( 

1286 bucket_name, 

1287 prefix='', 

1288 accept_key=None, 

1289 key_limit=None, 

1290 workers=16, 

1291 retries=3, 

1292 **session_kwargs): 

1293 """ 

1294 Iterate and download all S3 objects under `s3://bucket_name/prefix`. 

1295 

1296 Parameters 

1297 ---------- 

1298 bucket_name: str 

1299 The name of the bucket. 

1300 prefix: str, optional 

1301 Limits the iteration to keys starting with the prefix. 

1302 accept_key: callable, optional 

1303 This is a function that accepts a key name (unicode string) and 

1304 returns True/False, signalling whether the given key should be downloaded. 

1305 The default behavior is to accept all keys. 

1306 key_limit: int, optional 

1307 If specified, the iterator will stop after yielding this many results. 

1308 workers: int, optional 

1309 The number of subprocesses to use. 

1310 retries: int, optional 

1311 The number of times to retry a failed download. 

1312 session_kwargs: dict, optional 

1313 Keyword arguments to pass when creating a new session. 

1314 For a list of available names and values, see: 

1315 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session 

1316 

1317 

1318 Yields 

1319 ------ 

1320 str 

1321 The full key name (does not include the bucket name). 

1322 bytes 

1323 The full contents of the key. 

1324 

1325 Notes 

1326 ----- 

1327 The keys are processed in parallel, using `workers` processes (default: 16), 

1328 to speed up downloads greatly. If multiprocessing is not available (i.e. 

1329 _MULTIPROCESSING is False), this parameter is ignored. 

1330 

1331 Examples 

1332 -------- 

1333 

1334 >>> # get all JSON files under "mybucket/foo/" 

1335 >>> for key, content in iter_bucket( 

1336 ... bucket_name, prefix='foo/', 

1337 ... accept_key=lambda key: key.endswith('.json')): 

1338 ... print(key, len(content)) 

1339 

1340 >>> # limit to 10k files, using 32 parallel workers (default is 16) 

1341 >>> for key, content in iter_bucket(bucket_name, key_limit=10000, workers=32): 

1342 ... print(key, len(content)) 

1343 """ 

1344 if accept_key is None: 

1345 accept_key = _accept_all 

1346 

1347 # 

1348 # If people insist on giving us bucket instances, silently extract the name 

1349 # before moving on. Works for boto3 as well as boto. 

1350 # 

1351 try: 

1352 bucket_name = bucket_name.name 

1353 except AttributeError: 

1354 pass 

1355 

1356 total_size, key_no = 0, -1 

1357 key_iterator = _list_bucket( 

1358 bucket_name, 

1359 prefix=prefix, 

1360 accept_key=accept_key, 

1361 **session_kwargs) 

1362 download_key = functools.partial( 

1363 _download_key, 

1364 bucket_name=bucket_name, 

1365 retries=retries, 

1366 **session_kwargs) 

1367 

1368 with smart_open.concurrency.create_pool(processes=workers) as pool: 

1369 result_iterator = pool.imap_unordered(download_key, key_iterator) 

1370 key_no = 0 

1371 while True: 

1372 try: 

1373 (key, content) = result_iterator.__next__() 

1374 if key_no % 1000 == 0: 

1375 logger.info( 

1376 "yielding key #%i: %s, size %i (total %.1fMB)", 

1377 key_no, key, len(content), total_size / 1024.0 ** 2 

1378 ) 

1379 yield key, content 

1380 total_size += len(content) 

1381 if key_limit is not None and key_no + 1 >= key_limit: 

1382 # we were asked to output only a limited number of keys => we're done 

1383 break 

1384 except botocore.exceptions.ClientError as err: 

1385 # 

1386 # ignore 404 not found errors: they mean the object was deleted 

1387 # after we listed the contents of the bucket, but before we 

1388 # downloaded the object. 

1389 # 

1390 if not ('Error' in err.response and err.response['Error'].get('Code') == '404'): 

1391 raise err 

1392 except StopIteration: 

1393 break 

1394 key_no += 1 

1395 logger.info("processed %i keys, total size %i", key_no + 1, total_size) 

1396 

1397 

1398def _list_bucket( 

1399 bucket_name, 

1400 prefix='', 

1401 accept_key=lambda k: True, 

1402 **session_kwargs): 

1403 session = boto3.session.Session(**session_kwargs) 

1404 client = session.client('s3') 

1405 ctoken = None 

1406 

1407 while True: 

1408 # list_objects_v2 doesn't like a None value for ContinuationToken 

1409 # so we don't set it if we don't have one. 

1410 if ctoken: 

1411 kwargs = dict(Bucket=bucket_name, Prefix=prefix, ContinuationToken=ctoken) 

1412 else: 

1413 kwargs = dict(Bucket=bucket_name, Prefix=prefix) 

1414 response = client.list_objects_v2(**kwargs) 

1415 try: 

1416 content = response['Contents'] 

1417 except KeyError: 

1418 pass 

1419 else: 

1420 for c in content: 

1421 key = c['Key'] 

1422 if accept_key(key): 

1423 yield key 

1424 ctoken = response.get('NextContinuationToken', None) 

1425 if not ctoken: 

1426 break 

1427 

1428 

1429def _download_key(key_name, bucket_name=None, retries=3, **session_kwargs): 

1430 if bucket_name is None: 

1431 raise ValueError('bucket_name may not be None') 

1432 

1433 # 

1434 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html#multithreading-or-multiprocessing-with-resources 

1435 # 

1436 session = boto3.session.Session(**session_kwargs) 

1437 s3 = session.resource('s3') 

1438 bucket = s3.Bucket(bucket_name) 

1439 

1440 # Sometimes, https://github.com/boto/boto/issues/2409 can happen 

1441 # because of network issues on either side. 

1442 # Retry up to 3 times to ensure it's not a transient issue. 

1443 for x in range(retries + 1): 

1444 try: 

1445 content_bytes = _download_fileobj(bucket, key_name) 

1446 except botocore.client.ClientError: 

1447 # Actually fail on last pass through the loop 

1448 if x == retries: 

1449 raise 

1450 # Otherwise, try again, as this might be a transient timeout 

1451 pass 

1452 else: 

1453 return key_name, content_bytes 

1454 

1455 

1456def _download_fileobj(bucket, key_name): 

1457 # 

1458 # This is a separate function only because it makes it easier to inject 

1459 # exceptions during tests. 

1460 # 

1461 buf = io.BytesIO() 

1462 bucket.download_fileobj(key_name, buf) 

1463 return buf.getvalue()