Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/smart_open/s3.py: 20% (615 statements)

# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Radim Rehurek <me@radimrehurek.com>
#
# This code is distributed under the terms and conditions
# from the MIT License (MIT).
#
"""Implements file-like objects for reading and writing from/to AWS S3."""
from __future__ import annotations

import http
import io
import functools
import itertools
import logging
import time
import warnings
from math import inf

from typing import (
    Callable,
    List,
    TYPE_CHECKING,
)

try:
    import boto3
    import botocore.client
    import botocore.exceptions
    import urllib3.exceptions
except ImportError:
    MISSING_DEPS = True

import smart_open.bytebuffer
import smart_open.concurrency
import smart_open.utils

from smart_open import constants


if TYPE_CHECKING:
    from mypy_boto3_s3.client import S3Client
    from typing_extensions import Buffer

logger = logging.getLogger(__name__)

#
# AWS puts restrictions on the part size for multipart uploads.
# Each part must be more than 5MB, and less than 5GB.
#
# On top of that, our MultipartWriter has a min_part_size option.
# In retrospect, it's an unfortunate name, because it conflicts with the
# minimum allowable part size (5MB), but it's too late to change it, because
# people are using that parameter (unlike the MIN, DEFAULT, MAX constants).
# It really just means "part size": as soon as you have this many bytes,
# write a part to S3 (see the MultipartWriter.write method).
#

MIN_PART_SIZE = 5 * 1024 ** 2
"""The absolute minimum permitted by Amazon."""

DEFAULT_PART_SIZE = 50 * 1024 ** 2
"""The default part size for S3 multipart uploads, chosen carefully by smart_open"""

MAX_PART_SIZE = 5 * 1024 ** 3
"""The absolute maximum permitted by Amazon."""

SCHEMES = ("s3", "s3n", "s3u", "s3a")
DEFAULT_PORT = 443
DEFAULT_HOST = 's3.amazonaws.com'

DEFAULT_BUFFER_SIZE = 128 * 1024

URI_EXAMPLES = (
    's3://my_bucket/my_key',
    's3://my_key:my_secret@my_bucket/my_key',
    's3://my_key:my_secret@my_server:my_port@my_bucket/my_key',
)

# Returned by AWS when we try to seek beyond EOF.
_OUT_OF_RANGE = 'InvalidRange'


class Retry:
    def __init__(self):
        self.attempts: int = 6
        self.sleep_seconds: int = 10
        self.exceptions: List[Exception] = [botocore.exceptions.EndpointConnectionError]
        self.client_error_codes: List[str] = ['NoSuchUpload']

    def _do(self, fn: Callable):
        for attempt in range(self.attempts):
            try:
                return fn()
            except tuple(self.exceptions) as err:
                logger.critical(
                    'Caught non-fatal %s, retrying %d more times',
                    err,
                    self.attempts - attempt - 1,
                )
                logger.exception(err)
                time.sleep(self.sleep_seconds)
            except botocore.exceptions.ClientError as err:
                error_code = err.response['Error'].get('Code')
                if error_code not in self.client_error_codes:
                    raise
                logger.critical(
                    'Caught non-fatal ClientError (%s), retrying %d more times',
                    error_code,
                    self.attempts - attempt - 1,
                )
                logger.exception(err)
                time.sleep(self.sleep_seconds)
        else:
            logger.critical('encountered too many non-fatal errors, giving up')
            raise IOError('%s failed after %d attempts' % (fn.func, self.attempts))


#
# The retry mechanism for this submodule. Client code may modify it, e.g. by
# updating RETRY.sleep_seconds and friends.
#
if 'MISSING_DEPS' not in locals():
    RETRY = Retry()
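
# The module-level RETRY above is the knob the comment refers to; a tuning
# sketch (the values are illustrative, and 'SlowDown' is just one example of
# an S3 error code you might choose to treat as retriable):
#
#   >>> from smart_open import s3
#   >>> s3.RETRY.attempts = 3
#   >>> s3.RETRY.sleep_seconds = 1
#   >>> s3.RETRY.client_error_codes.append('SlowDown')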


class _ClientWrapper:
    """Wraps a client to inject the appropriate keyword args into each method call.

    The keyword args are a dictionary keyed by the fully qualified method name.
    For example, S3.Client.create_multipart_upload.

    See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client

    This wrapper behaves identically to the client otherwise.
    """
    def __init__(self, client, kwargs):
        self.client = client
        self.kwargs = kwargs

    def __getattr__(self, method_name):
        method = getattr(self.client, method_name)
        kwargs = self.kwargs.get('S3.Client.%s' % method_name, {})
        return functools.partial(method, **kwargs)
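
# A sketch of the injection: with the kwargs below, every call the wrapper
# forwards to S3.Client.create_multipart_upload also receives the extra
# argument (ServerSideEncryption is merely an illustrative boto3 parameter),
# while all other methods pass through unchanged.
#
#   >>> wrapped = _ClientWrapper(  # doctest: +SKIP
#   ...     boto3.client('s3'),
#   ...     {'S3.Client.create_multipart_upload': {'ServerSideEncryption': 'aws:kms'}},
#   ... )
#   >>> wrapped.create_multipart_upload(Bucket='my-bucket', Key='my-key')  # doctest: +SKIP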


def parse_uri(uri_as_string):
    #
    # Restrictions on bucket names and labels:
    #
    # - Bucket names must be at least 3 and no more than 63 characters long.
    # - Bucket names must be a series of one or more labels.
    # - Adjacent labels are separated by a single period (.).
    # - Bucket names can contain lowercase letters, numbers, and hyphens.
    # - Each label must start and end with a lowercase letter or a number.
    #
    # We use the above as a guide only, and do not perform any validation. We
    # let boto3 take care of that for us.
    #
    split_uri = smart_open.utils.safe_urlsplit(uri_as_string)
    assert split_uri.scheme in SCHEMES

    port = DEFAULT_PORT
    host = DEFAULT_HOST
    ordinary_calling_format = False
    #
    # These defaults tell boto3 to look for credentials elsewhere
    #
    access_id, access_secret = None, None

    #
    # Common URI template [key:secret@][host[:port]@]bucket/object
    #
    # The urlparse function doesn't handle the above schema, so we have to do
    # it ourselves.
    #
    uri = split_uri.netloc + split_uri.path

    #
    # Attempt to extract edge-case authentication details from the URL.
    #
    # See:
    # 1. https://summitroute.com/blog/2018/06/20/aws_security_credential_formats/
    # 2. test_s3_uri_with_credentials* in test_smart_open.py for example edge cases
    #
    if '@' in uri:
        maybe_auth, rest = uri.split('@', 1)
        if ':' in maybe_auth:
            maybe_id, maybe_secret = maybe_auth.split(':', 1)
            if '/' not in maybe_id:
                access_id, access_secret = maybe_id, maybe_secret
                uri = rest

    head, key_id = uri.split('/', 1)
    if '@' in head and ':' in head:
        ordinary_calling_format = True
        host_port, bucket_id = head.split('@')
        host, port = host_port.split(':', 1)
        port = int(port)
    elif '@' in head:
        ordinary_calling_format = True
        host, bucket_id = head.split('@')
    else:
        bucket_id = head

    return dict(
        scheme=split_uri.scheme,
        bucket_id=bucket_id,
        key_id=key_id,
        port=port,
        host=host,
        ordinary_calling_format=ordinary_calling_format,
        access_id=access_id,
        access_secret=access_secret,
    )
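
# A couple of concrete results (a sketch, substituting a literal port of 9000
# for the `my_port` placeholder from URI_EXAMPLES):
#
#   >>> parse_uri('s3://my_bucket/my_key')['bucket_id']
#   'my_bucket'
#   >>> parsed = parse_uri('s3://my_key:my_secret@my_server:9000@my_bucket/my_key')
#   >>> parsed['host'], parsed['port'], parsed['access_id']
#   ('my_server', 9000, 'my_key')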


def _consolidate_params(uri, transport_params):
    """Consolidates the parsed Uri with the additional parameters.

    This is necessary because the user can pass some of the parameters in
    two different ways:

    1) Via the URI itself
    2) Via the transport parameters

    These are not mutually exclusive, but we have to pick one over the other
    in a sensible way in order to proceed.

    """
    transport_params = dict(transport_params)

    def inject(**kwargs):
        try:
            client_kwargs = transport_params['client_kwargs']
        except KeyError:
            client_kwargs = transport_params['client_kwargs'] = {}

        try:
            init_kwargs = client_kwargs['S3.Client']
        except KeyError:
            init_kwargs = client_kwargs['S3.Client'] = {}

        init_kwargs.update(**kwargs)

    client = transport_params.get('client')
    if client is not None and (uri['access_id'] or uri['access_secret']):
        logger.warning(
            'ignoring credentials parsed from URL because they conflict with '
            'transport_params["client"]. Set transport_params["client"] to None '
            'to suppress this warning.'
        )
        uri.update(access_id=None, access_secret=None)
    elif (uri['access_id'] and uri['access_secret']):
        inject(
            aws_access_key_id=uri['access_id'],
            aws_secret_access_key=uri['access_secret'],
        )
        uri.update(access_id=None, access_secret=None)

    if client is not None and uri['host'] != DEFAULT_HOST:
        logger.warning(
            'ignoring endpoint_url parsed from URL because it conflicts with '
            'transport_params["client"]. Set transport_params["client"] to None '
            'to suppress this warning.'
        )
        uri.update(host=None)
    elif uri['host'] != DEFAULT_HOST:
        if uri['scheme'] == 's3u':
            scheme = 'http'
        else:
            scheme = 'https'
        inject(endpoint_url=scheme + '://%(host)s:%(port)d' % uri)
        uri.update(host=None)

    return uri, transport_params
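
# A sketch of how the two sources combine: credentials and a custom endpoint
# parsed from the URI end up under client_kwargs['S3.Client'], ready for
# _initialize_boto3 below (localhost:9000 is an illustrative endpoint).
#
#   >>> parsed = parse_uri('s3u://my_key:my_secret@localhost:9000@my_bucket/my_key')
#   >>> parsed, tp = _consolidate_params(parsed, {})
#   >>> tp['client_kwargs']['S3.Client']['endpoint_url']
#   'http://localhost:9000'
#   >>> tp['client_kwargs']['S3.Client']['aws_access_key_id']
#   'my_key'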


def open_uri(uri, mode, transport_params):
    deprecated = (
        'multipart_upload_kwargs',
        'object_kwargs',
        'resource',
        'resource_kwargs',
        'session',
        'singlepart_upload_kwargs',
    )
    detected = [k for k in deprecated if k in transport_params]
    if detected:
        doc_url = (
            'https://github.com/piskvorky/smart_open/blob/develop/'
            'MIGRATING_FROM_OLDER_VERSIONS.rst'
        )
        #
        # We use warnings.warn with UserWarning instead of logger.warn here because
        #
        # 1) Not everyone has logging enabled; and
        # 2) check_kwargs (below) already uses logger.warn with a similar message
        #
        # https://github.com/piskvorky/smart_open/issues/614
        #
        message = (
            'ignoring the following deprecated transport parameters: %r. '
            'See <%s> for details' % (detected, doc_url)
        )
        warnings.warn(message, UserWarning)
    parsed_uri = parse_uri(uri)
    parsed_uri, transport_params = _consolidate_params(parsed_uri, transport_params)
    kwargs = smart_open.utils.check_kwargs(open, transport_params)
    return open(parsed_uri['bucket_id'], parsed_uri['key_id'], mode, **kwargs)


def open(
    bucket_id,
    key_id,
    mode,
    version_id=None,
    buffer_size=DEFAULT_BUFFER_SIZE,
    min_part_size=DEFAULT_PART_SIZE,
    multipart_upload=True,
    defer_seek=False,
    client=None,
    client_kwargs=None,
    writebuffer=None,
    range_chunk_size=None,
):
    """Open an S3 object for reading or writing.

    Parameters
    ----------
    bucket_id: str
        The name of the bucket this object resides in.
    key_id: str
        The name of the key within the bucket.
    mode: str
        The mode for opening the object. Must be either "rb" or "wb".
    buffer_size: int, optional
        Default: 128KB
        The buffer size in bytes for reading. Controls memory usage. Data is streamed
        from an S3 network stream in buffer_size chunks. Forward seeks within
        the current buffer are satisfied without additional GET requests. Backward
        seeks always open a new GET request. For forward seek-intensive workloads,
        increase buffer_size to reduce GET requests at the cost of higher memory usage.
    min_part_size: int, optional
        The minimum part size for multipart uploads, in bytes.
        When the writebuffer contains this many bytes, smart_open will upload
        the bytes to S3 as a single part of a multi-part upload, freeing the
        buffer either partially or entirely. When you close the writer, it
        will assemble the parts together.
        The value determines the upper limit for the writebuffer. If buffer
        space is short (e.g. you are buffering to memory), then use a smaller
        value for min_part_size, or consider buffering to disk instead (see
        the writebuffer option).
        The value must be between 5MB and 5GB. If you specify a value outside
        of this range, smart_open will adjust it for you, because otherwise the
        upload _will_ fail.
        For writing only. Does not apply if you set multipart_upload=False.
    multipart_upload: bool, optional
        Default: `True`
        If set to `True`, will use multipart upload for writing to S3. If set
        to `False`, S3 upload will use the S3 Single-Part Upload API, which
        is better suited to small files.
        For writing only.
    version_id: str, optional
        Version of the object, used when reading the object.
        If None, will fetch the most recent version.
    defer_seek: boolean, optional
        Default: `False`
        If set to `True` on a file opened for reading, GetObject will not be
        called until the first seek() or read().
        Avoids redundant API queries when seeking before reading.
    range_chunk_size: int, optional
        Default: `None`
        Maximum byte range per S3 GET request when reading.
        When None (default), a single GET request is made for the entire file,
        and data is streamed from that single botocore.response.StreamingBody
        in buffer_size chunks.
        When set to a positive integer, multiple GET requests are made, each
        limited to at most this many bytes via HTTP Range headers. Each GET
        returns a new StreamingBody that is streamed in buffer_size chunks.
        Useful for reading small portions of large files without forcing
        S3-compatible systems like SeaweedFS/Ceph to load the entire file.
        Larger values mean fewer billable GET requests but higher load on S3
        servers. Smaller values mean more GET requests but less server load per request.
        Values larger than the file size result in a single GET for the whole file.
        Affects reading only. Does not affect memory usage (controlled by buffer_size).
    client: object, optional
        The S3 client to use when working with boto3.
        If you don't specify this, then smart_open will create a new client for you.
    client_kwargs: dict, optional
        Additional parameters to pass to the relevant functions of the client.
        The keys are fully qualified method names, e.g. `S3.Client.create_multipart_upload`.
        The values are kwargs to pass to that method each time it is called.
    writebuffer: IO[bytes], optional
        By default, this module will buffer data in memory using io.BytesIO
        when writing. Pass another binary IO instance here to use it instead.
        For example, you may pass a file object to buffer to local disk instead
        of in RAM. Use this to keep RAM usage low at the expense of additional
        disk IO. If you pass in an open file, then you are responsible for
        cleaning it up after writing completes.
    """
    logger.debug('%r', locals())
    if mode not in constants.BINARY_MODES:
        raise NotImplementedError('bad mode: %r expected one of %r' % (mode, constants.BINARY_MODES))

    if (mode == constants.WRITE_BINARY) and (version_id is not None):
        raise ValueError("version_id must be None when writing")

    if mode == constants.READ_BINARY:
        fileobj = Reader(
            bucket_id,
            key_id,
            version_id=version_id,
            buffer_size=buffer_size,
            defer_seek=defer_seek,
            client=client,
            client_kwargs=client_kwargs,
            range_chunk_size=range_chunk_size,
        )
    elif mode == constants.WRITE_BINARY:
        if multipart_upload:
            fileobj = MultipartWriter(
                bucket_id,
                key_id,
                client=client,
                client_kwargs=client_kwargs,
                writebuffer=writebuffer,
                part_size=min_part_size,
            )
        else:
            fileobj = SinglepartWriter(
                bucket_id,
                key_id,
                client=client,
                client_kwargs=client_kwargs,
                writebuffer=writebuffer,
            )
    else:
        assert False, 'unexpected mode: %r' % mode

    fileobj.name = key_id
    return fileobj
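
# A minimal usage sketch (assumes AWS credentials are configured in the
# environment and that 'my-bucket' exists; the names are illustrative):
#
#   >>> with open('my-bucket', 'my-key.txt', 'wb') as fout:  # doctest: +SKIP
#   ...     fout.write(b'hello world')
#   >>> with open('my-bucket', 'my-key.txt', 'rb') as fin:  # doctest: +SKIP
#   ...     fin.read()
#   b'hello world'
#
# Per-call keyword arguments travel via client_kwargs, keyed by the fully
# qualified method name (ServerSideEncryption is an illustrative parameter):
#
#   >>> fout = open(  # doctest: +SKIP
#   ...     'my-bucket', 'my-key.txt', 'wb',
#   ...     client_kwargs={'S3.Client.create_multipart_upload': {'ServerSideEncryption': 'aws:kms'}},
#   ... )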


def _get(client, bucket, key, version, range_string):
    try:
        params = dict(Bucket=bucket, Key=key)
        if version:
            params["VersionId"] = version
        if range_string:
            params["Range"] = range_string

        return client.get_object(**params)
    except botocore.client.ClientError as error:
        wrapped_error = IOError(
            'unable to access bucket: %r key: %r version: %r error: %s' % (
                bucket, key, version, error
            )
        )
        wrapped_error.backend_error = error
        raise wrapped_error from error


def _unwrap_ioerror(ioe):
    """Given an IOError from _get, return the 'Error' dictionary from boto."""
    try:
        return ioe.backend_error.response['Error']
    except (AttributeError, KeyError):
        return None


class _SeekableRawReader(object):
    """Read an S3 object.

    This class is internal to the S3 submodule.
    """

    def __init__(
        self,
        client,
        bucket,
        key,
        version_id=None,
        range_chunk_size=None,
    ):
        self._client = client
        self._bucket = bucket
        self._key = key
        self._version_id = version_id
        self._range_chunk_size = range_chunk_size

        self._content_length = None
        self._position = 0
        self._body = None

    @property
    def closed(self):
        return self._body is None

    def close(self):
        if not self.closed:
            self._body.close()
            self._body = None

    def seek(self, offset, whence=constants.WHENCE_START):
        """Seek to the specified position.

        :param int offset: The offset in bytes.
        :param int whence: Where the offset is from.

        :returns: the position after seeking.
        :rtype: int
        """
        if whence not in constants.WHENCE_CHOICES:
            raise ValueError('invalid whence, expected one of %r' % constants.WHENCE_CHOICES)

        #
        # Close old body explicitly.
        #
        self.close()

        start = None
        stop = None
        if whence == constants.WHENCE_START:
            start = max(0, offset)
        elif whence == constants.WHENCE_CURRENT:
            start = max(0, offset + self._position)
        elif whence == constants.WHENCE_END:
            stop = max(0, -offset)

        #
        # If we can figure out that we've read past the EOF, then we can save
        # an extra API call.
        #
        if self._content_length is None:  # _open_body has not been called yet
            if start is None and stop == 0:
                # seek(0, WHENCE_END) seeks straight to EOF:
                # make a minimal request to populate _content_length
                self._open_body(start=0, stop=0)
                self.close()
                reached_eof = True
            else:
                reached_eof = False
        elif start is not None and start >= self._content_length:
            reached_eof = True
        elif stop == 0:
            reached_eof = True
        else:
            reached_eof = False

        if reached_eof:
            self._body = io.BytesIO()
            self._position = self._content_length
        else:
            self._open_body(start, stop)

        return self._position

    def _open_body(self, start=None, stop=None):
        """Open a connection to download the specified range of bytes. Store
        the open file handle in self._body.

        If no range is specified, start defaults to self._position.
        start and stop follow the semantics of the http range header,
        so a stop without a start requests the last `stop` bytes (a suffix range).

        If self._range_chunk_size is set, the S3 server is protected from
        open-ended range headers: stop is set such that at most
        self._range_chunk_size bytes are returned in a single GET request.

        As a side effect, set self._content_length. Set self._position
        to self._content_length if start is past end of file.
        """
        if start is None and stop is None:
            start = self._position

        # Apply chunking: limit the stop position if range_chunk_size is set
        if stop is None and self._range_chunk_size is not None:
            stop = start + self._range_chunk_size - 1
            # Don't request beyond known content length
            if self._content_length is not None:
                stop = min(stop, self._content_length - 1)

        range_string = smart_open.utils.make_range_string(start, stop)

        try:
            # Optimistically try to fetch the requested content range.
            response = _get(
                self._client,
                self._bucket,
                self._key,
                self._version_id,
                range_string,
            )
        except IOError as ioe:
            # Handle requested content range exceeding content size.
            error_response = _unwrap_ioerror(ioe)
            if error_response is None or error_response.get('Code') != _OUT_OF_RANGE:
                raise

            actual_object_size = int(error_response.get('ActualObjectSize', 0))
            if (
                # empty file (==) or start is past end of file (>)
                (start is not None and start >= actual_object_size)
                # negative seek requested more bytes than the file has
                or (start is None and stop is not None and stop >= actual_object_size)
            ):
                self._position = self._content_length = actual_object_size
                self._body = io.BytesIO()
            else:  # stop is past end of file: request the correct remainder instead
                self._open_body(start=start, stop=actual_object_size - 1)
            return

        #
        # Keep track of how many times boto3's built-in retry mechanism
        # activated.
        #
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#checking-retry-attempts-in-an-aws-service-response
        #
        logger.debug(
            '%s: RetryAttempts: %d',
            self,
            response['ResponseMetadata']['RetryAttempts'],
        )
        #
        # A range request may not always return partial content, see:
        # https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests#partial_request_responses
        #
        status_code = response['ResponseMetadata']['HTTPStatusCode']
        if status_code == http.HTTPStatus.PARTIAL_CONTENT:
            # 206 guarantees that the response body only contains the requested byte range
            _, resp_start, _, length = smart_open.utils.parse_content_range(response['ContentRange'])
            self._position = resp_start
            self._content_length = length
            self._body = response['Body']
        elif status_code == http.HTTPStatus.OK:
            # 200 guarantees the response body contains the full file (the server ignored the range header)
            self._position = 0
            self._content_length = response["ContentLength"]
            self._body = response['Body']
            #
            # If we got a full response when we were actually expecting a range, we need to
            # read some data to ensure that the body starts in the place that the caller expects
            #
            if start is not None:
                expected_position = min(self._content_length, start)
            elif start is None and stop is not None:
                expected_position = max(0, self._content_length - stop)
            else:
                expected_position = 0
            if expected_position > 0:
                logger.debug(
                    '%s: discarding %d bytes to reach expected position',
                    self,
                    expected_position,
                )
                self._position = len(self._body.read(expected_position))
        else:
            raise ValueError("Unexpected status code %r" % status_code)
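
    # A sketch of the chunking arithmetic above, assuming the helper renders
    # standard HTTP Range headers: with range_chunk_size=1024 and a read
    # starting at position 0, the first GET is limited to "bytes=0-1023"
    # (1024 bytes), the next to "bytes=1024-2047", and so on, instead of a
    # single open-ended "bytes=0-" request.
    #
    #   >>> smart_open.utils.make_range_string(0, 1023)  # doctest: +SKIP
    #   'bytes=0-1023'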

    def read(self, size=-1):
        """Read from the continuous connection with the remote peer."""
        if size < -1:
            raise ValueError(f'size must be >= -1, got {size}')

        if size == -1:
            size = inf  # makes for a simple while-condition below

        binary_collected = io.BytesIO()

        #
        # Boto3 has built-in error handling and retry mechanisms:
        #
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
        #
        # Unfortunately, it isn't always enough. There is still a non-zero
        # possibility that an exception will slip past these mechanisms and
        # terminate the read prematurely. Luckily, at this stage, it's very
        # simple to recover from the problem: wait a little bit, reopen the
        # HTTP connection and try again. Usually, a single retry attempt is
        # enough to recover, but we try multiple times "just in case".
        #
        def retry_read(attempts=(1, 2, 4, 8, 16)) -> bytes:
            for seconds in attempts:
                if self.closed:
                    self._open_body()
                try:
                    if size == inf:
                        return self._body.read()
                    return self._body.read(size - binary_collected.tell())
                except (
                    ConnectionResetError,
                    botocore.exceptions.BotoCoreError,
                    urllib3.exceptions.HTTPError,
                ) as err:
                    logger.warning(
                        '%s: caught %r while reading %d bytes, sleeping %ds before retry',
                        self,
                        err,
                        -1 if size == inf else size,
                        seconds,
                    )
                    self.close()
                    time.sleep(seconds)
            raise IOError(
                '%s: failed to read %d bytes after %d attempts' %
                (self, -1 if size == inf else size, len(attempts)),
            )

        while (
            self._content_length is None  # very first read call
            or (
                self._position < self._content_length  # not yet end of file
                and binary_collected.tell() < size  # not yet read enough
            )
        ):
            binary = retry_read()
            self._position += len(binary)
            binary_collected.write(binary)
            if not binary:  # end of stream
                self.close()

        return binary_collected.getvalue()

    def __str__(self):
        return 'smart_open.s3._SeekableRawReader(%r, %r)' % (self._bucket, self._key)


def _initialize_boto3(rw, client, client_kwargs, bucket, key):
    """Create the required objects for accessing S3. Ideally, they have
    already been created for us and we can just reuse them."""
    if client_kwargs is None:
        client_kwargs = {}

    if client is None:
        init_kwargs = client_kwargs.get('S3.Client', {})
        if 'config' not in init_kwargs:
            init_kwargs['config'] = botocore.client.Config(
                max_pool_connections=64,
                tcp_keepalive=True,
                retries={"max_attempts": 6, "mode": "adaptive"}
            )
        # boto3.client re-uses the default session, which is not thread-safe when this is
        # called from within a thread. When using smart_open with multithreading, create a
        # thread-safe client with the config above and share it between threads using
        # transport_params.
        # https://github.com/boto/boto3/blob/1.38.41/docs/source/guide/clients.rst?plain=1#L111
        client = boto3.client('s3', **init_kwargs)
    assert client

    rw._client = _ClientWrapper(client, client_kwargs)
    rw._bucket = bucket
    rw._key = key


class Reader(io.BufferedIOBase):
    """Reads bytes from S3.

    Implements the io.BufferedIOBase interface of the standard library."""

    def __init__(
        self,
        bucket,
        key,
        version_id=None,
        buffer_size=DEFAULT_BUFFER_SIZE,
        line_terminator=constants.BINARY_NEWLINE,
        defer_seek=False,
        client=None,
        client_kwargs=None,
        range_chunk_size=None,
    ):
        self._version_id = version_id
        self._buffer_size = buffer_size

        _initialize_boto3(self, client, client_kwargs, bucket, key)

        self._raw_reader = _SeekableRawReader(
            self._client,
            bucket,
            key,
            self._version_id,
            range_chunk_size=range_chunk_size,
        )
        self._current_pos = 0
        self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size)
        self._eof = False
        self._line_terminator = line_terminator
        self._seek_initialized = False

        if not defer_seek:
            self.seek(0)

    #
    # io.BufferedIOBase methods.
    #

    def close(self):
        """Flush and close this stream."""
        logger.debug("close: called")
        pass

    def readable(self):
        """Return True if the stream can be read from."""
        return True

    def read(self, size=-1):
        """Read up to size bytes from the object and return them."""
        if size == 0:
            return b''
        elif size < 0:
            # call read() before setting _current_pos to make sure _content_length is set
            out = self._read_from_buffer() + self._raw_reader.read()
            self._current_pos = self._raw_reader._content_length
            return out

        #
        # Return unused data first
        #
        if len(self._buffer) >= size:
            return self._read_from_buffer(size)

        #
        # If the stream is finished, return what we have.
        #
        if self._eof:
            return self._read_from_buffer()

        self._fill_buffer(size)
        return self._read_from_buffer(size)

    def read1(self, size=-1):
        """This is the same as read()."""
        return self.read(size=size)

    def readinto(self, b):
        """Read up to len(b) bytes into b, and return the number of bytes
        read."""
        data = self.read(len(b))
        if not data:
            return 0
        b[:len(data)] = data
        return len(data)

    def readline(self, limit=-1):
        """Read up to and including the next newline. Returns the bytes read."""
        if limit != -1:
            raise NotImplementedError('limits other than -1 not implemented yet')

        #
        # A single line may span multiple buffers.
        #
        line = io.BytesIO()
        while not (self._eof and len(self._buffer) == 0):
            line_part = self._buffer.readline(self._line_terminator)
            line.write(line_part)
            self._current_pos += len(line_part)

            if line_part.endswith(self._line_terminator):
                break
            else:
                self._fill_buffer()

        return line.getvalue()

    def seekable(self):
        """If False, seek(), tell() and truncate() will raise IOError.

        We offer only seek support, and no truncate support."""
        return True

    def seek(self, offset, whence=constants.WHENCE_START):
        """Seek to the specified position.

        :param int offset: The offset in bytes.
        :param int whence: Where the offset is from.

        Returns the position after seeking."""
        # Convert relative offset to absolute, since self._raw_reader
        # doesn't know our current position.
        if whence == constants.WHENCE_CURRENT:
            whence = constants.WHENCE_START
            offset += self._current_pos

        # Check if we can satisfy the seek from the buffer
        if whence == constants.WHENCE_START and offset > self._current_pos:
            buffer_end = self._current_pos + len(self._buffer)
            if offset <= buffer_end:
                # Forward seek within buffered data - avoid S3 request
                self._buffer.read(offset - self._current_pos)
                self._current_pos = offset
                return self._current_pos

        if not self._seek_initialized or not (
            whence == constants.WHENCE_START and offset == self._current_pos
        ):
            self._current_pos = self._raw_reader.seek(offset, whence)
            self._buffer.empty()

        self._eof = self._current_pos == self._raw_reader._content_length

        self._seek_initialized = True
        return self._current_pos

    def tell(self):
        """Return the current position within the file."""
        return self._current_pos

    def truncate(self, size=None):
        """Unsupported."""
        raise io.UnsupportedOperation

    def detach(self):
        """Unsupported."""
        raise io.UnsupportedOperation

    def terminate(self):
        """Do nothing."""
        pass

    def to_boto3(self, resource):
        """Create an **independent** `boto3.s3.Object` instance that points to
        the same S3 object as this instance.
        Changes to the returned object will not affect the current instance.
        """
        assert resource, 'resource must be a boto3.resource instance'
        obj = resource.Object(self._bucket, self._key)
        if self._version_id is not None:
            return obj.Version(self._version_id)
        else:
            return obj

    #
    # Internal methods.
    #
    def _read_from_buffer(self, size=-1):
        """Remove at most size bytes from our buffer and return them."""
        size = size if size >= 0 else len(self._buffer)
        part = self._buffer.read(size)
        self._current_pos += len(part)
        return part

    def _fill_buffer(self, size=-1):
        size = max(size, self._buffer._chunk_size)
        while len(self._buffer) < size and not self._eof:
            bytes_read = self._buffer.fill(self._raw_reader)
            if bytes_read == 0:
                logger.debug('%s: reached EOF while filling buffer', self)
                self._eof = True

    def __str__(self):
        return "smart_open.s3.Reader(%r, %r)" % (self._bucket, self._key)

    def __repr__(self):
        return (
            "smart_open.s3.Reader("
            "bucket=%r, "
            "key=%r, "
            "version_id=%r, "
            "buffer_size=%r, "
            "line_terminator=%r)"
        ) % (
            self._bucket,
            self._key,
            self._version_id,
            self._buffer_size,
            self._line_terminator,
        )
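
# A reading sketch (the bucket and key are illustrative). After the first
# read, a short forward seek stays inside the 128KB buffer, so it does not
# trigger another GET request; readline then continues from that position:
#
#   >>> reader = Reader('my-bucket', 'my-key.txt')  # doctest: +SKIP
#   >>> header = reader.read(10)  # doctest: +SKIP
#   >>> reader.seek(100)  # doctest: +SKIP
#   100
#   >>> line = reader.readline()  # doctest: +SKIP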


class MultipartWriter(io.BufferedIOBase):
    """Writes bytes to S3 using the multi part API.

    Implements the io.BufferedIOBase interface of the standard library."""
    _upload_id = None  # so `closed` property works in case __init__ fails and __del__ is called

    def __init__(
        self,
        bucket,
        key,
        part_size=DEFAULT_PART_SIZE,
        client=None,
        client_kwargs=None,
        writebuffer: io.BytesIO | None = None,
    ):
        adjusted_ps = smart_open.utils.clamp(part_size, MIN_PART_SIZE, MAX_PART_SIZE)
        if part_size != adjusted_ps:
            logger.warning(f"adjusting part_size from {part_size} to {adjusted_ps}")
            part_size = adjusted_ps
        self._part_size = part_size

        _initialize_boto3(self, client, client_kwargs, bucket, key)
        self._client: S3Client
        self._bucket: str
        self._key: str

        try:
            partial = functools.partial(
                self._client.create_multipart_upload,
                Bucket=bucket,
                Key=key,
            )
            self._upload_id = RETRY._do(partial)['UploadId']
        except botocore.client.ClientError as error:
            raise ValueError(
                'the bucket %r does not exist, or is forbidden for access (%r)' % (
                    bucket, error
                )
            ) from error

        if writebuffer is None:
            self._buf = io.BytesIO()
        else:
            self._buf = writebuffer

        self._total_bytes = 0
        self._total_parts = 0
        self._parts: list[dict[str, object]] = []

    def flush(self):
        pass

    #
    # Override some methods from io.IOBase.
    #
    def close(self):
        logger.debug("close: called")
        if self.closed:
            return

        if self._buf.tell():
            self._upload_next_part()

        logger.debug('%s: completing multipart upload', self)
        if self._total_bytes and self._upload_id:
            partial = functools.partial(
                self._client.complete_multipart_upload,
                Bucket=self._bucket,
                Key=self._key,
                UploadId=self._upload_id,
                MultipartUpload={'Parts': self._parts},
            )
            RETRY._do(partial)
            logger.debug('%s: completed multipart upload', self)
        elif self._upload_id:
            #
            # AWS complains with "The XML you provided was not well-formed or
            # did not validate against our published schema" when the input is
            # completely empty => abort the upload, no file created.
            #
            # We work around this by creating an empty file explicitly.
            #
            self._client.abort_multipart_upload(
                Bucket=self._bucket,
                Key=self._key,
                UploadId=self._upload_id,
            )
            self._client.put_object(
                Bucket=self._bucket,
                Key=self._key,
                Body=b'',
            )
            logger.debug('%s: wrote 0 bytes to imitate multipart upload', self)
        self._upload_id = None

    @property
    def closed(self):
        return self._upload_id is None

    def writable(self):
        """Return True if the stream supports writing."""
        return True

    def seekable(self):
        """If False, seek(), tell() and truncate() will raise IOError.

        We offer only tell support, and no seek or truncate support."""
        return True

    def seek(self, offset, whence=constants.WHENCE_START):
        """Unsupported."""
        raise io.UnsupportedOperation

    def truncate(self, size=None):
        """Unsupported."""
        raise io.UnsupportedOperation

    def tell(self):
        """Return the current stream position."""
        return self._total_bytes

    #
    # io.BufferedIOBase methods.
    #
    def detach(self):
        raise io.UnsupportedOperation("detach() not supported")

    def write(self, b: Buffer) -> int:
        """Write the given buffer (bytes, bytearray, memoryview or any buffer
        interface implementation) to the S3 file.

        For more information about buffers, see https://docs.python.org/3/c-api/buffer.html

        There's buffering happening under the covers, so this may not actually
        do any HTTP transfer right away."""
        offset = 0
        mv = memoryview(b)
        self._total_bytes += len(mv)

        #
        # botocore does not accept memoryview, otherwise we could've gotten
        # away with not needing to write a copy to the buffer aside from cases
        # where b is smaller than part_size
        #
        while offset < len(mv):
            start = offset
            end = offset + self._part_size - self._buf.tell()
            self._buf.write(mv[start:end])
            if self._buf.tell() < self._part_size:
                #
                # Not enough data to write a new part just yet. The assert
                # ensures that we've consumed all of the input buffer.
                #
                assert end >= len(mv)
                return len(mv)

            self._upload_next_part()
            offset = end
        return len(mv)

    def terminate(self):
        """Cancel the underlying multipart upload."""
        if self.closed:
            return
        logger.debug('%s: terminating multipart upload', self)
        self._client.abort_multipart_upload(
            Bucket=self._bucket,
            Key=self._key,
            UploadId=self._upload_id,
        )
        self._upload_id = None
        logger.debug('%s: terminated multipart upload', self)

    def to_boto3(self, resource):
        """Create an **independent** `boto3.s3.Object` instance that points to
        the same S3 object as this instance.
        Changes to the returned object will not affect the current instance.
        """
        assert resource, 'resource must be a boto3.resource instance'
        return resource.Object(self._bucket, self._key)

    #
    # Internal methods.
    #
    def _upload_next_part(self) -> None:
        part_num = self._total_parts + 1
        logger.info(
            "%s: uploading part_num: %i, %i bytes (total %.3fGB)",
            self,
            part_num,
            self._buf.tell(),
            self._total_bytes / 1024.0 ** 3,
        )
        self._buf.seek(0)

        #
        # Network problems in the middle of an upload are particularly
        # troublesome. We don't want to abort the entire upload just because
        # of a temporary connection problem, so this part needs to be
        # especially robust.
        #
        upload = RETRY._do(
            functools.partial(
                self._client.upload_part,
                Bucket=self._bucket,
                Key=self._key,
                UploadId=self._upload_id,
                PartNumber=part_num,
                Body=self._buf,
            )
        )

        self._parts.append({'ETag': upload['ETag'], 'PartNumber': part_num})
        logger.debug("%s: upload of part_num #%i finished", self, part_num)

        self._total_parts += 1

        self._buf.seek(0)
        self._buf.truncate(0)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.terminate()
        else:
            self.close()

    def __str__(self):
        return "smart_open.s3.MultipartWriter(%r, %r)" % (self._bucket, self._key)

    def __repr__(self):
        return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, part_size=%r)" % (
            self._bucket,
            self._key,
            self._part_size,
        )
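
# A writing sketch (assumes the bucket exists). Used as a context manager, a
# clean exit completes the multipart upload, while an exception aborts it via
# terminate():
#
#   >>> with MultipartWriter('my-bucket', 'my-key.bin', part_size=MIN_PART_SIZE) as fout:  # doctest: +SKIP
#   ...     fout.write(b'\x00' * (6 * 1024 ** 2))  # the first 5MB become part #1 right away
#   ...     fout.write(b'\x01' * 1024)             # the remainder is flushed as the last part on close()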


class SinglepartWriter(io.BufferedIOBase):
    """Writes bytes to S3 using the single part API.

    Implements the io.BufferedIOBase interface of the standard library.

    This class buffers all of its input in memory until its `close` method is called.
    Only then will the data be written to S3 and the buffer released."""
    _buf = None  # so `closed` property works in case __init__ fails and __del__ is called

    def __init__(
        self,
        bucket,
        key,
        client=None,
        client_kwargs=None,
        writebuffer=None,
    ):
        _initialize_boto3(self, client, client_kwargs, bucket, key)

        if writebuffer is None:
            self._buf = io.BytesIO()
        elif not writebuffer.seekable():
            raise ValueError('writebuffer needs to be seekable')
        else:
            self._buf = writebuffer

    def flush(self):
        pass

    #
    # Override some methods from io.IOBase.
    #
    def close(self):
        logger.debug("close: called")
        if self.closed:
            return

        self.seek(0)

        try:
            self._client.put_object(
                Bucket=self._bucket,
                Key=self._key,
                Body=self._buf,
            )
        except botocore.client.ClientError as e:
            raise ValueError(
                'the bucket %r does not exist, or is forbidden for access' % self._bucket) from e
        else:
            logger.debug("%s: direct upload finished", self)
        finally:
            self._buf.close()

    @property
    def closed(self):
        return self._buf is None or self._buf.closed

    def readable(self):
        """Propagate."""
        return self._buf.readable()

    def writable(self):
        """Propagate."""
        return self._buf.writable()

    def seekable(self):
        """Propagate."""
        return self._buf.seekable()

    def seek(self, offset, whence=constants.WHENCE_START):
        """Propagate."""
        return self._buf.seek(offset, whence)

    def truncate(self, size=None):
        """Propagate."""
        return self._buf.truncate(size)

    def tell(self):
        """Propagate."""
        return self._buf.tell()

    def write(self, b):
        """Write the given buffer (bytes, bytearray, memoryview or any buffer
        interface implementation) into the buffer. Content of the buffer will be
        written to S3 on close as a single-part upload.

        For more information about buffers, see https://docs.python.org/3/c-api/buffer.html"""
        return self._buf.write(b)

    def read(self, size=-1):
        """Propagate."""
        return self._buf.read(size)

    def read1(self, size=-1):
        """Propagate."""
        return self._buf.read1(size)

    def terminate(self):
        """Close the buffer and skip the upload."""
        self._buf.close()
        logger.debug('%s: terminated singlepart upload', self)

    #
    # Internal methods.
    #
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.terminate()
        else:
            self.close()

    def __str__(self):
        return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._bucket, self._key)

    def __repr__(self):
        return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key)
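
# A sketch of the single-part path (the names are illustrative): with
# multipart_upload=False, open() above returns a SinglepartWriter, and a
# seekable writebuffer such as a temporary file keeps the staged data out of
# RAM until close() uploads it in one put_object call:
#
#   >>> import tempfile  # doctest: +SKIP
#   >>> with tempfile.TemporaryFile() as tmp:  # doctest: +SKIP
#   ...     with open('my-bucket', 'small-key.json', 'wb',
#   ...               multipart_upload=False, writebuffer=tmp) as fout:
#   ...         fout.write(b'{"hello": "world"}')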


def _accept_all(key):
    return True


def iter_bucket(
    bucket_name,
    prefix='',
    accept_key=None,
    key_limit=None,
    workers=16,
    retries=3,
    max_threads_per_fileobj=4,
    client_kwargs=None,
    **session_kwargs,  # double star notation for backwards compatibility
):
    """
    Iterate and download all S3 objects under `s3://bucket_name/prefix`.

    Parameters
    ----------
    bucket_name: str
        The name of the bucket.
    prefix: str, optional
        Limits the iteration to keys starting with the prefix.
    accept_key: callable, optional
        This is a function that accepts a key name (unicode string) and
        returns True/False, signalling whether the given key should be downloaded.
        The default behavior is to accept all keys.
    key_limit: int, optional
        If specified, the iterator will stop after yielding this many results.
    workers: int, optional
        The number of objects to download concurrently. The entire operation uses
        a single ThreadPoolExecutor and a shared, thread-safe boto3 S3.Client. Default: 16
    retries: int, optional
        The number of times to retry a failed download. Default: 3
    max_threads_per_fileobj: int, optional
        The maximum number of download threads per worker. The maximum size of the
        connection pool will be `workers * max_threads_per_fileobj + 1`. Default: 4
    client_kwargs: dict, optional
        Keyword arguments to pass when creating the S3 client.
        For a list of available names and values, see:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.client
    session_kwargs: dict, optional
        Keyword arguments to pass when creating a new session.
        For a list of available names and values, see:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session


    Yields
    ------
    str
        The full key name (does not include the bucket name).
    bytes
        The full contents of the key.

    Notes
    -----
    The keys are processed in parallel, using `workers` threads (default: 16),
    to speed up downloads greatly. If multiprocessing is not available
    (_MULTIPROCESSING is False), this parameter will be ignored.

    Examples
    --------

    >>> # get all JSON files under "mybucket/foo/"
    >>> for key, content in iter_bucket(
    ...         bucket_name, prefix='foo/',
    ...         accept_key=lambda key: key.endswith('.json')):
    ...     print(key, len(content))

    >>> # limit to 10k files, using 32 parallel workers (default is 16)
    >>> for key, content in iter_bucket(bucket_name, key_limit=10000, workers=32):
    ...     print(key, len(content))
    """
    if accept_key is None:
        accept_key = _accept_all

    #
    # If people insist on giving us bucket instances, silently extract the name
    # before moving on. Works for boto3 as well as boto.
    #
    try:
        bucket_name = bucket_name.name
    except AttributeError:
        pass

    if bucket_name is None:
        raise ValueError('bucket_name may not be None')

    total_size, key_no = 0, 0

    # thread-safe client to share across _list_bucket and _download_key calls
    # https://github.com/boto/boto3/blob/1.38.41/docs/source/guide/clients.rst?plain=1#L111
    session = boto3.session.Session(**session_kwargs)
    if client_kwargs is None:
        client_kwargs = {}
    if 'config' not in client_kwargs:
        client_kwargs['config'] = botocore.client.Config(
            max_pool_connections=workers * max_threads_per_fileobj + 1,  # 1 thread for _list_bucket
            tcp_keepalive=True,
            retries={'max_attempts': retries * 2, 'mode': 'adaptive'},
        )
    client = session.client('s3', **client_kwargs)

    transfer_config = boto3.s3.transfer.TransferConfig(max_concurrency=max_threads_per_fileobj)

    key_iterator = _list_bucket(
        bucket_name=bucket_name,
        prefix=prefix,
        accept_key=accept_key,
        client=client,
    )
    download_key = functools.partial(
        _download_key,
        bucket_name=bucket_name,
        retries=retries,
        client=client,
        transfer_config=transfer_config,
    )

    # Limit the iterator ('infinite' iterators are supported, key_limit=None is supported)
    key_iterator = itertools.islice(key_iterator, key_limit)

    with smart_open.concurrency.ThreadPoolExecutor(workers) as executor:
        result_iterator = executor.imap(download_key, key_iterator)
        for key_no, (key, content) in enumerate(result_iterator, start=1):
            # Skip deleted objects (404 responses)
            if key is None:
                continue

            if key_no % 1000 == 0:
                logger.info(
                    "yielding key #%i: %s, size %i (total %.1f MB)",
                    key_no, key, len(content), total_size / 1024.0 ** 2
                )

            yield key, content
            total_size += len(content)

    logger.info(
        "processed %i keys, total size %.1f MB",
        key_no,
        total_size / 1024.0 ** 2,
    )


def _list_bucket(
    *,
    bucket_name,
    client,
    prefix='',
    accept_key=lambda k: True,
):
    ctoken = None

    while True:
        # list_objects_v2 doesn't like a None value for ContinuationToken,
        # so we don't set it if we don't have one.
        if ctoken:
            kwargs = dict(Bucket=bucket_name, Prefix=prefix, ContinuationToken=ctoken)
        else:
            kwargs = dict(Bucket=bucket_name, Prefix=prefix)
        response = client.list_objects_v2(**kwargs)
        try:
            content = response['Contents']
        except KeyError:
            pass
        else:
            for c in content:
                key = c['Key']
                if accept_key(key):
                    yield key
        ctoken = response.get('NextContinuationToken', None)
        if not ctoken:
            break


def _download_key(key_name, *, client, bucket_name, retries, transfer_config):
    # Sometimes, https://github.com/boto/boto/issues/2409 can happen
    # because of network issues on either side.
    # Retry up to `retries` times to make sure it's not a transient issue.
    for x in range(retries + 1):
        try:
            content_bytes = _download_fileobj(
                client=client,
                bucket_name=bucket_name,
                key_name=key_name,
                transfer_config=transfer_config,
            )
        except botocore.exceptions.ClientError as err:
            #
            # Ignore 404 not found errors: they mean the object was deleted
            # after we listed the contents of the bucket, but before we
            # downloaded the object.
            #
            if 'Error' in err.response and err.response['Error'].get('Code') == '404':
                return None, None
            # Actually fail on the last pass through the loop
            if x == retries:
                raise
            # Otherwise, try again, as this might be a transient timeout
            continue
        return key_name, content_bytes


def _download_fileobj(*, client, bucket_name, key_name, transfer_config):
    #
    # This is a separate function only because it makes it easier to inject
    # exceptions during tests.
    #
    buf = io.BytesIO()
    client.download_fileobj(
        Bucket=bucket_name,
        Key=key_name,
        Fileobj=buf,
        Config=transfer_config,
    )
    return buf.getvalue()
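
# A sketch of the pool sizing used by iter_bucket above: with the defaults of
# workers=16 and max_threads_per_fileobj=4, the shared client is created with
# max_pool_connections = 16 * 4 + 1 = 65, with one extra connection reserved
# for the _list_bucket calls (per the comment in iter_bucket). Raising the
# worker count grows the pool accordingly:
#
#   >>> for key, content in iter_bucket('my-bucket', prefix='logs/', workers=32):  # doctest: +SKIP
#   ...     print(key, len(content))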