Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/smart_open/s3.py: 20%


615 statements  

1# -*- coding: utf-8 -*- 

2# 

3# Copyright (C) 2019 Radim Rehurek <me@radimrehurek.com> 

4# 

5# This code is distributed under the terms and conditions 

6# from the MIT License (MIT). 

7# 

8"""Implements file-like objects for reading and writing from/to AWS S3.""" 

9from __future__ import annotations 

10 

11import http 

12import io 

13import functools 

14import logging 

15import time 

16import warnings 

17from math import inf 

18 

19from typing import ( 

20 Callable, 

21 List, 

22 TYPE_CHECKING, 

23) 

24 

25try: 

26 import boto3 

27 import botocore.client 

28 import botocore.exceptions 

29 import urllib3.exceptions 

30except ImportError: 

31 MISSING_DEPS = True 

32 

33import smart_open.bytebuffer 

34import smart_open.concurrency 

35import smart_open.utils 

36 

37from smart_open import constants 

38 

39 

40if TYPE_CHECKING: 

41 from mypy_boto3_s3.client import S3Client 

42 from typing_extensions import Buffer 

43 

44logger = logging.getLogger(__name__) 

45 

46# 

47# AWS puts restrictions on the part size for multipart uploads. 

48# Each part must be at least 5MB and at most 5GB. 

49# 

50# On top of that, our MultipartWriter has a min_part_size option. 

51# In retrospect, it's an unfortunate name, because it conflicts with the 

52# minimum allowable part size (5MB), but it's too late to change it, because 

53# people are using that parameter (unlike the MIN, DEFAULT, MAX constants). 

54# It really just means "part size": as soon as you have this many bytes, 

55# write a part to S3 (see the MultipartWriter.write method). 

56# 

57 

58MIN_PART_SIZE = 5 * 1024 ** 2 

59"""The absolute minimum permitted by Amazon.""" 

60 

61DEFAULT_PART_SIZE = 50 * 1024**2 

62"""The default part size for S3 multipart uploads, chosen carefully by smart_open""" 

63 

64MAX_PART_SIZE = 5 * 1024 ** 3 

65"""The absolute maximum permitted by Amazon.""" 

66 

67SCHEMES = ("s3", "s3n", 's3u', "s3a") 

68DEFAULT_PORT = 443 

69DEFAULT_HOST = 's3.amazonaws.com' 

70 

71DEFAULT_BUFFER_SIZE = 128 * 1024 

72 

73URI_EXAMPLES = ( 

74 's3://my_bucket/my_key', 

75 's3://my_key:my_secret@my_bucket/my_key', 

76 's3://my_key:my_secret@my_server:my_port@my_bucket/my_key', 

77) 

78 

79# Returned by AWS when we try to seek beyond EOF. 

80_OUT_OF_RANGE = 'InvalidRange' 

81 

82 

83class Retry: 

84 def __init__(self): 

85 self.attempts: int = 6 

86 self.sleep_seconds: int = 10 

87 self.exceptions: List[type] = [botocore.exceptions.EndpointConnectionError] 

88 self.client_error_codes: List[str] = ['NoSuchUpload'] 

89 

90 def _do(self, fn: Callable): 

91 for attempt in range(self.attempts): 

92 try: 

93 return fn() 

94 except tuple(self.exceptions) as err: 

95 logger.critical( 

96 'Caught non-fatal %s, retrying %d more times', 

97 err, 

98 self.attempts - attempt - 1, 

99 ) 

100 logger.exception(err) 

101 time.sleep(self.sleep_seconds) 

102 except botocore.exceptions.ClientError as err: 

103 error_code = err.response['Error'].get('Code') 

104 if error_code not in self.client_error_codes: 

105 raise 

106 logger.critical( 

107 'Caught non-fatal ClientError (%s), retrying %d more times', 

108 error_code, 

109 self.attempts - attempt - 1, 

110 ) 

111 logger.exception(err) 

112 time.sleep(self.sleep_seconds) 

113 else: 

114 logger.critical('encountered too many non-fatal errors, giving up') 

115 raise IOError('%s failed after %d attempts' % (fn.func, self.attempts)) 

116 

117 

118# 

119# The retry mechanism for this submodule. Client code may modify it, e.g. by 

120# updating RETRY.sleep_seconds and friends. 

121# 

122if 'MISSING_DEPS' not in locals(): 

123 RETRY = Retry() 
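
# --- Editor's note: a minimal sketch of tuning the module-level retry policy, as the
# --- comment above suggests. The helper name and the concrete values are illustrative only.
def _example_tune_retry_policy():
    RETRY.attempts = 3                              # give up sooner than the default 6
    RETRY.sleep_seconds = 2                         # back off 2s between attempts, not 10s
    RETRY.client_error_codes.append('SlowDown')     # also retry this (illustrative) error code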

124 

125 

126class _ClientWrapper: 

127 """Wraps a client to inject the appropriate keyword args into each method call. 

128 

129 The keyword args are a dictionary keyed by the fully qualified method name. 

130 For example, S3.Client.create_multipart_upload. 

131 

132 See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client 

133 

134 This wrapper behaves identically to the client otherwise. 

135 """ 

136 def __init__(self, client, kwargs): 

137 self.client = client 

138 self.kwargs = kwargs 

139 

140 def __getattr__(self, method_name): 

141 method = getattr(self.client, method_name) 

142 kwargs = self.kwargs.get('S3.Client.%s' % method_name, {}) 

143 return functools.partial(method, **kwargs) 
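
# --- Editor's note: a minimal sketch of the per-method kwarg injection _ClientWrapper
# --- performs. The helper name, bucket/key and the ServerSideEncryption value are
# --- illustrative assumptions, not prescribed by smart_open.
def _example_client_wrapper(client):
    wrapped = _ClientWrapper(
        client,
        {'S3.Client.create_multipart_upload': {'ServerSideEncryption': 'aws:kms'}},
    )
    # Every create_multipart_upload() call now implicitly carries the extra kwarg;
    # all other client methods are passed through unchanged.
    return wrapped.create_multipart_upload(Bucket='my_bucket', Key='my_key')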

144 

145 

146def parse_uri(uri_as_string): 

147 # 

148 # Restrictions on bucket names and labels: 

149 # 

150 # - Bucket names must be at least 3 and no more than 63 characters long. 

151 # - Bucket names must be a series of one or more labels. 

152 # - Adjacent labels are separated by a single period (.). 

153 # - Bucket names can contain lowercase letters, numbers, and hyphens. 

154 # - Each label must start and end with a lowercase letter or a number. 

155 # 

156 # We use the above as a guide only, and do not perform any validation. We 

157 # let boto3 take care of that for us. 

158 # 

159 split_uri = smart_open.utils.safe_urlsplit(uri_as_string) 

160 assert split_uri.scheme in SCHEMES 

161 

162 port = DEFAULT_PORT 

163 host = DEFAULT_HOST 

164 ordinary_calling_format = False 

165 # 

166 # These defaults tell boto3 to look for credentials elsewhere 

167 # 

168 access_id, access_secret = None, None 

169 

170 # 

171 # Common URI template [key:secret@][host[:port]@]bucket/object 

172 # 

173 # The urlparse function doesn't handle the above schema, so we have to do 

174 # it ourselves. 

175 # 

176 uri = split_uri.netloc + split_uri.path 

177 

178 # 

179 # Attempt to extract edge-case authentication details from the URL. 

180 # 

181 # See: 

182 # 1. https://summitroute.com/blog/2018/06/20/aws_security_credential_formats/ 

183 # 2. test_s3_uri_with_credentials* in test_smart_open.py for example edge cases 

184 # 

185 if '@' in uri: 

186 maybe_auth, rest = uri.split('@', 1) 

187 if ':' in maybe_auth: 

188 maybe_id, maybe_secret = maybe_auth.split(':', 1) 

189 if '/' not in maybe_id: 

190 access_id, access_secret = maybe_id, maybe_secret 

191 uri = rest 

192 

193 head, key_id = uri.split('/', 1) 

194 if '@' in head and ':' in head: 

195 ordinary_calling_format = True 

196 host_port, bucket_id = head.split('@') 

197 host, port = host_port.split(':', 1) 

198 port = int(port) 

199 elif '@' in head: 

200 ordinary_calling_format = True 

201 host, bucket_id = head.split('@') 

202 else: 

203 bucket_id = head 

204 

205 return dict( 

206 scheme=split_uri.scheme, 

207 bucket_id=bucket_id, 

208 key_id=key_id, 

209 port=port, 

210 host=host, 

211 ordinary_calling_format=ordinary_calling_format, 

212 access_id=access_id, 

213 access_secret=access_secret, 

214 ) 
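
# --- Editor's note: a minimal sketch of the dictionary parse_uri() returns for the
# --- first URI_EXAMPLES entry; the expected values follow from the defaults above.
# --- The helper name is hypothetical.
def _example_parse_uri():
    parsed = parse_uri('s3://my_bucket/my_key')
    assert parsed['scheme'] == 's3'
    assert parsed['bucket_id'] == 'my_bucket'
    assert parsed['key_id'] == 'my_key'
    assert parsed['host'] == DEFAULT_HOST and parsed['port'] == DEFAULT_PORT
    assert parsed['access_id'] is None and parsed['access_secret'] is None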

215 

216 

217def _consolidate_params(uri, transport_params): 

218 """Consolidates the parsed Uri with the additional parameters. 

219 

220 This is necessary because the user can pass some of the parameters in 

221 two different ways: 

222 

223 1) Via the URI itself 

224 2) Via the transport parameters 

225 

226 These are not mutually exclusive, but we have to pick one over the other 

227 in a sensible way in order to proceed. 

228 

229 """ 

230 transport_params = dict(transport_params) 

231 

232 def inject(**kwargs): 

233 try: 

234 client_kwargs = transport_params['client_kwargs'] 

235 except KeyError: 

236 client_kwargs = transport_params['client_kwargs'] = {} 

237 

238 try: 

239 init_kwargs = client_kwargs['S3.Client'] 

240 except KeyError: 

241 init_kwargs = client_kwargs['S3.Client'] = {} 

242 

243 init_kwargs.update(**kwargs) 

244 

245 client = transport_params.get('client') 

246 if client is not None and (uri['access_id'] or uri['access_secret']): 

247 logger.warning( 

248 'ignoring credentials parsed from URL because they conflict with ' 

249 'transport_params["client"]. Set transport_params["client"] to None ' 

250 'to suppress this warning.' 

251 ) 

252 uri.update(access_id=None, access_secret=None) 

253 elif (uri['access_id'] and uri['access_secret']): 

254 inject( 

255 aws_access_key_id=uri['access_id'], 

256 aws_secret_access_key=uri['access_secret'], 

257 ) 

258 uri.update(access_id=None, access_secret=None) 

259 

260 if client is not None and uri['host'] != DEFAULT_HOST: 

261 logger.warning( 

262 'ignoring endpoint_url parsed from URL because it conflicts with ' 

263 'transport_params["client"]. Set transport_params["client"] to None ' 

264 'to suppress this warning.' 

265 ) 

266 uri.update(host=None) 

267 elif uri['host'] != DEFAULT_HOST: 

268 if uri['scheme'] == 's3u': 

269 scheme = 'http' 

270 else: 

271 scheme = 'https' 

272 inject(endpoint_url=scheme + '://%(host)s:%(port)d' % uri) 

273 uri.update(host=None) 

274 

275 return uri, transport_params 
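
# --- Editor's note: a minimal sketch of how credentials and a non-default endpoint
# --- parsed from a URI end up in client_kwargs. The host/port and helper name are
# --- illustrative; 's3u' yields an http:// endpoint_url, per the branch above.
def _example_consolidate_params():
    parsed = parse_uri('s3u://my_key:my_secret@localhost:9000@my_bucket/my_key')
    parsed, tparams = _consolidate_params(parsed, {})
    assert tparams['client_kwargs']['S3.Client'] == {
        'aws_access_key_id': 'my_key',
        'aws_secret_access_key': 'my_secret',
        'endpoint_url': 'http://localhost:9000',
    }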

276 

277 

278def open_uri(uri, mode, transport_params): 

279 deprecated = ( 

280 'multipart_upload_kwargs', 

281 'object_kwargs', 

282 'resource', 

283 'resource_kwargs', 

284 'session', 

285 'singlepart_upload_kwargs', 

286 ) 

287 detected = [k for k in deprecated if k in transport_params] 

288 if detected: 

289 doc_url = ( 

290 'https://github.com/piskvorky/smart_open/blob/develop/' 

291 'MIGRATING_FROM_OLDER_VERSIONS.rst' 

292 ) 

293 # 

294 # We use warnings.warn with UserWarning instead of logger.warn here because 

295 # 

296 # 1) Not everyone has logging enabled; and 

297 # 2) check_kwargs (below) already uses logger.warn with a similar message 

298 # 

299 # https://github.com/piskvorky/smart_open/issues/614 

300 # 

301 message = ( 

302 'ignoring the following deprecated transport parameters: %r. ' 

303 'See <%s> for details' % (detected, doc_url) 

304 ) 

305 warnings.warn(message, UserWarning) 

306 parsed_uri = parse_uri(uri) 

307 parsed_uri, transport_params = _consolidate_params(parsed_uri, transport_params) 

308 kwargs = smart_open.utils.check_kwargs(open, transport_params) 

309 return open(parsed_uri['bucket_id'], parsed_uri['key_id'], mode, **kwargs) 

310 

311 

312def open( 

313 bucket_id, 

314 key_id, 

315 mode, 

316 version_id=None, 

317 buffer_size=DEFAULT_BUFFER_SIZE, 

318 min_part_size=DEFAULT_PART_SIZE, 

319 multipart_upload=True, 

320 defer_seek=False, 

321 client=None, 

322 client_kwargs=None, 

323 writebuffer=None, 

324 range_chunk_size=None, 

325): 

326 """Open an S3 object for reading or writing. 

327 

328 Parameters 

329 ---------- 

330 bucket_id: str 

331 The name of the bucket this object resides in. 

332 key_id: str 

333 The name of the key within the bucket. 

334 mode: str 

335 The mode for opening the object. Must be either "rb" or "wb". 

336 buffer_size: int, optional 

337 Default: 128KB 

338 The buffer size in bytes for reading. Controls memory usage. Data is streamed 

339 from an S3 network stream in buffer_size chunks. Forward seeks within 

340 the current buffer are satisfied without additional GET requests. Backward 

341 seeks always open a new GET request. For forward seek-intensive workloads, 

342 increase buffer_size to reduce GET requests at the cost of higher memory usage. 

343 min_part_size: int, optional 

344 The minimum part size for multipart uploads, in bytes. 

345 When the writebuffer contains this many bytes, smart_open will upload 

346 the bytes to S3 as a single part of a multi-part upload, freeing the 

347 buffer either partially or entirely. When you close the writer, it 

348 will assemble the parts together. 

349 The value determines the upper limit for the writebuffer. If buffer 

350 space is short (e.g. you are buffering to memory), then use a smaller 

351 value for min_part_size, or consider buffering to disk instead (see 

352 the writebuffer option). 

353 The value must be between 5MB and 5GB. If you specify a value outside 

354 of this range, smart_open will adjust it for you, because otherwise the 

355 upload _will_ fail. 

356 For writing only. Does not apply if you set multipart_upload=False. 

357 multipart_upload: bool, optional 

358 Default: `True` 

359 If set to `True`, will use multipart upload for writing to S3. If set 

360 to `False`, S3 upload will use the S3 Single-Part Upload API, which 

361 is better suited to small files. 

362 For writing only. 

363 version_id: str, optional 

364 Version of the object, used when reading the object. 

365 If None, will fetch the most recent version. 

366 defer_seek: boolean, optional 

367 Default: `False` 

368 If set to `True` on a file opened for reading, GetObject will not be 

369 called until the first seek() or read(). 

370 Avoids redundant API queries when seeking before reading. 

371 range_chunk_size: int, optional 

372 Default: `None` 

373 Maximum byte range per S3 GET request when reading. 

374 When None (default), a single GET request is made for the entire file, 

375 and data is streamed from that single botocore.response.StreamingBody 

376 in buffer_size chunks. 

377 When set to a positive integer, multiple GET requests are made, each 

378 limited to at most this many bytes via HTTP Range headers. Each GET 

379 returns a new StreamingBody that is streamed in buffer_size chunks. 

380 Useful for reading small portions of large files without forcing 

381 S3-compatible systems like SeaweedFS/Ceph to load the entire file. 

382 Larger values mean fewer billable GET requests but higher load on S3 

383 servers. Smaller values mean more GET requests but less server load per request. 

384 Values larger than the file size result in a single GET for the whole file. 

385 Affects reading only. Does not affect memory usage (controlled by buffer_size). 

386 client: object, optional 

387 The S3 client to use when working with boto3. 

388 If you don't specify this, then smart_open will create a new client for you. 

389 client_kwargs: dict, optional 

390 Additional parameters to pass to the relevant functions of the client. 

391 The keys are fully qualified method names, e.g. `S3.Client.create_multipart_upload`. 

392 The values are kwargs to pass to that method each time it is called. 

393 writebuffer: IO[bytes], optional 

394 By default, this module will buffer data in memory using io.BytesIO 

395 when writing. Pass another binary IO instance here to use it instead. 

396 For example, you may pass a file object to buffer to local disk instead 

397 of in RAM. Use this to keep RAM usage low at the expense of additional 

398 disk IO. If you pass in an open file, then you are responsible for 

399 cleaning it up after writing completes. 

400 """ 

401 logger.debug('%r', locals()) 

402 if mode not in constants.BINARY_MODES: 

403 raise NotImplementedError('bad mode: %r expected one of %r' % (mode, constants.BINARY_MODES)) 

404 

405 if (mode == constants.WRITE_BINARY) and (version_id is not None): 

406 raise ValueError("version_id must be None when writing") 

407 

408 if mode == constants.READ_BINARY: 

409 fileobj = Reader( 

410 bucket_id, 

411 key_id, 

412 version_id=version_id, 

413 buffer_size=buffer_size, 

414 defer_seek=defer_seek, 

415 client=client, 

416 client_kwargs=client_kwargs, 

417 range_chunk_size=range_chunk_size, 

418 ) 

419 elif mode == constants.WRITE_BINARY: 

420 if multipart_upload: 

421 fileobj = MultipartWriter( 

422 bucket_id, 

423 key_id, 

424 client=client, 

425 client_kwargs=client_kwargs, 

426 writebuffer=writebuffer, 

427 part_size=min_part_size, 

428 ) 

429 else: 

430 fileobj = SinglepartWriter( 

431 bucket_id, 

432 key_id, 

433 client=client, 

434 client_kwargs=client_kwargs, 

435 writebuffer=writebuffer, 

436 ) 

437 else: 

438 assert False, 'unexpected mode: %r' % mode 

439 

440 fileobj.name = key_id 

441 return fileobj 
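
# --- Editor's note: a minimal reading sketch using the parameters documented above.
# --- The bucket, key, helper name and all numeric values are illustrative assumptions.
def _example_read():
    fin = open(
        'my_bucket',
        'my_key',
        'rb',
        buffer_size=1024 * 1024,         # 1MB buffer instead of the 128KB default
        defer_seek=True,                 # no GetObject call until the first read()/seek()
        range_chunk_size=8 * 1024 ** 2,  # cap each GET at an 8MB Range
    )
    try:
        fin.seek(100)                    # the first GET starts at byte 100
        return fin.read(64)
    finally:
        fin.close()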

442 

443 

444def _get(client, bucket, key, version, range_string): 

445 try: 

446 params = dict(Bucket=bucket, Key=key) 

447 if version: 

448 params["VersionId"] = version 

449 if range_string: 

450 params["Range"] = range_string 

451 

452 return client.get_object(**params) 

453 except botocore.client.ClientError as error: 

454 wrapped_error = IOError( 

455 'unable to access bucket: %r key: %r version: %r error: %s' % ( 

456 bucket, key, version, error 

457 ) 

458 ) 

459 wrapped_error.backend_error = error 

460 raise wrapped_error from error 

461 

462 

463def _unwrap_ioerror(ioe): 

464 """Given an IOError from _get, return the 'Error' dictionary from boto.""" 

465 try: 

466 return ioe.backend_error.response['Error'] 

467 except (AttributeError, KeyError): 

468 return None 

469 

470 

471class _SeekableRawReader(object): 

472 """Read an S3 object. 

473 

474 This class is internal to the S3 submodule. 

475 """ 

476 

477 def __init__( 

478 self, 

479 client, 

480 bucket, 

481 key, 

482 version_id=None, 

483 range_chunk_size=None, 

484 ): 

485 self._client = client 

486 self._bucket = bucket 

487 self._key = key 

488 self._version_id = version_id 

489 self._range_chunk_size = range_chunk_size 

490 

491 self._content_length = None 

492 self._position = 0 

493 self._body = None 

494 

495 @property 

496 def closed(self): 

497 return self._body is None 

498 

499 def close(self): 

500 if not self.closed: 

501 self._body.close() 

502 self._body = None 

503 

504 def seek(self, offset, whence=constants.WHENCE_START): 

505 """Seek to the specified position. 

506 

507 :param int offset: The offset in bytes. 

508 :param int whence: Where the offset is from. 

509 

510 :returns: the position after seeking. 

511 :rtype: int 

512 """ 

513 if whence not in constants.WHENCE_CHOICES: 

514 raise ValueError('invalid whence, expected one of %r' % constants.WHENCE_CHOICES) 

515 

516 # 

517 # Close old body explicitly. 

518 # 

519 self.close() 

520 

521 start = None 

522 stop = None 

523 if whence == constants.WHENCE_START: 

524 start = max(0, offset) 

525 elif whence == constants.WHENCE_CURRENT: 

526 start = max(0, offset + self._position) 

527 else: 

528 stop = max(0, -offset) 

529 

530 # 

531 # If we can figure out that we've read past the EOF, then we can save 

532 # an extra API call. 

533 # 

534 if self._content_length is None: 

535 reached_eof = False 

536 elif start is not None and start >= self._content_length: 

537 reached_eof = True 

538 elif stop == 0: 

539 reached_eof = True 

540 else: 

541 reached_eof = False 

542 

543 if reached_eof: 

544 self._body = io.BytesIO() 

545 self._position = self._content_length 

546 else: 

547 self._open_body(start, stop) 

548 

549 return self._position 

550 

551 def _open_body(self, start=None, stop=None): 

552 """Open a connection to download the specified range of bytes. Store 

553 the open file handle in self._body. 

554 

555 If no range is specified, start defaults to self._position. 

556 start and stop follow the semantics of the http range header, 

557 so a stop without a start requests the last `stop` bytes of the object. 

558 

559 If self._range_chunk_size is set, open-ended range headers are avoided: stop is 

560 set so that at most self._range_chunk_size bytes are returned by a single 

561 GET request. 

562 

563 As a side effect, set self._content_length. Set self._position 

564 to self._content_length if start is past end of file. 

565 """ 

566 if start is None and stop is None: 

567 start = self._position 

568 

569 # Apply chunking: limit the stop position if range_chunk_size is set 

570 if stop is None and self._range_chunk_size is not None: 

571 stop = start + self._range_chunk_size - 1 

572 # Don't request beyond known content length 

573 if self._content_length is not None: 

574 stop = min(stop, self._content_length - 1) 

575 

576 range_string = smart_open.utils.make_range_string(start, stop) 

577 

578 try: 

579 # Optimistically try to fetch the requested content range. 

580 response = _get( 

581 self._client, 

582 self._bucket, 

583 self._key, 

584 self._version_id, 

585 range_string, 

586 ) 

587 except IOError as ioe: 

588 # Handle requested content range exceeding content size. 

589 error_response = _unwrap_ioerror(ioe) 

590 if error_response is None or error_response.get('Code') != _OUT_OF_RANGE: 

591 raise 

592 

593 actual_object_size = int(error_response.get('ActualObjectSize', 0)) 

594 if ( 

595 # empty file (==) or start is past end of file (>) 

596 (start is not None and start >= actual_object_size) 

597 # negative seek requested more bytes than file has 

598 or (start is None and stop is not None and stop >= actual_object_size) 

599 ): 

600 self._position = self._content_length = actual_object_size 

601 self._body = io.BytesIO() 

602 else: # stop is past end of file: request the correct remainder instead 

603 self._open_body(start=start, stop=actual_object_size - 1) 

604 return 

605 

606 # 

607 # Keep track of how many times boto3's built-in retry mechanism 

608 # activated. 

609 # 

610 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#checking-retry-attempts-in-an-aws-service-response 

611 # 

612 logger.debug( 

613 '%s: RetryAttempts: %d', 

614 self, 

615 response['ResponseMetadata']['RetryAttempts'], 

616 ) 

617 # 

618 # range request may not always return partial content, see: 

619 # https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests#partial_request_responses 

620 # 

621 status_code = response['ResponseMetadata']['HTTPStatusCode'] 

622 if status_code == http.HTTPStatus.PARTIAL_CONTENT: 

623 # 206 guarantees that the response body only contains the requested byte range 

624 _, resp_start, _, length = smart_open.utils.parse_content_range(response['ContentRange']) 

625 self._position = resp_start 

626 self._content_length = length 

627 self._body = response['Body'] 

628 elif status_code == http.HTTPStatus.OK: 

629 # 200 guarantees the response body contains the full file (server ignored range header) 

630 self._position = 0 

631 self._content_length = response["ContentLength"] 

632 self._body = response['Body'] 

633 # 

634 # If we got a full request when we were actually expecting a range, we need to 

635 # read some data to ensure that the body starts in the place that the caller expects 

636 # 

637 if start is not None: 

638 expected_position = min(self._content_length, start) 

639 elif start is None and stop is not None: 

640 expected_position = max(0, self._content_length - stop) 

641 else: 

642 expected_position = 0 

643 if expected_position > 0: 

644 logger.debug( 

645 '%s: discarding %d bytes to reach expected position', 

646 self, 

647 expected_position, 

648 ) 

649 self._position = len(self._body.read(expected_position)) 

650 else: 

651 raise ValueError("Unexpected status code %r" % status_code) 

652 

653 def read(self, size=-1): 

654 """Read from the continuous connection with the remote peer.""" 

655 if size < -1: 

656 raise ValueError(f'size must be >= -1, got {size}') 

657 

658 if size == -1: 

659 size = inf # makes for a simple while-condition below 

660 

661 binary_collected = io.BytesIO() 

662 

663 # 

664 # Boto3 has built-in error handling and retry mechanisms: 

665 # 

666 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html 

667 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html 

668 # 

669 # Unfortunately, it isn't always enough. There is still a non-zero 

670 # possibility that an exception will slip past these mechanisms and 

671 # terminate the read prematurely. Luckily, at this stage, it's very 

672 # simple to recover from the problem: wait a little bit, reopen the 

673 # HTTP connection and try again. Usually, a single retry attempt is 

674 # enough to recover, but we try multiple times "just in case". 

675 # 

676 def retry_read(attempts=(1, 2, 4, 8, 16)) -> bytes: 

677 for seconds in attempts: 

678 if self.closed: 

679 self._open_body() 

680 try: 

681 if size == inf: 

682 return self._body.read() 

683 return self._body.read(size - binary_collected.tell()) 

684 except ( 

685 ConnectionResetError, 

686 botocore.exceptions.BotoCoreError, 

687 urllib3.exceptions.HTTPError, 

688 ) as err: 

689 logger.warning( 

690 '%s: caught %r while reading %d bytes, sleeping %ds before retry', 

691 self, 

692 err, 

693 -1 if size == inf else size, 

694 seconds, 

695 ) 

696 self.close() 

697 time.sleep(seconds) 

698 raise IOError( 

699 '%s: failed to read %d bytes after %d attempts' % 

700 (self, -1 if size == inf else size, len(attempts)), 

701 ) 

702 

703 while ( 

704 self._content_length is None # very first read call 

705 or ( 

706 self._position < self._content_length # not yet end of file 

707 and binary_collected.tell() < size # not yet read enough 

708 ) 

709 ): 

710 binary = retry_read() 

711 self._position += len(binary) 

712 binary_collected.write(binary) 

713 if not binary: # end of stream 

714 self.close() 

715 

716 return binary_collected.getvalue() 

717 

718 def __str__(self): 

719 return 'smart_open.s3._SeekableRawReader(%r, %r)' % (self._bucket, self._key) 
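
# --- Editor's note: a minimal sketch of the Range headers _open_body() issues when
# --- range_chunk_size is set; the 8MB chunk size and helper name are illustrative.
def _example_range_chunking():
    chunk = 8 * 1024 ** 2
    # First GET starts at byte 0; stop = start + chunk - 1, as in _open_body() above.
    first = smart_open.utils.make_range_string(0, chunk - 1)
    # Once that body is exhausted, the next GET resumes at self._position.
    second = smart_open.utils.make_range_string(chunk, 2 * chunk - 1)
    return first, second   # e.g. 'bytes=0-8388607' and 'bytes=8388608-16777215'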

720 

721 

722def _initialize_boto3(rw, client, client_kwargs, bucket, key): 

723 """Created the required objects for accessing S3. Ideally, they have 

724 been already created for us and we can just reuse them.""" 

725 if client_kwargs is None: 

726 client_kwargs = {} 

727 

728 if client is None: 

729 init_kwargs = client_kwargs.get('S3.Client', {}) 

730 if 'config' not in init_kwargs: 

731 init_kwargs['config'] = botocore.client.Config( 

732 max_pool_connections=64, 

733 tcp_keepalive=True, 

734 retries={"max_attempts": 6, "mode": "adaptive"} 

735 ) 

736 # boto3.client re-uses the default session which is not thread-safe when this is called 

737 # from within a thread. when using smart_open with multithreading, create a thread-safe 

738 # client with the config above and share it between threads using transport_params 

739 # https://github.com/boto/boto3/blob/1.38.41/docs/source/guide/clients.rst?plain=1#L111 

740 client = boto3.client('s3', **init_kwargs) 

741 assert client 

742 

743 rw._client = _ClientWrapper(client, client_kwargs) 

744 rw._bucket = bucket 

745 rw._key = key 

746 

747 

748class Reader(io.BufferedIOBase): 

749 """Reads bytes from S3. 

750 

751 Implements the io.BufferedIOBase interface of the standard library.""" 

752 

753 def __init__( 

754 self, 

755 bucket, 

756 key, 

757 version_id=None, 

758 buffer_size=DEFAULT_BUFFER_SIZE, 

759 line_terminator=constants.BINARY_NEWLINE, 

760 defer_seek=False, 

761 client=None, 

762 client_kwargs=None, 

763 range_chunk_size=None, 

764 ): 

765 self._version_id = version_id 

766 self._buffer_size = buffer_size 

767 

768 _initialize_boto3(self, client, client_kwargs, bucket, key) 

769 

770 self._raw_reader = _SeekableRawReader( 

771 self._client, 

772 bucket, 

773 key, 

774 self._version_id, 

775 range_chunk_size=range_chunk_size, 

776 ) 

777 self._current_pos = 0 

778 self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size) 

779 self._eof = False 

780 self._line_terminator = line_terminator 

781 self._seek_initialized = False 

782 

783 # 

784 # This member is part of the io.BufferedIOBase interface. 

785 # 

786 self.raw = None 

787 

788 if not defer_seek: 

789 self.seek(0) 

790 

791 # 

792 # io.BufferedIOBase methods. 

793 # 

794 

795 def close(self): 

796 """Flush and close this stream.""" 

797 logger.debug("close: called") 

798 pass 

799 

800 def readable(self): 

801 """Return True if the stream can be read from.""" 

802 return True 

803 

804 def read(self, size=-1): 

805 """Read up to size bytes from the object and return them.""" 

806 if size == 0: 

807 return b'' 

808 elif size < 0: 

809 # call read() before setting _current_pos to make sure _content_length is set 

810 out = self._read_from_buffer() + self._raw_reader.read() 

811 self._current_pos = self._raw_reader._content_length 

812 return out 

813 

814 # 

815 # Return unused data first 

816 # 

817 if len(self._buffer) >= size: 

818 return self._read_from_buffer(size) 

819 

820 # 

821 # If the stream is finished, return what we have. 

822 # 

823 if self._eof: 

824 return self._read_from_buffer() 

825 

826 self._fill_buffer(size) 

827 return self._read_from_buffer(size) 

828 

829 def read1(self, size=-1): 

830 """This is the same as read().""" 

831 return self.read(size=size) 

832 

833 def readinto(self, b): 

834 """Read up to len(b) bytes into b, and return the number of bytes 

835 read.""" 

836 data = self.read(len(b)) 

837 if not data: 

838 return 0 

839 b[:len(data)] = data 

840 return len(data) 

841 

842 def readline(self, limit=-1): 

843 """Read up to and including the next newline. Returns the bytes read.""" 

844 if limit != -1: 

845 raise NotImplementedError('limits other than -1 not implemented yet') 

846 

847 # 

848 # A single line may span multiple buffers. 

849 # 

850 line = io.BytesIO() 

851 while not (self._eof and len(self._buffer) == 0): 

852 line_part = self._buffer.readline(self._line_terminator) 

853 line.write(line_part) 

854 self._current_pos += len(line_part) 

855 

856 if line_part.endswith(self._line_terminator): 

857 break 

858 else: 

859 self._fill_buffer() 

860 

861 return line.getvalue() 

862 

863 def seekable(self): 

864 """If False, seek(), tell() and truncate() will raise IOError. 

865 

866 We offer only seek support, and no truncate support.""" 

867 return True 

868 

869 def seek(self, offset, whence=constants.WHENCE_START): 

870 """Seek to the specified position. 

871 

872 :param int offset: The offset in bytes. 

873 :param int whence: Where the offset is from. 

874 

875 Returns the position after seeking.""" 

876 # Convert relative offset to absolute, since self._raw_reader 

877 # doesn't know our current position. 

878 if whence == constants.WHENCE_CURRENT: 

879 whence = constants.WHENCE_START 

880 offset += self._current_pos 

881 

882 # Check if we can satisfy seek from buffer 

883 if whence == constants.WHENCE_START and offset > self._current_pos: 

884 buffer_end = self._current_pos + len(self._buffer) 

885 if offset <= buffer_end: 

886 # Forward seek within buffered data - avoid S3 request 

887 self._buffer.read(offset - self._current_pos) 

888 self._current_pos = offset 

889 return self._current_pos 

890 

891 if not self._seek_initialized or not ( 

892 whence == constants.WHENCE_START and offset == self._current_pos 

893 ): 

894 self._current_pos = self._raw_reader.seek(offset, whence) 

895 self._buffer.empty() 

896 

897 self._eof = self._current_pos == self._raw_reader._content_length 

898 

899 self._seek_initialized = True 

900 return self._current_pos 

901 

902 def tell(self): 

903 """Return the current position within the file.""" 

904 return self._current_pos 

905 

906 def truncate(self, size=None): 

907 """Unsupported.""" 

908 raise io.UnsupportedOperation 

909 

910 def detach(self): 

911 """Unsupported.""" 

912 raise io.UnsupportedOperation 

913 

914 def terminate(self): 

915 """Do nothing.""" 

916 pass 

917 

918 def to_boto3(self, resource): 

919 """Create an **independent** `boto3.s3.Object` instance that points to 

920 the same S3 object as this instance. 

921 Changes to the returned object will not affect the current instance. 

922 """ 

923 assert resource, 'resource must be a boto3.resource instance' 

924 obj = resource.Object(self._bucket, self._key) 

925 if self._version_id is not None: 

926 return obj.Version(self._version_id) 

927 else: 

928 return obj 

929 

930 # 

931 # Internal methods. 

932 # 

933 def _read_from_buffer(self, size=-1): 

934 """Remove at most size bytes from our buffer and return them.""" 

935 size = size if size >= 0 else len(self._buffer) 

936 part = self._buffer.read(size) 

937 self._current_pos += len(part) 

938 return part 

939 

940 def _fill_buffer(self, size=-1): 

941 size = max(size, self._buffer._chunk_size) 

942 while len(self._buffer) < size and not self._eof: 

943 bytes_read = self._buffer.fill(self._raw_reader) 

944 if bytes_read == 0: 

945 logger.debug('%s: reached EOF while filling buffer', self) 

946 self._eof = True 

947 

948 def __str__(self): 

949 return "smart_open.s3.Reader(%r, %r)" % (self._bucket, self._key) 

950 

951 def __repr__(self): 

952 return ( 

953 "smart_open.s3.Reader(" 

954 "bucket=%r, " 

955 "key=%r, " 

956 "version_id=%r, " 

957 "buffer_size=%r, " 

958 "line_terminator=%r)" 

959 ) % ( 

960 self._bucket, 

961 self._key, 

962 self._version_id, 

963 self._buffer_size, 

964 self._line_terminator, 

965 ) 

966 

967 

968class MultipartWriter(io.BufferedIOBase): 

969 """Writes bytes to S3 using the multi part API. 

970 

971 Implements the io.BufferedIOBase interface of the standard library.""" 

972 _upload_id = None # so `closed` property works in case __init__ fails and __del__ is called 

973 

974 def __init__( 

975 self, 

976 bucket, 

977 key, 

978 part_size=DEFAULT_PART_SIZE, 

979 client=None, 

980 client_kwargs=None, 

981 writebuffer: io.BytesIO | None = None, 

982 ): 

983 adjusted_ps = smart_open.utils.clamp(part_size, MIN_PART_SIZE, MAX_PART_SIZE) 

984 if part_size != adjusted_ps: 

985 logger.warning(f"adjusting part_size from {part_size} to {adjusted_ps}") 

986 part_size = adjusted_ps 

987 self._part_size = part_size 

988 

989 _initialize_boto3(self, client, client_kwargs, bucket, key) 

990 self._client: S3Client 

991 self._bucket: str 

992 self._key: str 

993 

994 try: 

995 partial = functools.partial( 

996 self._client.create_multipart_upload, 

997 Bucket=bucket, 

998 Key=key, 

999 ) 

1000 self._upload_id = RETRY._do(partial)['UploadId'] 

1001 except botocore.client.ClientError as error: 

1002 raise ValueError( 

1003 'the bucket %r does not exist, or is forbidden for access (%r)' % ( 

1004 bucket, error 

1005 ) 

1006 ) from error 

1007 

1008 if writebuffer is None: 

1009 self._buf = io.BytesIO() 

1010 else: 

1011 self._buf = writebuffer 

1012 

1013 self._total_bytes = 0 

1014 self._total_parts = 0 

1015 self._parts: list[dict[str, object]] = [] 

1016 

1017 # 

1018 # This member is part of the io.BufferedIOBase interface. 

1019 # 

1020 self.raw = None # type: ignore[assignment] 

1021 

1022 def flush(self): 

1023 pass 

1024 

1025 # 

1026 # Override some methods from io.IOBase. 

1027 # 

1028 def close(self): 

1029 logger.debug("close: called") 

1030 if self.closed: 

1031 return 

1032 

1033 if self._buf.tell(): 

1034 self._upload_next_part() 

1035 

1036 logger.debug('%s: completing multipart upload', self) 

1037 if self._total_bytes and self._upload_id: 

1038 partial = functools.partial( 

1039 self._client.complete_multipart_upload, 

1040 Bucket=self._bucket, 

1041 Key=self._key, 

1042 UploadId=self._upload_id, 

1043 MultipartUpload={'Parts': self._parts}, 

1044 ) 

1045 RETRY._do(partial) 

1046 logger.debug('%s: completed multipart upload', self) 

1047 elif self._upload_id: 

1048 # 

1049 # AWS complains with "The XML you provided was not well-formed or 

1050 # did not validate against our published schema" when the input is 

1051 # completely empty => abort the upload, no file created. 

1052 # 

1053 # We work around this by creating an empty file explicitly. 

1054 # 

1055 self._client.abort_multipart_upload( 

1056 Bucket=self._bucket, 

1057 Key=self._key, 

1058 UploadId=self._upload_id, 

1059 ) 

1060 self._client.put_object( 

1061 Bucket=self._bucket, 

1062 Key=self._key, 

1063 Body=b'', 

1064 ) 

1065 logger.debug('%s: wrote 0 bytes to imitate multipart upload', self) 

1066 self._upload_id = None 

1067 

1068 @property 

1069 def closed(self): 

1070 return self._upload_id is None 

1071 

1072 def writable(self): 

1073 """Return True if the stream supports writing.""" 

1074 return True 

1075 

1076 def seekable(self): 

1077 """If False, seek(), tell() and truncate() will raise IOError. 

1078 

1079 We offer only tell support, and no seek or truncate support.""" 

1080 return True 

1081 

1082 def seek(self, offset, whence=constants.WHENCE_START): 

1083 """Unsupported.""" 

1084 raise io.UnsupportedOperation 

1085 

1086 def truncate(self, size=None): 

1087 """Unsupported.""" 

1088 raise io.UnsupportedOperation 

1089 

1090 def tell(self): 

1091 """Return the current stream position.""" 

1092 return self._total_bytes 

1093 

1094 # 

1095 # io.BufferedIOBase methods. 

1096 # 

1097 def detach(self): 

1098 raise io.UnsupportedOperation("detach() not supported") 

1099 

1100 def write(self, b: Buffer) -> int: 

1101 """Write the given buffer (bytes, bytearray, memoryview or any buffer 

1102 interface implementation) to the S3 file. 

1103 

1104 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html 

1105 

1106 There's buffering happening under the covers, so this may not actually 

1107 do any HTTP transfer right away.""" 

1108 offset = 0 

1109 mv = memoryview(b) 

1110 self._total_bytes += len(mv) 

1111 

1112 # 

1113 # botocore does not accept memoryview, otherwise we could've gotten 

1114 # away with not needing to write a copy to the buffer aside from cases 

1115 # where b is smaller than part_size 

1116 # 

1117 while offset < len(mv): 

1118 start = offset 

1119 end = offset + self._part_size - self._buf.tell() 

1120 self._buf.write(mv[start:end]) 

1121 if self._buf.tell() < self._part_size: 

1122 # 

1123 # Not enough data to write a new part just yet. The assert 

1124 # ensures that we've consumed all of the input buffer. 

1125 # 

1126 assert end >= len(mv) 

1127 return len(mv) 

1128 

1129 self._upload_next_part() 

1130 offset = end 

1131 return len(mv) 

1132 

1133 def terminate(self): 

1134 """Cancel the underlying multipart upload.""" 

1135 if self.closed: 

1136 return 

1137 logger.debug('%s: terminating multipart upload', self) 

1138 self._client.abort_multipart_upload( 

1139 Bucket=self._bucket, 

1140 Key=self._key, 

1141 UploadId=self._upload_id, 

1142 ) 

1143 self._upload_id = None 

1144 logger.debug('%s: terminated multipart upload', self) 

1145 

1146 def to_boto3(self, resource): 

1147 """Create an **independent** `boto3.s3.Object` instance that points to 

1148 the same S3 object as this instance. 

1149 Changes to the returned object will not affect the current instance. 

1150 """ 

1151 assert resource, 'resource must be a boto3.resource instance' 

1152 return resource.Object(self._bucket, self._key) 

1153 

1154 # 

1155 # Internal methods. 

1156 # 

1157 def _upload_next_part(self) -> None: 

1158 part_num = self._total_parts + 1 

1159 logger.info( 

1160 "%s: uploading part_num: %i, %i bytes (total %.3fGB)", 

1161 self, 

1162 part_num, 

1163 self._buf.tell(), 

1164 self._total_bytes / 1024.0 ** 3, 

1165 ) 

1166 self._buf.seek(0) 

1167 

1168 # 

1169 # Network problems in the middle of an upload are particularly 

1170 # troublesome. We don't want to abort the entire upload just because 

1171 # of a temporary connection problem, so this part needs to be 

1172 # especially robust. 

1173 # 

1174 upload = RETRY._do( 

1175 functools.partial( 

1176 self._client.upload_part, 

1177 Bucket=self._bucket, 

1178 Key=self._key, 

1179 UploadId=self._upload_id, 

1180 PartNumber=part_num, 

1181 Body=self._buf, 

1182 ) 

1183 ) 

1184 

1185 self._parts.append({'ETag': upload['ETag'], 'PartNumber': part_num}) 

1186 logger.debug("%s: upload of part_num #%i finished", self, part_num) 

1187 

1188 self._total_parts += 1 

1189 

1190 self._buf.seek(0) 

1191 self._buf.truncate(0) 

1192 

1193 def __enter__(self): 

1194 return self 

1195 

1196 def __exit__(self, exc_type, exc_val, exc_tb): 

1197 if exc_type is not None: 

1198 self.terminate() 

1199 else: 

1200 self.close() 

1201 

1202 def __str__(self): 

1203 return "smart_open.s3.MultipartWriter(%r, %r)" % (self._bucket, self._key) 

1204 

1205 def __repr__(self): 

1206 return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, part_size=%r)" % ( 

1207 self._bucket, 

1208 self._key, 

1209 self._part_size, 

1210 ) 
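
# --- Editor's note: a minimal writing sketch combining part_size with a disk-backed
# --- writebuffer, as described in the open() docstring above. The bucket, key, helper
# --- name and sizes are illustrative assumptions.
def _example_multipart_write():
    import tempfile

    with tempfile.TemporaryFile() as diskbuf:
        with open(
            'my_bucket',
            'my_key',
            'wb',
            min_part_size=16 * 1024 ** 2,   # flush a part to S3 every 16MB
            writebuffer=diskbuf,            # buffer pending parts on disk, not in RAM
        ) as fout:
            for _ in range(1024):
                fout.write(b'x' * 65536)    # 64MB in total, uploaded as four 16MB parts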

1211 

1212 

1213class SinglepartWriter(io.BufferedIOBase): 

1214 """Writes bytes to S3 using the single part API. 

1215 

1216 Implements the io.BufferedIOBase interface of the standard library. 

1217 

1218 This class buffers all of its input in memory until its `close` method is called. Only then is 

1219 the data written to S3 and the buffer released.""" 

1220 _buf = None # so `closed` property works in case __init__ fails and __del__ is called 

1221 

1222 def __init__( 

1223 self, 

1224 bucket, 

1225 key, 

1226 client=None, 

1227 client_kwargs=None, 

1228 writebuffer=None, 

1229 ): 

1230 _initialize_boto3(self, client, client_kwargs, bucket, key) 

1231 

1232 if writebuffer is None: 

1233 self._buf = io.BytesIO() 

1234 elif not writebuffer.seekable(): 

1235 raise ValueError('writebuffer needs to be seekable') 

1236 else: 

1237 self._buf = writebuffer 

1238 

1239 def flush(self): 

1240 pass 

1241 

1242 # 

1243 # Override some methods from io.IOBase. 

1244 # 

1245 def close(self): 

1246 logger.debug("close: called") 

1247 if self.closed: 

1248 return 

1249 

1250 self.seek(0) 

1251 

1252 try: 

1253 self._client.put_object( 

1254 Bucket=self._bucket, 

1255 Key=self._key, 

1256 Body=self._buf, 

1257 ) 

1258 except botocore.client.ClientError as e: 

1259 raise ValueError( 

1260 'the bucket %r does not exist, or is forbidden for access' % self._bucket) from e 

1261 else: 

1262 logger.debug("%s: direct upload finished", self) 

1263 finally: 

1264 self._buf.close() 

1265 

1266 @property 

1267 def closed(self): 

1268 return self._buf is None or self._buf.closed 

1269 

1270 def readable(self): 

1271 """Propagate.""" 

1272 return self._buf.readable() 

1273 

1274 def writable(self): 

1275 """Propagate.""" 

1276 return self._buf.writable() 

1277 

1278 def seekable(self): 

1279 """Propagate.""" 

1280 return self._buf.seekable() 

1281 

1282 def seek(self, offset, whence=constants.WHENCE_START): 

1283 """Propagate.""" 

1284 return self._buf.seek(offset, whence) 

1285 

1286 def truncate(self, size=None): 

1287 """Propagate.""" 

1288 return self._buf.truncate(size) 

1289 

1290 def tell(self): 

1291 """Propagate.""" 

1292 return self._buf.tell() 

1293 

1294 def write(self, b): 

1295 """Write the given buffer (bytes, bytearray, memoryview or any buffer 

1296 interface implementation) into the buffer. Content of the buffer will be 

1297 written to S3 on close as a single-part upload. 

1298 

1299 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html""" 

1300 return self._buf.write(b) 

1301 

1302 def read(self, size=-1): 

1303 """Propagate.""" 

1304 return self._buf.read(size) 

1305 

1306 def read1(self, size=-1): 

1307 """Propagate.""" 

1308 return self._buf.read1(size) 

1309 

1310 def terminate(self): 

1311 """Close buffer and skip upload.""" 

1312 self._buf.close() 

1313 logger.debug('%s: terminated singlepart upload', self) 

1314 

1315 # 

1316 # Internal methods. 

1317 # 

1318 def __enter__(self): 

1319 return self 

1320 

1321 def __exit__(self, exc_type, exc_val, exc_tb): 

1322 if exc_type is not None: 

1323 self.terminate() 

1324 else: 

1325 self.close() 

1326 

1327 def __str__(self): 

1328 return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._bucket, self._key) 

1329 

1330 def __repr__(self): 

1331 return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key) 

1332 

1333 

1334def _accept_all(key): 

1335 return True 

1336 

1337 

1338def iter_bucket( 

1339 bucket_name, 

1340 prefix='', 

1341 accept_key=None, 

1342 key_limit=None, 

1343 workers=16, 

1344 retries=3, 

1345 max_threads_per_fileobj=4, 

1346 **session_kwargs): 

1347 """ 

1348 Iterate and download all S3 objects under `s3://bucket_name/prefix`. 

1349 

1350 Parameters 

1351 ---------- 

1352 bucket_name: str 

1353 The name of the bucket. 

1354 prefix: str, optional 

1355 Limits the iteration to keys starting with the prefix. 

1356 accept_key: callable, optional 

1357 This is a function that accepts a key name (unicode string) and 

1358 returns True/False, signalling whether the given key should be downloaded. 

1359 The default behavior is to accept all keys. 

1360 key_limit: int, optional 

1361 If specified, the iterator will stop after yielding this many results. 

1362 workers: int, optional 

1363 The number of objects to download concurrently. The entire operation uses 

1364 a single ThreadPoolExecutor and a shared, thread-safe boto3 S3.Client. Default: 16 

1365 retries: int, optional 

1366 The number of times to retry a failed download. Default: 3 

1367 max_threads_per_fileobj: int, optional 

1368 The maximum number of download threads per worker. The maximum size of the 

1369 connection pool will be `workers * max_threads_per_fileobj + 1`. Default: 4 

1370 session_kwargs: dict, optional 

1371 Keyword arguments to pass when creating a new session. 

1372 For a list of available names and values, see: 

1373 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session 

1374 

1375 

1376 Yields 

1377 ------ 

1378 str 

1379 The full key name (does not include the bucket name). 

1380 bytes 

1381 The full contents of the key. 

1382 

1383 Notes 

1384 ----- 

1385 The keys are processed in parallel by `workers` workers (default: 16), which 

1386 greatly speeds up downloads. If multiprocessing is not available (i.e. 

1387 _MULTIPROCESSING is False), this parameter is ignored. 

1388 

1389 Examples 

1390 -------- 

1391 

1392 >>> # get all JSON files under "mybucket/foo/" 

1393 >>> for key, content in iter_bucket( 

1394 ... bucket_name, prefix='foo/', 

1395 ... accept_key=lambda key: key.endswith('.json')): 

1396 ... print(key, len(content)) 

1397 

1398 >>> # limit to 10k files, using 32 parallel workers (default is 16) 

1399 >>> for key, content in iter_bucket(bucket_name, key_limit=10000, workers=32): 

1400 ... print(key, len(content)) 

1401 """ 

1402 if accept_key is None: 

1403 accept_key = _accept_all 

1404 

1405 # 

1406 # If people insist on giving us bucket instances, silently extract the name 

1407 # before moving on. Works for boto3 as well as boto. 

1408 # 

1409 try: 

1410 bucket_name = bucket_name.name 

1411 except AttributeError: 

1412 pass 

1413 

1414 if bucket_name is None: 

1415 raise ValueError('bucket_name may not be None') 

1416 

1417 total_size, key_no = 0, -1 

1418 

1419 # thread-safe client to share across _list_bucket and _download_key calls 

1420 # https://github.com/boto/boto3/blob/1.38.41/docs/source/guide/clients.rst?plain=1#L111 

1421 session = boto3.session.Session(**session_kwargs) 

1422 config = botocore.client.Config( 

1423 max_pool_connections=workers * max_threads_per_fileobj + 1, # 1 thread for _list_bucket 

1424 tcp_keepalive=True, 

1425 retries={"max_attempts": retries * 2, "mode": "adaptive"}, 

1426 ) 

1427 client = session.client('s3', config=config) 

1428 

1429 transfer_config = boto3.s3.transfer.TransferConfig(max_concurrency=max_threads_per_fileobj) 

1430 

1431 key_iterator = _list_bucket( 

1432 bucket_name=bucket_name, 

1433 prefix=prefix, 

1434 accept_key=accept_key, 

1435 client=client, 

1436 ) 

1437 download_key = functools.partial( 

1438 _download_key, 

1439 bucket_name=bucket_name, 

1440 retries=retries, 

1441 client=client, 

1442 transfer_config=transfer_config, 

1443 ) 

1444 

1445 with smart_open.concurrency.create_pool(workers) as pool: 

1446 result_iterator = pool.imap_unordered(download_key, key_iterator) 

1447 key_no = 0 

1448 while True: 

1449 try: 

1450 (key, content) = result_iterator.__next__() 

1451 except StopIteration: 

1452 break 

1453 # Skip deleted objects (404 responses) 

1454 if key is None: 

1455 continue 

1456 if key_no % 1000 == 0: 

1457 logger.info( 

1458 "yielding key #%i: %s, size %i (total %.1fMB)", 

1459 key_no, key, len(content), total_size / 1024.0 ** 2 

1460 ) 

1461 yield key, content 

1462 total_size += len(content) 

1463 if key_limit is not None and key_no + 1 >= key_limit: 

1464 # we were asked to output only a limited number of keys => we're done 

1465 break 

1466 key_no += 1 

1467 logger.info("processed %i keys, total size %i" % (key_no + 1, total_size)) 

1468 

1469 

1470def _list_bucket( 

1471 *, 

1472 bucket_name, 

1473 client, 

1474 prefix='', 

1475 accept_key=lambda k: True, 

1476): 

1477 ctoken = None 

1478 

1479 while True: 

1480 # list_objects_v2 doesn't like a None value for ContinuationToken 

1481 # so we don't set it if we don't have one. 

1482 if ctoken: 

1483 kwargs = dict(Bucket=bucket_name, Prefix=prefix, ContinuationToken=ctoken) 

1484 else: 

1485 kwargs = dict(Bucket=bucket_name, Prefix=prefix) 

1486 response = client.list_objects_v2(**kwargs) 

1487 try: 

1488 content = response['Contents'] 

1489 except KeyError: 

1490 pass 

1491 else: 

1492 for c in content: 

1493 key = c['Key'] 

1494 if accept_key(key): 

1495 yield key 

1496 ctoken = response.get('NextContinuationToken', None) 

1497 if not ctoken: 

1498 break 

1499 

1500 

1501def _download_key(key_name, *, client, bucket_name, retries, transfer_config): 

1502 # Sometimes, https://github.com/boto/boto/issues/2409 can happen 

1503 # because of network issues on either side. 

1504 # Retry up to `retries` times to make sure it's not a transient issue. 

1505 for x in range(retries + 1): 

1506 try: 

1507 content_bytes = _download_fileobj( 

1508 client=client, 

1509 bucket_name=bucket_name, 

1510 key_name=key_name, 

1511 transfer_config=transfer_config, 

1512 ) 

1513 except botocore.exceptions.ClientError as err: 

1514 # 

1515 # ignore 404 not found errors: they mean the object was deleted 

1516 # after we listed the contents of the bucket, but before we 

1517 # downloaded the object. 

1518 # 

1519 if 'Error' in err.response and err.response['Error'].get('Code') == '404': 

1520 return None, None 

1521 # Actually fail on last pass through the loop 

1522 if x == retries: 

1523 raise 

1524 # Otherwise, try again, as this might be a transient timeout 

1525 continue 

1526 return key_name, content_bytes 

1527 

1528 

1529def _download_fileobj(*, client, bucket_name, key_name, transfer_config): 

1530 # 

1531 # This is a separate function only because it makes it easier to inject 

1532 # exceptions during tests. 

1533 # 

1534 buf = io.BytesIO() 

1535 client.download_fileobj( 

1536 Bucket=bucket_name, 

1537 Key=key_name, 

1538 Fileobj=buf, 

1539 Config=transfer_config, 

1540 ) 

1541 return buf.getvalue()