
1# -*- coding: utf-8 -*- 

2# 

3# Copyright (C) 2019 Radim Rehurek <me@radimrehurek.com> 

4# 

5# This code is distributed under the terms and conditions 

6# from the MIT License (MIT). 

7# 

8"""Implements file-like objects for reading and writing from/to AWS S3.""" 

9from __future__ import annotations 

10 

11import http 

12import io 

13import functools 

14import logging 

15import time 

16import warnings 

17 

18from typing import ( 

19 Callable, 

20 List, 

21 TYPE_CHECKING, 

22) 

23 

24try: 

25 import boto3 

26 import botocore.client 

27 import botocore.exceptions 

28 import urllib3.exceptions 

29except ImportError: 

30 MISSING_DEPS = True 

31 

32import smart_open.bytebuffer 

33import smart_open.concurrency 

34import smart_open.utils 

35 

36from smart_open import constants 

37 

38 

39if TYPE_CHECKING: 

40 from mypy_boto3_s3.client import S3Client 

41 from typing_extensions import Buffer 

42 

43logger = logging.getLogger(__name__) 

44 

45# 

46# AWS puts restrictions on the part size for multipart uploads. 

47 # Each part, except the last, must be at least 5MB and at most 5GB.

48# 

49# On top of that, our MultipartWriter has a min_part_size option. 

50# In retrospect, it's an unfortunate name, because it conflicts with the 

51# minimum allowable part size (5MB), but it's too late to change it, because 

52# people are using that parameter (unlike the MIN, DEFAULT, MAX constants). 

53# It really just means "part size": as soon as you have this many bytes, 

54# write a part to S3 (see the MultipartWriter.write method). 

55# 

56 

57MIN_PART_SIZE = 5 * 1024 ** 2 

58"""The absolute minimum permitted by Amazon.""" 

59 

60DEFAULT_PART_SIZE = 50 * 1024**2 

61"""The default part size for S3 multipart uploads, chosen carefully by smart_open""" 

62 

63MAX_PART_SIZE = 5 * 1024 ** 3 

64"""The absolute maximum permitted by Amazon.""" 

65 

66SCHEMES = ("s3", "s3n", 's3u', "s3a") 

67DEFAULT_PORT = 443 

68DEFAULT_HOST = 's3.amazonaws.com' 

69 

70DEFAULT_BUFFER_SIZE = 128 * 1024 

71 

72URI_EXAMPLES = ( 

73 's3://my_bucket/my_key', 

74 's3://my_key:my_secret@my_bucket/my_key', 

75 's3://my_key:my_secret@my_server:my_port@my_bucket/my_key', 

76) 

77 

78# Returned by AWS when we try to seek beyond EOF. 

79_OUT_OF_RANGE = 'InvalidRange' 

80 

81 

82class Retry: 

83 def __init__(self): 

84 self.attempts: int = 6 

85 self.sleep_seconds: int = 10 

86 self.exceptions: List[Exception] = [botocore.exceptions.EndpointConnectionError] 

87 self.client_error_codes: List[str] = ['NoSuchUpload'] 

88 

89 def _do(self, fn: Callable): 

90 for attempt in range(self.attempts): 

91 try: 

92 return fn() 

93 except tuple(self.exceptions) as err: 

94 logger.critical( 

95 'Caught non-fatal %s, retrying %d more times', 

96 err, 

97 self.attempts - attempt - 1, 

98 ) 

99 logger.exception(err) 

100 time.sleep(self.sleep_seconds) 

101 except botocore.exceptions.ClientError as err: 

102 error_code = err.response['Error'].get('Code') 

103 if error_code not in self.client_error_codes: 

104 raise 

105 logger.critical( 

106 'Caught non-fatal ClientError (%s), retrying %d more times', 

107 error_code, 

108 self.attempts - attempt - 1, 

109 ) 

110 logger.exception(err) 

111 time.sleep(self.sleep_seconds) 

112 else: 

113 logger.critical('encountered too many non-fatal errors, giving up') 

114 raise IOError('%s failed after %d attempts' % (fn.func, self.attempts))

115 

116 

117# 

118# The retry mechanism for this submodule. Client code may modify it, e.g. by 

119# updating RETRY.sleep_seconds and friends. 

120# 

121if 'MISSING_DEPS' not in locals(): 

122 RETRY = Retry() 
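
# Illustrative sketch (assumption, not part of the original module): tuning the
# module-level retry policy from client code, as the comment above suggests.
# The attribute names are those defined on the Retry class.
def _example_tune_retry_policy():
    RETRY.attempts = 3                           # give up sooner
    RETRY.sleep_seconds = 2                      # shorter pause between attempts
    RETRY.client_error_codes.append('SlowDown')  # also retry S3 throttling errors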

123 

124 

125class _ClientWrapper: 

126 """Wraps a client to inject the appropriate keyword args into each method call. 

127 

128 The keyword args are a dictionary keyed by the fully qualified method name. 

129 For example, S3.Client.create_multipart_upload. 

130 

131 See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client 

132 

133 This wrapper behaves identically to the client otherwise. 

134 """ 

135 def __init__(self, client, kwargs): 

136 self.client = client 

137 self.kwargs = kwargs 

138 

139 def __getattr__(self, method_name): 

140 method = getattr(self.client, method_name) 

141 kwargs = self.kwargs.get('S3.Client.%s' % method_name, {}) 

142 return functools.partial(method, **kwargs) 
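
# Illustrative sketch (assumption): what the wrapper above does in practice.
# Each attribute lookup returns the underlying client method with the kwargs
# registered under its fully qualified name already bound.
def _example_client_wrapper():
    wrapped = _ClientWrapper(
        boto3.client('s3'),
        {'S3.Client.create_multipart_upload': {'ServerSideEncryption': 'AES256'}},
    )
    # Calling wrapped.create_multipart_upload(Bucket=..., Key=...) now also
    # passes ServerSideEncryption='AES256' to the real client method.
    return wrapped.create_multipart_upload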

143 

144 

145def parse_uri(uri_as_string): 

146 # 

147 # Restrictions on bucket names and labels: 

148 # 

149 # - Bucket names must be at least 3 and no more than 63 characters long. 

150 # - Bucket names must be a series of one or more labels. 

151 # - Adjacent labels are separated by a single period (.). 

152 # - Bucket names can contain lowercase letters, numbers, and hyphens. 

153 # - Each label must start and end with a lowercase letter or a number. 

154 # 

155 # We use the above as a guide only, and do not perform any validation. We 

156 # let boto3 take care of that for us. 

157 # 

158 split_uri = smart_open.utils.safe_urlsplit(uri_as_string) 

159 assert split_uri.scheme in SCHEMES 

160 

161 port = DEFAULT_PORT 

162 host = DEFAULT_HOST 

163 ordinary_calling_format = False 

164 # 

165 # These defaults tell boto3 to look for credentials elsewhere 

166 # 

167 access_id, access_secret = None, None 

168 

169 # 

170 # Common URI template [secret:key@][host[:port]@]bucket/object 

171 # 

172 # The urlparse function doesn't handle the above schema, so we have to do 

173 # it ourselves. 

174 # 

175 uri = split_uri.netloc + split_uri.path 

176 

177 # 

178 # Attempt to extract edge-case authentication details from the URL. 

179 # 

180 # See: 

181 # 1. https://summitroute.com/blog/2018/06/20/aws_security_credential_formats/ 

182 # 2. test_s3_uri_with_credentials* in test_smart_open.py for example edge cases 

183 # 

184 if '@' in uri: 

185 maybe_auth, rest = uri.split('@', 1) 

186 if ':' in maybe_auth: 

187 maybe_id, maybe_secret = maybe_auth.split(':', 1) 

188 if '/' not in maybe_id: 

189 access_id, access_secret = maybe_id, maybe_secret 

190 uri = rest 

191 

192 head, key_id = uri.split('/', 1) 

193 if '@' in head and ':' in head: 

194 ordinary_calling_format = True 

195 host_port, bucket_id = head.split('@') 

196 host, port = host_port.split(':', 1) 

197 port = int(port) 

198 elif '@' in head: 

199 ordinary_calling_format = True 

200 host, bucket_id = head.split('@') 

201 else: 

202 bucket_id = head 

203 

204 return dict( 

205 scheme=split_uri.scheme, 

206 bucket_id=bucket_id, 

207 key_id=key_id, 

208 port=port, 

209 host=host, 

210 ordinary_calling_format=ordinary_calling_format, 

211 access_id=access_id, 

212 access_secret=access_secret, 

213 ) 
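
# Illustrative sketch (assumption): the dictionary parse_uri produces for the
# simplest URI form listed in URI_EXAMPLES above.
def _example_parse_uri():
    parsed = parse_uri('s3://my_bucket/my_key')
    assert parsed['bucket_id'] == 'my_bucket'
    assert parsed['key_id'] == 'my_key'
    assert parsed['host'] == DEFAULT_HOST and parsed['port'] == DEFAULT_PORT
    assert parsed['access_id'] is None and parsed['access_secret'] is None
    return parsed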

214 

215 

216def _consolidate_params(uri, transport_params): 

217 """Consolidates the parsed Uri with the additional parameters. 

218 

219 This is necessary because the user can pass some of the parameters can in 

220 two different ways: 

221 

222 1) Via the URI itself 

223 2) Via the transport parameters 

224 

225 These are not mutually exclusive, but we have to pick one over the other 

226 in a sensible way in order to proceed. 

227 

228 """ 

229 transport_params = dict(transport_params) 

230 

231 def inject(**kwargs): 

232 try: 

233 client_kwargs = transport_params['client_kwargs'] 

234 except KeyError: 

235 client_kwargs = transport_params['client_kwargs'] = {} 

236 

237 try: 

238 init_kwargs = client_kwargs['S3.Client'] 

239 except KeyError: 

240 init_kwargs = client_kwargs['S3.Client'] = {} 

241 

242 init_kwargs.update(**kwargs) 

243 

244 client = transport_params.get('client') 

245 if client is not None and (uri['access_id'] or uri['access_secret']): 

246 logger.warning( 

247 'ignoring credentials parsed from URL because they conflict with ' 

248 'transport_params["client"]. Set transport_params["client"] to None ' 

249 'to suppress this warning.' 

250 ) 

251 uri.update(access_id=None, access_secret=None) 

252 elif (uri['access_id'] and uri['access_secret']): 

253 inject( 

254 aws_access_key_id=uri['access_id'], 

255 aws_secret_access_key=uri['access_secret'], 

256 ) 

257 uri.update(access_id=None, access_secret=None) 

258 

259 if client is not None and uri['host'] != DEFAULT_HOST: 

260 logger.warning( 

261 'ignoring endpoint_url parsed from URL because they conflict with ' 

262 'transport_params["client"]. Set transport_params["client"] to None ' 

263 'to suppress this warning.' 

264 ) 

265 uri.update(host=None) 

266 elif uri['host'] != DEFAULT_HOST: 

267 if uri['scheme'] == 's3u': 

268 scheme = 'http' 

269 else: 

270 scheme = 'https' 

271 inject(endpoint_url=scheme + '://%(host)s:%(port)d' % uri) 

272 uri.update(host=None) 

273 

274 return uri, transport_params 
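
# Illustrative sketch (assumption): credentials and a custom endpoint embedded
# in the URI end up as S3.Client init kwargs after _consolidate_params.
def _example_consolidate_params():
    parsed = parse_uri('s3://my_key:my_secret@localhost:9000@my_bucket/my_key')
    parsed, tparams = _consolidate_params(parsed, {})
    init_kwargs = tparams['client_kwargs']['S3.Client']
    assert init_kwargs['aws_access_key_id'] == 'my_key'
    assert init_kwargs['aws_secret_access_key'] == 'my_secret'
    assert init_kwargs['endpoint_url'] == 'https://localhost:9000'
    return parsed, tparams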

275 

276 

277def open_uri(uri, mode, transport_params): 

278 deprecated = ( 

279 'multipart_upload_kwargs', 

280 'object_kwargs', 

281 'resource', 

282 'resource_kwargs', 

283 'session', 

284 'singlepart_upload_kwargs', 

285 ) 

286 detected = [k for k in deprecated if k in transport_params] 

287 if detected: 

288 doc_url = ( 

289 'https://github.com/piskvorky/smart_open/blob/develop/' 

290 'MIGRATING_FROM_OLDER_VERSIONS.rst' 

291 ) 

292 # 

293 # We use warnings.warn with UserWarning instead of logger.warn here because

294 # 

295 # 1) Not everyone has logging enabled; and 

296 # 2) check_kwargs (below) already uses logger.warn with a similar message 

297 # 

298 # https://github.com/piskvorky/smart_open/issues/614 

299 # 

300 message = ( 

301 'ignoring the following deprecated transport parameters: %r. ' 

302 'See <%s> for details' % (detected, doc_url) 

303 ) 

304 warnings.warn(message, UserWarning) 

305 parsed_uri = parse_uri(uri) 

306 parsed_uri, transport_params = _consolidate_params(parsed_uri, transport_params) 

307 kwargs = smart_open.utils.check_kwargs(open, transport_params) 

308 return open(parsed_uri['bucket_id'], parsed_uri['key_id'], mode, **kwargs) 

309 

310 

311def open( 

312 bucket_id, 

313 key_id, 

314 mode, 

315 version_id=None, 

316 buffer_size=DEFAULT_BUFFER_SIZE, 

317 min_part_size=DEFAULT_PART_SIZE, 

318 multipart_upload=True, 

319 defer_seek=False, 

320 client=None, 

321 client_kwargs=None, 

322 writebuffer=None, 

323): 

324 """Open an S3 object for reading or writing. 

325 

326 Parameters 

327 ---------- 

328 bucket_id: str 

329 The name of the bucket this object resides in. 

330 key_id: str 

331 The name of the key within the bucket. 

332 mode: str 

333 The mode for opening the object. Must be either "rb" or "wb". 

334 buffer_size: int, optional 

335 The buffer size to use when performing I/O. 

336 min_part_size: int, optional 

337 The minimum part size for multipart uploads, in bytes. 

338 When the writebuffer contains this many bytes, smart_open will upload 

339 the bytes to S3 as a single part of a multi-part upload, freeing the 

340 buffer either partially or entirely. When you close the writer, it 

341 will assemble the parts together. 

342 The value determines the upper limit for the writebuffer. If buffer 

343 space is short (e.g. you are buffering to memory), then use a smaller 

344 value for min_part_size, or consider buffering to disk instead (see 

345 the writebuffer option). 

346 The value must be between 5MB and 5GB. If you specify a value outside 

347 of this range, smart_open will adjust it for you, because otherwise the 

348 upload _will_ fail. 

349 For writing only. Does not apply if you set multipart_upload=False. 

350 multipart_upload: bool, optional 

351 Default: `True` 

352 If set to `True`, will use multipart upload for writing to S3. If set 

353 to `False`, S3 upload will use the S3 Single-Part Upload API, which 

354 is better suited to small files.

355 For writing only. 

356 version_id: str, optional 

357 Version of the object, used when reading object. 

358 If None, will fetch the most recent version. 

359 defer_seek: boolean, optional 

360 Default: `False` 

361 If set to `True` on a file opened for reading, GetObject will not be 

362 called until the first seek() or read(). 

363 Avoids redundant API queries when seeking before reading. 

364 client: object, optional 

365 The S3 client to use when working with boto3. 

366 If you don't specify this, then smart_open will create a new client for you. 

367 client_kwargs: dict, optional 

368 Additional parameters to pass to the relevant functions of the client. 

369 The keys are fully qualified method names, e.g. `S3.Client.create_multipart_upload`. 

370 The values are kwargs to pass to that method each time it is called. 

371 writebuffer: IO[bytes], optional 

372 By default, this module will buffer data in memory using io.BytesIO 

373 when writing. Pass another binary IO instance here to use it instead. 

374 For example, you may pass a file object to buffer to local disk instead 

375 of in RAM. Use this to keep RAM usage low at the expense of additional 

376 disk IO. If you pass in an open file, then you are responsible for 

377 cleaning it up after writing completes. 

378 """ 

379 logger.debug('%r', locals()) 

380 if mode not in constants.BINARY_MODES: 

381 raise NotImplementedError('bad mode: %r expected one of %r' % (mode, constants.BINARY_MODES)) 

382 

383 if (mode == constants.WRITE_BINARY) and (version_id is not None): 

384 raise ValueError("version_id must be None when writing") 

385 

386 if mode == constants.READ_BINARY: 

387 fileobj = Reader( 

388 bucket_id, 

389 key_id, 

390 version_id=version_id, 

391 buffer_size=buffer_size, 

392 defer_seek=defer_seek, 

393 client=client, 

394 client_kwargs=client_kwargs, 

395 ) 

396 elif mode == constants.WRITE_BINARY: 

397 if multipart_upload: 

398 fileobj = MultipartWriter( 

399 bucket_id, 

400 key_id, 

401 client=client, 

402 client_kwargs=client_kwargs, 

403 writebuffer=writebuffer, 

404 part_size=min_part_size, 

405 ) 

406 else: 

407 fileobj = SinglepartWriter( 

408 bucket_id, 

409 key_id, 

410 client=client, 

411 client_kwargs=client_kwargs, 

412 writebuffer=writebuffer, 

413 ) 

414 else: 

415 assert False, 'unexpected mode: %r' % mode 

416 

417 fileobj.name = key_id 

418 return fileobj 
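
# Illustrative sketch (assumption): calling this module's open() directly.
# The bucket and key names are placeholders; the client_kwargs keys follow the
# fully qualified method name convention described in the docstring above.
def _example_open_for_writing():
    client_kwargs = {
        'S3.Client': {'region_name': 'us-east-1'},
        'S3.Client.create_multipart_upload': {'ServerSideEncryption': 'AES256'},
    }
    with open('my_bucket', 'my_key.txt', 'wb', client_kwargs=client_kwargs) as fout:
        fout.write(b'hello world')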

419 

420 

421def _get(client, bucket, key, version, range_string): 

422 try: 

423 params = dict(Bucket=bucket, Key=key) 

424 if version: 

425 params["VersionId"] = version 

426 if range_string: 

427 params["Range"] = range_string 

428 

429 return client.get_object(**params) 

430 except botocore.client.ClientError as error: 

431 wrapped_error = IOError( 

432 'unable to access bucket: %r key: %r version: %r error: %s' % ( 

433 bucket, key, version, error 

434 ) 

435 ) 

436 wrapped_error.backend_error = error 

437 raise wrapped_error from error 

438 

439 

440def _unwrap_ioerror(ioe): 

441 """Given an IOError from _get, return the 'Error' dictionary from boto.""" 

442 try: 

443 return ioe.backend_error.response['Error'] 

444 except (AttributeError, KeyError): 

445 return None 

446 

447 

448class _SeekableRawReader(object): 

449 """Read an S3 object. 

450 

451 This class is internal to the S3 submodule. 

452 """ 

453 

454 def __init__( 

455 self, 

456 client, 

457 bucket, 

458 key, 

459 version_id=None, 

460 ): 

461 self._client = client 

462 self._bucket = bucket 

463 self._key = key 

464 self._version_id = version_id 

465 

466 self._content_length = None 

467 self._position = 0 

468 self._body = None 

469 

470 def seek(self, offset, whence=constants.WHENCE_START): 

471 """Seek to the specified position. 

472 

473 :param int offset: The offset in bytes. 

474 :param int whence: Where the offset is from. 

475 

476 :returns: the position after seeking. 

477 :rtype: int 

478 """ 

479 if whence not in constants.WHENCE_CHOICES: 

480 raise ValueError('invalid whence, expected one of %r' % constants.WHENCE_CHOICES) 

481 

482 # 

483 # Close old body explicitly. 

484 # When first seek() after __init__(), self._body is not exist. 

485 # 

486 if self._body is not None: 

487 self._body.close() 

488 self._body = None 

489 

490 start = None 

491 stop = None 

492 if whence == constants.WHENCE_START: 

493 start = max(0, offset) 

494 elif whence == constants.WHENCE_CURRENT: 

495 start = max(0, offset + self._position) 

496 else: 

497 stop = max(0, -offset) 

498 

499 # 

500 # If we can figure out that we've read past the EOF, then we can save 

501 # an extra API call. 

502 # 

503 if self._content_length is None: 

504 reached_eof = False 

505 elif start is not None and start >= self._content_length: 

506 reached_eof = True 

507 elif stop == 0: 

508 reached_eof = True 

509 else: 

510 reached_eof = False 

511 

512 if reached_eof: 

513 self._body = io.BytesIO() 

514 self._position = self._content_length 

515 else: 

516 self._open_body(start, stop) 

517 

518 return self._position 

519 

520 def _open_body(self, start=None, stop=None): 

521 """Open a connection to download the specified range of bytes. Store 

522 the open file handle in self._body. 

523 

524 If no range is specified, start defaults to self._position. 

525 start and stop follow the semantics of the http range header, 

526 so a stop without a start requests a suffix range, i.e. the last `stop` bytes of the object.

527 

528 As a side effect, set self._content_length. Set self._position 

529 to self._content_length if start is past end of file. 

530 """ 

531 if start is None and stop is None: 

532 start = self._position 

533 range_string = smart_open.utils.make_range_string(start, stop) 

534 

535 try: 

536 # Optimistically try to fetch the requested content range. 

537 response = _get( 

538 self._client, 

539 self._bucket, 

540 self._key, 

541 self._version_id, 

542 range_string, 

543 ) 

544 except IOError as ioe: 

545 # Handle requested content range exceeding content size. 

546 error_response = _unwrap_ioerror(ioe) 

547 if error_response is None or error_response.get('Code') != _OUT_OF_RANGE: 

548 raise 

549 try: 

550 self._position = self._content_length = int(error_response['ActualObjectSize']) 

551 self._body = io.BytesIO() 

552 except KeyError: 

553 response = _get( 

554 self._client, 

555 self._bucket, 

556 self._key, 

557 self._version_id, 

558 None, 

559 ) 

560 self._position = self._content_length = response["ContentLength"] 

561 self._body = response["Body"] 

562 else: 

563 # 

564 # Keep track of how many times boto3's built-in retry mechanism 

565 # activated. 

566 # 

567 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#checking-retry-attempts-in-an-aws-service-response 

568 # 

569 logger.debug( 

570 '%s: RetryAttempts: %d', 

571 self, 

572 response['ResponseMetadata']['RetryAttempts'], 

573 ) 

574 # 

575 # range request may not always return partial content, see: 

576 # https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests#partial_request_responses 

577 # 

578 status_code = response['ResponseMetadata']['HTTPStatusCode'] 

579 if status_code == http.HTTPStatus.PARTIAL_CONTENT: 

580 _, start, stop, length = smart_open.utils.parse_content_range(response['ContentRange']) 

581 self._position = start 

582 elif status_code == http.HTTPStatus.OK: 

583 length = response["ContentLength"] 

584 self._content_length = length 

585 self._body = response['Body'] 

586 

587 def read(self, size=-1): 

588 """Read from the continuous connection with the remote peer.""" 

589 if self._body is None: 

590 # This is necessary for the very first read() after __init__(). 

591 self._open_body() 

592 if self._position >= self._content_length: 

593 return b'' 

594 

595 # 

596 # Boto3 has built-in error handling and retry mechanisms: 

597 # 

598 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html 

599 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html 

600 # 

601 # Unfortunately, it isn't always enough. There is still a non-zero 

602 # possibility that an exception will slip past these mechanisms and 

603 # terminate the read prematurely. Luckily, at this stage, it's very 

604 # simple to recover from the problem: wait a little bit, reopen the 

605 # HTTP connection and try again. Usually, a single retry attempt is 

606 # enough to recover, but we try multiple times "just in case". 

607 # 

608 for attempt, seconds in enumerate([1, 2, 4, 8, 16], 1): 

609 try: 

610 if size == -1: 

611 binary = self._body.read() 

612 else: 

613 binary = self._body.read(size) 

614 except ( 

615 ConnectionResetError, 

616 botocore.exceptions.BotoCoreError, 

617 urllib3.exceptions.HTTPError, 

618 ) as err: 

619 logger.warning( 

620 '%s: caught %r while reading %d bytes, sleeping %ds before retry', 

621 self, 

622 err, 

623 size, 

624 seconds, 

625 ) 

626 time.sleep(seconds) 

627 self._open_body() 

628 else: 

629 self._position += len(binary) 

630 return binary 

631 

632 raise IOError('%s: failed to read %d bytes after %d attempts' % (self, size, attempt)) 

633 

634 def __str__(self): 

635 return 'smart_open.s3._SeekableReader(%r, %r)' % (self._bucket, self._key) 

636 

637 

638def _initialize_boto3(rw, client, client_kwargs, bucket, key): 

639 """Created the required objects for accessing S3. Ideally, they have 

640 been already created for us and we can just reuse them.""" 

641 if client_kwargs is None: 

642 client_kwargs = {} 

643 

644 if client is None: 

645 init_kwargs = client_kwargs.get('S3.Client', {}) 

646 client = boto3.client('s3', **init_kwargs) 

647 assert client 

648 

649 rw._client = _ClientWrapper(client, client_kwargs) 

650 rw._bucket = bucket 

651 rw._key = key 

652 

653 

654class Reader(io.BufferedIOBase): 

655 """Reads bytes from S3. 

656 

657 Implements the io.BufferedIOBase interface of the standard library.""" 

658 

659 def __init__( 

660 self, 

661 bucket, 

662 key, 

663 version_id=None, 

664 buffer_size=DEFAULT_BUFFER_SIZE, 

665 line_terminator=constants.BINARY_NEWLINE, 

666 defer_seek=False, 

667 client=None, 

668 client_kwargs=None, 

669 ): 

670 self._version_id = version_id 

671 self._buffer_size = buffer_size 

672 

673 _initialize_boto3(self, client, client_kwargs, bucket, key) 

674 

675 self._raw_reader = _SeekableRawReader( 

676 self._client, 

677 bucket, 

678 key, 

679 self._version_id, 

680 ) 

681 self._current_pos = 0 

682 self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size) 

683 self._eof = False 

684 self._line_terminator = line_terminator 

685 self._seek_initialized = False 

686 

687 # 

688 # This member is part of the io.BufferedIOBase interface. 

689 # 

690 self.raw = None 

691 

692 if not defer_seek: 

693 self.seek(0) 

694 

695 # 

696 # io.BufferedIOBase methods. 

697 # 

698 

699 def close(self): 

700 """Flush and close this stream.""" 

701 logger.debug("close: called") 

702 pass 

703 

704 def readable(self): 

705 """Return True if the stream can be read from.""" 

706 return True 

707 

708 def read(self, size=-1): 

709 """Read up to size bytes from the object and return them.""" 

710 if size == 0: 

711 return b'' 

712 elif size < 0: 

713 # call read() before setting _current_pos to make sure _content_length is set 

714 out = self._read_from_buffer() + self._raw_reader.read() 

715 self._current_pos = self._raw_reader._content_length 

716 return out 

717 

718 # 

719 # Return unused data first 

720 # 

721 if len(self._buffer) >= size: 

722 return self._read_from_buffer(size) 

723 

724 # 

725 # If the stream is finished, return what we have. 

726 # 

727 if self._eof: 

728 return self._read_from_buffer() 

729 

730 self._fill_buffer(size) 

731 return self._read_from_buffer(size) 

732 

733 def read1(self, size=-1): 

734 """This is the same as read().""" 

735 return self.read(size=size) 

736 

737 def readinto(self, b): 

738 """Read up to len(b) bytes into b, and return the number of bytes 

739 read.""" 

740 data = self.read(len(b)) 

741 if not data: 

742 return 0 

743 b[:len(data)] = data 

744 return len(data) 

745 

746 def readline(self, limit=-1): 

747 """Read up to and including the next newline. Returns the bytes read.""" 

748 if limit != -1: 

749 raise NotImplementedError('limits other than -1 not implemented yet') 

750 

751 # 

752 # A single line may span multiple buffers. 

753 # 

754 line = io.BytesIO() 

755 while not (self._eof and len(self._buffer) == 0): 

756 line_part = self._buffer.readline(self._line_terminator) 

757 line.write(line_part) 

758 self._current_pos += len(line_part) 

759 

760 if line_part.endswith(self._line_terminator): 

761 break 

762 else: 

763 self._fill_buffer() 

764 

765 return line.getvalue() 

766 

767 def seekable(self): 

768 """If False, seek(), tell() and truncate() will raise IOError. 

769 

770 We offer only seek support, and no truncate support.""" 

771 return True 

772 

773 def seek(self, offset, whence=constants.WHENCE_START): 

774 """Seek to the specified position. 

775 

776 :param int offset: The offset in bytes. 

777 :param int whence: Where the offset is from. 

778 

779 Returns the position after seeking.""" 

780 # Convert relative offset to absolute, since self._raw_reader 

781 # doesn't know our current position. 

782 if whence == constants.WHENCE_CURRENT: 

783 whence = constants.WHENCE_START 

784 offset += self._current_pos 

785 

786 if not self._seek_initialized or not ( 

787 whence == constants.WHENCE_START and offset == self._current_pos 

788 ): 

789 self._current_pos = self._raw_reader.seek(offset, whence) 

790 

791 self._buffer.empty() 

792 

793 self._eof = self._current_pos == self._raw_reader._content_length 

794 

795 self._seek_initialized = True 

796 return self._current_pos 

797 

798 def tell(self): 

799 """Return the current position within the file.""" 

800 return self._current_pos 

801 

802 def truncate(self, size=None): 

803 """Unsupported.""" 

804 raise io.UnsupportedOperation 

805 

806 def detach(self): 

807 """Unsupported.""" 

808 raise io.UnsupportedOperation 

809 

810 def terminate(self): 

811 """Do nothing.""" 

812 pass 

813 

814 def to_boto3(self, resource): 

815 """Create an **independent** `boto3.s3.Object` instance that points to 

816 the same S3 object as this instance. 

817 Changes to the returned object will not affect the current instance. 

818 """ 

819 assert resource, 'resource must be a boto3.resource instance' 

820 obj = resource.Object(self._bucket, self._key) 

821 if self._version_id is not None: 

822 return obj.Version(self._version_id) 

823 else: 

824 return obj 

825 

826 # 

827 # Internal methods. 

828 # 

829 def _read_from_buffer(self, size=-1): 

830 """Remove at most size bytes from our buffer and return them.""" 

831 size = size if size >= 0 else len(self._buffer) 

832 part = self._buffer.read(size) 

833 self._current_pos += len(part) 

834 return part 

835 

836 def _fill_buffer(self, size=-1): 

837 size = max(size, self._buffer._chunk_size) 

838 while len(self._buffer) < size and not self._eof: 

839 bytes_read = self._buffer.fill(self._raw_reader) 

840 if bytes_read == 0: 

841 logger.debug('%s: reached EOF while filling buffer', self) 

842 self._eof = True 

843 

844 def __str__(self): 

845 return "smart_open.s3.Reader(%r, %r)" % (self._bucket, self._key) 

846 

847 def __repr__(self): 

848 return ( 

849 "smart_open.s3.Reader(" 

850 "bucket=%r, " 

851 "key=%r, " 

852 "version_id=%r, " 

853 "buffer_size=%r, " 

854 "line_terminator=%r)" 

855 ) % ( 

856 self._bucket, 

857 self._key, 

858 self._version_id, 

859 self._buffer_size, 

860 self._line_terminator, 

861 ) 
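
# Illustrative sketch (assumption): random access with Reader.  With
# defer_seek=True the initial GetObject call is postponed until the first
# seek() or read(), so seeking before reading costs a single ranged request.
def _example_reader_random_access():
    fin = Reader('my_bucket', 'my_key.bin', defer_seek=True)
    fin.seek(1024)        # issues one ranged GetObject starting at byte 1024
    header = fin.read(16)
    fin.close()
    return header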

862 

863 

864class MultipartWriter(io.BufferedIOBase): 

865 """Writes bytes to S3 using the multi part API. 

866 

867 Implements the io.BufferedIOBase interface of the standard library.""" 

868 _upload_id = None # so `closed` property works in case __init__ fails and __del__ is called 

869 

870 def __init__( 

871 self, 

872 bucket, 

873 key, 

874 part_size=DEFAULT_PART_SIZE, 

875 client=None, 

876 client_kwargs=None, 

877 writebuffer: io.BytesIO | None = None, 

878 ): 

879 adjusted_ps = smart_open.utils.clamp(part_size, MIN_PART_SIZE, MAX_PART_SIZE) 

880 if part_size != adjusted_ps: 

881 logger.warning(f"adjusting part_size from {part_size} to {adjusted_ps}") 

882 part_size = adjusted_ps 

883 self._part_size = part_size 

884 

885 _initialize_boto3(self, client, client_kwargs, bucket, key) 

886 self._client: S3Client 

887 self._bucket: str 

888 self._key: str 

889 

890 try: 

891 partial = functools.partial( 

892 self._client.create_multipart_upload, 

893 Bucket=bucket, 

894 Key=key, 

895 ) 

896 self._upload_id = RETRY._do(partial)['UploadId'] 

897 except botocore.client.ClientError as error: 

898 raise ValueError( 

899 'the bucket %r does not exist, or is forbidden for access (%r)' % ( 

900 bucket, error 

901 ) 

902 ) from error 

903 

904 if writebuffer is None: 

905 self._buf = io.BytesIO() 

906 else: 

907 self._buf = writebuffer 

908 

909 self._total_bytes = 0 

910 self._total_parts = 0 

911 self._parts: list[dict[str, object]] = [] 

912 

913 # 

914 # This member is part of the io.BufferedIOBase interface. 

915 # 

916 self.raw = None # type: ignore[assignment] 

917 

918 def flush(self): 

919 pass 

920 

921 # 

922 # Override some methods from io.IOBase. 

923 # 

924 def close(self): 

925 logger.debug("close: called") 

926 if self.closed: 

927 return 

928 

929 if self._buf.tell(): 

930 self._upload_next_part() 

931 

932 logger.debug('%s: completing multipart upload', self) 

933 if self._total_bytes and self._upload_id: 

934 partial = functools.partial( 

935 self._client.complete_multipart_upload, 

936 Bucket=self._bucket, 

937 Key=self._key, 

938 UploadId=self._upload_id, 

939 MultipartUpload={'Parts': self._parts}, 

940 ) 

941 RETRY._do(partial) 

942 logger.debug('%s: completed multipart upload', self) 

943 elif self._upload_id: 

944 # 

945 # AWS complains with "The XML you provided was not well-formed or 

946 # did not validate against our published schema" when the input is 

947 # completely empty => abort the upload, no file created. 

948 # 

949 # We work around this by creating an empty file explicitly. 

950 # 

951 self._client.abort_multipart_upload( 

952 Bucket=self._bucket, 

953 Key=self._key, 

954 UploadId=self._upload_id, 

955 ) 

956 self._client.put_object( 

957 Bucket=self._bucket, 

958 Key=self._key, 

959 Body=b'', 

960 ) 

961 logger.debug('%s: wrote 0 bytes to imitate multipart upload', self) 

962 self._upload_id = None 

963 

964 @property 

965 def closed(self): 

966 return self._upload_id is None 

967 

968 def writable(self): 

969 """Return True if the stream supports writing.""" 

970 return True 

971 

972 def seekable(self): 

973 """If False, seek(), tell() and truncate() will raise IOError. 

974 

975 We offer only tell support, and no seek or truncate support.""" 

976 return True 

977 

978 def seek(self, offset, whence=constants.WHENCE_START): 

979 """Unsupported.""" 

980 raise io.UnsupportedOperation 

981 

982 def truncate(self, size=None): 

983 """Unsupported.""" 

984 raise io.UnsupportedOperation 

985 

986 def tell(self): 

987 """Return the current stream position.""" 

988 return self._total_bytes 

989 

990 # 

991 # io.BufferedIOBase methods. 

992 # 

993 def detach(self): 

994 raise io.UnsupportedOperation("detach() not supported") 

995 

996 def write(self, b: Buffer) -> int: 

997 """Write the given buffer (bytes, bytearray, memoryview or any buffer 

998 interface implementation) to the S3 file. 

999 

1000 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html 

1001 

1002 There's buffering happening under the covers, so this may not actually 

1003 do any HTTP transfer right away.""" 

1004 offset = 0 

1005 mv = memoryview(b) 

1006 self._total_bytes += len(mv) 

1007 

1008 # 

1009 # botocore does not accept memoryview, otherwise we could have avoided

1010 # copying the input into the buffer, except in cases where b is smaller

1011 # than part_size

1012 # 

1013 while offset < len(mv): 

1014 start = offset 

1015 end = offset + self._part_size - self._buf.tell() 

1016 self._buf.write(mv[start:end]) 

1017 if self._buf.tell() < self._part_size: 

1018 # 

1019 # Not enough data to write a new part just yet. The assert 

1020 # ensures that we've consumed all of the input buffer. 

1021 # 

1022 assert end >= len(mv) 

1023 return len(mv) 

1024 

1025 self._upload_next_part() 

1026 offset = end 

1027 return len(mv) 

1028 

1029 def terminate(self): 

1030 """Cancel the underlying multipart upload.""" 

1031 if self.closed: 

1032 return 

1033 logger.debug('%s: terminating multipart upload', self) 

1034 self._client.abort_multipart_upload( 

1035 Bucket=self._bucket, 

1036 Key=self._key, 

1037 UploadId=self._upload_id, 

1038 ) 

1039 self._upload_id = None 

1040 logger.debug('%s: terminated multipart upload', self) 

1041 

1042 def to_boto3(self, resource): 

1043 """Create an **independent** `boto3.s3.Object` instance that points to 

1044 the same S3 object as this instance. 

1045 Changes to the returned object will not affect the current instance. 

1046 """ 

1047 assert resource, 'resource must be a boto3.resource instance' 

1048 return resource.Object(self._bucket, self._key) 

1049 

1050 # 

1051 # Internal methods. 

1052 # 

1053 def _upload_next_part(self) -> None: 

1054 part_num = self._total_parts + 1 

1055 logger.info( 

1056 "%s: uploading part_num: %i, %i bytes (total %.3fGB)", 

1057 self, 

1058 part_num, 

1059 self._buf.tell(), 

1060 self._total_bytes / 1024.0 ** 3, 

1061 ) 

1062 self._buf.seek(0) 

1063 

1064 # 

1065 # Network problems in the middle of an upload are particularly 

1066 # troublesome. We don't want to abort the entire upload just because 

1067 # of a temporary connection problem, so this part needs to be 

1068 # especially robust. 

1069 # 

1070 upload = RETRY._do( 

1071 functools.partial( 

1072 self._client.upload_part, 

1073 Bucket=self._bucket, 

1074 Key=self._key, 

1075 UploadId=self._upload_id, 

1076 PartNumber=part_num, 

1077 Body=self._buf, 

1078 ) 

1079 ) 

1080 

1081 self._parts.append({'ETag': upload['ETag'], 'PartNumber': part_num}) 

1082 logger.debug("%s: upload of part_num #%i finished", self, part_num) 

1083 

1084 self._total_parts += 1 

1085 

1086 self._buf.seek(0) 

1087 self._buf.truncate(0) 

1088 

1089 def __enter__(self): 

1090 return self 

1091 

1092 def __exit__(self, exc_type, exc_val, exc_tb): 

1093 if exc_type is not None: 

1094 self.terminate() 

1095 else: 

1096 self.close() 

1097 

1098 def __str__(self): 

1099 return "smart_open.s3.MultipartWriter(%r, %r)" % (self._bucket, self._key) 

1100 

1101 def __repr__(self): 

1102 return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, part_size=%r)" % ( 

1103 self._bucket, 

1104 self._key, 

1105 self._part_size, 

1106 ) 
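
# Illustrative sketch (assumption): keeping RAM usage low by buffering each
# part to a temporary file on disk instead of an in-memory io.BytesIO.
def _example_multipart_writer_disk_buffer():
    import tempfile
    with tempfile.TemporaryFile() as tmp:
        with MultipartWriter('my_bucket', 'my_key.bin', writebuffer=tmp) as fout:
            for _ in range(4):
                fout.write(b'x' * (8 * 1024 * 1024))  # 8MB chunks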

1107 

1108 

1109class SinglepartWriter(io.BufferedIOBase): 

1110 """Writes bytes to S3 using the single part API. 

1111 

1112 Implements the io.BufferedIOBase interface of the standard library. 

1113 

1114 This class buffers all of its input in memory until its `close` method is called. Only then will 

1115 the data be written to S3 and the buffer is released.""" 

1116 _buf = None # so `closed` property works in case __init__ fails and __del__ is called 

1117 

1118 def __init__( 

1119 self, 

1120 bucket, 

1121 key, 

1122 client=None, 

1123 client_kwargs=None, 

1124 writebuffer=None, 

1125 ): 

1126 _initialize_boto3(self, client, client_kwargs, bucket, key) 

1127 

1128 if writebuffer is None: 

1129 self._buf = io.BytesIO() 

1130 elif not writebuffer.seekable(): 

1131 raise ValueError('writebuffer needs to be seekable') 

1132 else: 

1133 self._buf = writebuffer 

1134 

1135 def flush(self): 

1136 pass 

1137 

1138 # 

1139 # Override some methods from io.IOBase. 

1140 # 

1141 def close(self): 

1142 logger.debug("close: called") 

1143 if self.closed: 

1144 return 

1145 

1146 self.seek(0) 

1147 

1148 try: 

1149 self._client.put_object( 

1150 Bucket=self._bucket, 

1151 Key=self._key, 

1152 Body=self._buf, 

1153 ) 

1154 except botocore.client.ClientError as e: 

1155 raise ValueError( 

1156 'the bucket %r does not exist, or is forbidden for access' % self._bucket) from e 

1157 

1158 logger.debug("%s: direct upload finished", self) 

1159 self._buf.close() 

1160 

1161 @property 

1162 def closed(self): 

1163 return self._buf is None or self._buf.closed 

1164 

1165 def readable(self): 

1166 """Propagate.""" 

1167 return self._buf.readable() 

1168 

1169 def writable(self): 

1170 """Propagate.""" 

1171 return self._buf.writable() 

1172 

1173 def seekable(self): 

1174 """Propagate.""" 

1175 return self._buf.seekable() 

1176 

1177 def seek(self, offset, whence=constants.WHENCE_START): 

1178 """Propagate.""" 

1179 return self._buf.seek(offset, whence) 

1180 

1181 def truncate(self, size=None): 

1182 """Propagate.""" 

1183 return self._buf.truncate(size) 

1184 

1185 def tell(self): 

1186 """Propagate.""" 

1187 return self._buf.tell() 

1188 

1189 def write(self, b): 

1190 """Write the given buffer (bytes, bytearray, memoryview or any buffer 

1191 interface implementation) into the buffer. Content of the buffer will be 

1192 written to S3 on close as a single-part upload. 

1193 

1194 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html""" 

1195 return self._buf.write(b) 

1196 

1197 def read(self, size=-1): 

1198 """Propagate.""" 

1199 return self._buf.read(size) 

1200 

1201 def read1(self, size=-1): 

1202 """Propagate.""" 

1203 return self._buf.read1(size) 

1204 

1205 def terminate(self): 

1206 """Close buffer and skip upload.""" 

1207 self._buf.close() 

1208 logger.debug('%s: terminated singlepart upload', self) 

1209 

1210 # 

1211 # Internal methods. 

1212 # 

1213 def __enter__(self): 

1214 return self 

1215 

1216 def __exit__(self, exc_type, exc_val, exc_tb): 

1217 if exc_type is not None: 

1218 self.terminate() 

1219 else: 

1220 self.close() 

1221 

1222 def __str__(self): 

1223 return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._bucket, self._key) 

1224 

1225 def __repr__(self): 

1226 return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key) 

1227 

1228 

1229def _accept_all(key): 

1230 return True 

1231 

1232 

1233def iter_bucket( 

1234 bucket_name, 

1235 prefix='', 

1236 accept_key=None, 

1237 key_limit=None, 

1238 workers=16, 

1239 retries=3, 

1240 **session_kwargs): 

1241 """ 

1242 Iterate and download all S3 objects under `s3://bucket_name/prefix`. 

1243 

1244 Parameters 

1245 ---------- 

1246 bucket_name: str 

1247 The name of the bucket. 

1248 prefix: str, optional 

1249 Limits the iteration to keys starting with the prefix. 

1250 accept_key: callable, optional 

1251 This is a function that accepts a key name (unicode string) and 

1252 returns True/False, signalling whether the given key should be downloaded. 

1253 The default behavior is to accept all keys. 

1254 key_limit: int, optional 

1255 If specified, the iterator will stop after yielding this many results. 

1256 workers: int, optional 

1257 The number of subprocesses to use. 

1258 retries: int, optional 

1259 The number of times to retry a failed download.

1260 session_kwargs: dict, optional 

1261 Keyword arguments to pass when creating a new session. 

1262 For a list of available names and values, see: 

1263 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session 

1264 

1265 

1266 Yields 

1267 ------ 

1268 str 

1269 The full key name (does not include the bucket name). 

1270 bytes 

1271 The full contents of the key. 

1272 

1273 Notes 

1274 ----- 

1275 The keys are processed in parallel, using `workers` processes (default: 16), 

1276 to speed up downloads greatly. If multiprocessing is not available

1277 (i.e. _MULTIPROCESSING is False), this parameter is ignored.

1278 

1279 Examples 

1280 -------- 

1281 

1282 >>> # get all JSON files under "mybucket/foo/" 

1283 >>> for key, content in iter_bucket( 

1284 ... bucket_name, prefix='foo/', 

1285 ... accept_key=lambda key: key.endswith('.json')): 

1286 ... print(key, len(content))

1287 

1288 >>> # limit to 10k files, using 32 parallel workers (default is 16) 

1289 >>> for key, content in iter_bucket(bucket_name, key_limit=10000, workers=32): 

1290 ... print(key, len(content))

1291 """ 

1292 if accept_key is None: 

1293 accept_key = _accept_all 

1294 

1295 # 

1296 # If people insist on giving us bucket instances, silently extract the name 

1297 # before moving on. Works for boto3 as well as boto. 

1298 # 

1299 try: 

1300 bucket_name = bucket_name.name 

1301 except AttributeError: 

1302 pass 

1303 

1304 total_size, key_no = 0, -1 

1305 key_iterator = _list_bucket( 

1306 bucket_name, 

1307 prefix=prefix, 

1308 accept_key=accept_key, 

1309 **session_kwargs) 

1310 download_key = functools.partial( 

1311 _download_key, 

1312 bucket_name=bucket_name, 

1313 retries=retries, 

1314 **session_kwargs) 

1315 

1316 with smart_open.concurrency.create_pool(processes=workers) as pool: 

1317 result_iterator = pool.imap_unordered(download_key, key_iterator) 

1318 key_no = 0 

1319 while True: 

1320 try: 

1321 (key, content) = result_iterator.__next__() 

1322 if key_no % 1000 == 0: 

1323 logger.info( 

1324 "yielding key #%i: %s, size %i (total %.1fMB)", 

1325 key_no, key, len(content), total_size / 1024.0 ** 2 

1326 ) 

1327 yield key, content 

1328 total_size += len(content) 

1329 if key_limit is not None and key_no + 1 >= key_limit: 

1330 # we were asked to output only a limited number of keys => we're done 

1331 break 

1332 except botocore.exceptions.ClientError as err: 

1333 # 

1334 # ignore 404 not found errors: they mean the object was deleted 

1335 # after we listed the contents of the bucket, but before we 

1336 # downloaded the object. 

1337 # 

1338 if not ('Error' in err.response and err.response['Error'].get('Code') == '404'): 

1339 raise err 

1340 except StopIteration: 

1341 break 

1342 key_no += 1 

1343 logger.info("processed %i keys, total size %i" % (key_no + 1, total_size)) 

1344 

1345 

1346def _list_bucket( 

1347 bucket_name, 

1348 prefix='', 

1349 accept_key=lambda k: True, 

1350 **session_kwargs): 

1351 session = boto3.session.Session(**session_kwargs) 

1352 client = session.client('s3') 

1353 ctoken = None 

1354 

1355 while True: 

1356 # list_objects_v2 doesn't like a None value for ContinuationToken 

1357 # so we don't set it if we don't have one. 

1358 if ctoken: 

1359 kwargs = dict(Bucket=bucket_name, Prefix=prefix, ContinuationToken=ctoken) 

1360 else: 

1361 kwargs = dict(Bucket=bucket_name, Prefix=prefix) 

1362 response = client.list_objects_v2(**kwargs) 

1363 try: 

1364 content = response['Contents'] 

1365 except KeyError: 

1366 pass 

1367 else: 

1368 for c in content: 

1369 key = c['Key'] 

1370 if accept_key(key): 

1371 yield key 

1372 ctoken = response.get('NextContinuationToken', None) 

1373 if not ctoken: 

1374 break 

1375 

1376 

1377def _download_key(key_name, bucket_name=None, retries=3, **session_kwargs): 

1378 if bucket_name is None: 

1379 raise ValueError('bucket_name may not be None') 

1380 

1381 # 

1382 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html#multithreading-or-multiprocessing-with-resources 

1383 # 

1384 session = boto3.session.Session(**session_kwargs) 

1385 s3 = session.resource('s3') 

1386 bucket = s3.Bucket(bucket_name) 

1387 

1388 # Sometimes, https://github.com/boto/boto/issues/2409 can happen 

1389 # because of network issues on either side. 

1390 # Retry up to `retries` times (3 by default) to make sure it's not a transient issue.

1391 for x in range(retries + 1): 

1392 try: 

1393 content_bytes = _download_fileobj(bucket, key_name) 

1394 except botocore.client.ClientError: 

1395 # Actually fail on last pass through the loop 

1396 if x == retries: 

1397 raise 

1398 # Otherwise, try again, as this might be a transient timeout 

1399 pass 

1400 else: 

1401 return key_name, content_bytes 

1402 

1403 

1404def _download_fileobj(bucket, key_name): 

1405 # 

1406 # This is a separate function only because it makes it easier to inject 

1407 # exceptions during tests. 

1408 # 

1409 buf = io.BytesIO() 

1410 bucket.download_fileobj(key_name, buf) 

1411 return buf.getvalue()