
1# -*- coding: utf-8 -*- 

2# 

3# Copyright (C) 2019 Radim Rehurek <me@radimrehurek.com> 

4# 

5# This code is distributed under the terms and conditions 

6# from the MIT License (MIT). 

7# 

8"""Implements file-like objects for reading and writing from/to AWS S3.""" 

9from __future__ import annotations 

10 

11import http 

12import io 

13import functools 

14import logging 

15import time 

16import warnings 

17 

18from typing import ( 

19 Callable, 

20 List, 

21 TYPE_CHECKING, 

22) 

23 

24try: 

25 import boto3 

26 import botocore.client 

27 import botocore.exceptions 

28 import urllib3.exceptions 

29except ImportError: 

30 MISSING_DEPS = True 

31 

32import smart_open.bytebuffer 

33import smart_open.concurrency 

34import smart_open.utils 

35 

36from smart_open import constants 

37 

38 

39if TYPE_CHECKING: 

40 from mypy_boto3_s3.client import S3Client 

41 from typing_extensions import Buffer 

42 

43logger = logging.getLogger(__name__) 

44 

45# 

46# AWS puts restrictions on the part size for multipart uploads. 

47 # Each part must be at least 5MB and at most 5GB (the last part may be smaller). 

48# 

49# On top of that, our MultipartWriter has a min_part_size option. 

50# In retrospect, it's an unfortunate name, because it conflicts with the 

51# minimum allowable part size (5MB), but it's too late to change it, because 

52# people are using that parameter (unlike the MIN, DEFAULT, MAX constants). 

53# It really just means "part size": as soon as you have this many bytes, 

54# write a part to S3 (see the MultipartWriter.write method). 

55# 

56 

57MIN_PART_SIZE = 5 * 1024 ** 2 

58"""The absolute minimum permitted by Amazon.""" 

59 

60DEFAULT_PART_SIZE = 50 * 1024**2 

61"""The default part size for S3 multipart uploads, chosen carefully by smart_open""" 

62 

63MAX_PART_SIZE = 5 * 1024 ** 3 

64"""The absolute maximum permitted by Amazon.""" 

65 

66SCHEMES = ("s3", "s3n", 's3u', "s3a") 

67DEFAULT_PORT = 443 

68DEFAULT_HOST = 's3.amazonaws.com' 

69 

70DEFAULT_BUFFER_SIZE = 128 * 1024 

71 

72URI_EXAMPLES = ( 

73 's3://my_bucket/my_key', 

74 's3://my_key:my_secret@my_bucket/my_key', 

75 's3://my_key:my_secret@my_server:my_port@my_bucket/my_key', 

76) 

77 

78# Returned by AWS when we try to seek beyond EOF. 

79_OUT_OF_RANGE = 'InvalidRange' 

80 

81 

82class Retry: 

83 def __init__(self): 

84 self.attempts: int = 6 

85 self.sleep_seconds: int = 10 

86 self.exceptions: List[Exception] = [botocore.exceptions.EndpointConnectionError] 

87 self.client_error_codes: List[str] = ['NoSuchUpload'] 

88 

89 def _do(self, fn: Callable): 

90 for attempt in range(self.attempts): 

91 try: 

92 return fn() 

93 except tuple(self.exceptions) as err: 

94 logger.critical( 

95 'Caught non-fatal %s, retrying %d more times', 

96 err, 

97 self.attempts - attempt - 1, 

98 ) 

99 logger.exception(err) 

100 time.sleep(self.sleep_seconds) 

101 except botocore.exceptions.ClientError as err: 

102 error_code = err.response['Error'].get('Code') 

103 if error_code not in self.client_error_codes: 

104 raise 

105 logger.critical( 

106 'Caught non-fatal ClientError (%s), retrying %d more times', 

107 error_code, 

108 self.attempts - attempt - 1, 

109 ) 

110 logger.exception(err) 

111 time.sleep(self.sleep_seconds) 

112 else: 

113 logger.critical('encountered too many non-fatal errors, giving up') 

114 raise IOError('%s failed after %d attempts' % (fn.func, self.attempts)) 

115 

116 

117# 

118# The retry mechanism for this submodule. Client code may modify it, e.g. by 

119# updating RETRY.sleep_seconds and friends. 

120# 

121if 'MISSING_DEPS' not in locals(): 

122 RETRY = Retry() 
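
#
# For example, a caller who prefers to fail faster could tweak it like this
# (a sketch; the exact values and the extra exception are illustrative only):
#
#   from smart_open import s3
#
#   s3.RETRY.attempts = 3
#   s3.RETRY.sleep_seconds = 1
#   s3.RETRY.exceptions.append(botocore.exceptions.ConnectionClosedError)
#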

123 

124 

125class _ClientWrapper: 

126 """Wraps a client to inject the appropriate keyword args into each method call. 

127 

128 The keyword args are a dictionary keyed by the fully qualified method name. 

129 For example, S3.Client.create_multipart_upload. 

130 

131 See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client 

132 

133 This wrapper behaves identically to the client otherwise. 

134 """ 

135 def __init__(self, client, kwargs): 

136 self.client = client 

137 self.kwargs = kwargs 

138 

139 def __getattr__(self, method_name): 

140 method = getattr(self.client, method_name) 

141 kwargs = self.kwargs.get('S3.Client.%s' % method_name, {}) 

142 return functools.partial(method, **kwargs) 
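
#
# For example, a kwargs dictionary like the one below (a sketch; the
# ServerSideEncryption value is illustrative) makes every
# create_multipart_upload call request SSE-KMS while leaving all other
# client methods untouched:
#
#   kwargs = {'S3.Client.create_multipart_upload': {'ServerSideEncryption': 'aws:kms'}}
#   wrapped = _ClientWrapper(boto3.client('s3'), kwargs)
#   wrapped.create_multipart_upload(Bucket='my_bucket', Key='my_key')
#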

143 

144 

145def parse_uri(uri_as_string): 

146 # 

147 # Restrictions on bucket names and labels: 

148 # 

149 # - Bucket names must be at least 3 and no more than 63 characters long. 

150 # - Bucket names must be a series of one or more labels. 

151 # - Adjacent labels are separated by a single period (.). 

152 # - Bucket names can contain lowercase letters, numbers, and hyphens. 

153 # - Each label must start and end with a lowercase letter or a number. 

154 # 

155 # We use the above as a guide only, and do not perform any validation. We 

156 # let boto3 take care of that for us. 

157 # 

158 split_uri = smart_open.utils.safe_urlsplit(uri_as_string) 

159 assert split_uri.scheme in SCHEMES 

160 

161 port = DEFAULT_PORT 

162 host = DEFAULT_HOST 

163 ordinary_calling_format = False 

164 # 

165 # These defaults tell boto3 to look for credentials elsewhere 

166 # 

167 access_id, access_secret = None, None 

168 

169 # 

170 # Common URI template [secret:key@][host[:port]@]bucket/object 

171 # 

172 # The urlparse function doesn't handle the above schema, so we have to do 

173 # it ourselves. 

174 # 

175 uri = split_uri.netloc + split_uri.path 

176 

177 # 

178 # Attempt to extract edge-case authentication details from the URL. 

179 # 

180 # See: 

181 # 1. https://summitroute.com/blog/2018/06/20/aws_security_credential_formats/ 

182 # 2. test_s3_uri_with_credentials* in test_smart_open.py for example edge cases 

183 # 

184 if '@' in uri: 

185 maybe_auth, rest = uri.split('@', 1) 

186 if ':' in maybe_auth: 

187 maybe_id, maybe_secret = maybe_auth.split(':', 1) 

188 if '/' not in maybe_id: 

189 access_id, access_secret = maybe_id, maybe_secret 

190 uri = rest 

191 

192 head, key_id = uri.split('/', 1) 

193 if '@' in head and ':' in head: 

194 ordinary_calling_format = True 

195 host_port, bucket_id = head.split('@') 

196 host, port = host_port.split(':', 1) 

197 port = int(port) 

198 elif '@' in head: 

199 ordinary_calling_format = True 

200 host, bucket_id = head.split('@') 

201 else: 

202 bucket_id = head 

203 

204 return dict( 

205 scheme=split_uri.scheme, 

206 bucket_id=bucket_id, 

207 key_id=key_id, 

208 port=port, 

209 host=host, 

210 ordinary_calling_format=ordinary_calling_format, 

211 access_id=access_id, 

212 access_secret=access_secret, 

213 ) 
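
#
# For example, parsing the first URI_EXAMPLES entry yields roughly the
# following (shown as a sketch rather than a verbatim doctest):
#
#   parse_uri('s3://my_bucket/my_key')
#   # {'scheme': 's3', 'bucket_id': 'my_bucket', 'key_id': 'my_key',
#   #  'port': 443, 'host': 's3.amazonaws.com',
#   #  'ordinary_calling_format': False,
#   #  'access_id': None, 'access_secret': None}
#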

214 

215 

216def _consolidate_params(uri, transport_params): 

217 """Consolidates the parsed Uri with the additional parameters. 

218 

219 This is necessary because the user can pass some of the parameters in 

220 two different ways: 

221 

222 1) Via the URI itself 

223 2) Via the transport parameters 

224 

225 These are not mutually exclusive, but we have to pick one over the other 

226 in a sensible way in order to proceed. 

227 

228 """ 

229 transport_params = dict(transport_params) 

230 

231 def inject(**kwargs): 

232 try: 

233 client_kwargs = transport_params['client_kwargs'] 

234 except KeyError: 

235 client_kwargs = transport_params['client_kwargs'] = {} 

236 

237 try: 

238 init_kwargs = client_kwargs['S3.Client'] 

239 except KeyError: 

240 init_kwargs = client_kwargs['S3.Client'] = {} 

241 

242 init_kwargs.update(**kwargs) 

243 

244 client = transport_params.get('client') 

245 if client is not None and (uri['access_id'] or uri['access_secret']): 

246 logger.warning( 

247 'ignoring credentials parsed from URL because they conflict with ' 

248 'transport_params["client"]. Set transport_params["client"] to None ' 

249 'to suppress this warning.' 

250 ) 

251 uri.update(access_id=None, access_secret=None) 

252 elif (uri['access_id'] and uri['access_secret']): 

253 inject( 

254 aws_access_key_id=uri['access_id'], 

255 aws_secret_access_key=uri['access_secret'], 

256 ) 

257 uri.update(access_id=None, access_secret=None) 

258 

259 if client is not None and uri['host'] != DEFAULT_HOST: 

260 logger.warning( 

261 'ignoring endpoint_url parsed from URL because it conflicts with ' 

262 'transport_params["client"]. Set transport_params["client"] to None ' 

263 'to suppress this warning.' 

264 ) 

265 uri.update(host=None) 

266 elif uri['host'] != DEFAULT_HOST: 

267 if uri['scheme'] == 's3u': 

268 scheme = 'http' 

269 else: 

270 scheme = 'https' 

271 inject(endpoint_url=scheme + '://%(host)s:%(port)d' % uri) 

272 uri.update(host=None) 

273 

274 return uri, transport_params 
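
#
# For example, a URI that carries both credentials and a custom endpoint,
# such as 's3://my_key:my_secret@my_server:9000@my_bucket/my_key', ends up
# (roughly, as a sketch) injecting the following into the transport params:
#
#   transport_params['client_kwargs'] = {
#       'S3.Client': {
#           'aws_access_key_id': 'my_key',
#           'aws_secret_access_key': 'my_secret',
#           'endpoint_url': 'https://my_server:9000',
#       },
#   }
#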

275 

276 

277def open_uri(uri, mode, transport_params): 

278 deprecated = ( 

279 'multipart_upload_kwargs', 

280 'object_kwargs', 

281 'resource', 

282 'resource_kwargs', 

283 'session', 

284 'singlepart_upload_kwargs', 

285 ) 

286 detected = [k for k in deprecated if k in transport_params] 

287 if detected: 

288 doc_url = ( 

289 'https://github.com/RaRe-Technologies/smart_open/blob/develop/' 

290 'MIGRATING_FROM_OLDER_VERSIONS.rst' 

291 ) 

292 # 

293 # We use warnings.warn with UserWarning instead of logger.warn here because 

294 # 

295 # 1) Not everyone has logging enabled; and 

296 # 2) check_kwargs (below) already uses logger.warn with a similar message 

297 # 

298 # https://github.com/RaRe-Technologies/smart_open/issues/614 

299 # 

300 message = ( 

301 'ignoring the following deprecated transport parameters: %r. ' 

302 'See <%s> for details' % (detected, doc_url) 

303 ) 

304 warnings.warn(message, UserWarning) 

305 parsed_uri = parse_uri(uri) 

306 parsed_uri, transport_params = _consolidate_params(parsed_uri, transport_params) 

307 kwargs = smart_open.utils.check_kwargs(open, transport_params) 

308 return open(parsed_uri['bucket_id'], parsed_uri['key_id'], mode, **kwargs) 

309 

310 

311def open( 

312 bucket_id, 

313 key_id, 

314 mode, 

315 version_id=None, 

316 buffer_size=DEFAULT_BUFFER_SIZE, 

317 min_part_size=DEFAULT_PART_SIZE, 

318 multipart_upload=True, 

319 defer_seek=False, 

320 client=None, 

321 client_kwargs=None, 

322 writebuffer=None, 

323): 

324 """Open an S3 object for reading or writing. 

325 

326 Parameters 

327 ---------- 

328 bucket_id: str 

329 The name of the bucket this object resides in. 

330 key_id: str 

331 The name of the key within the bucket. 

332 mode: str 

333 The mode for opening the object. Must be either "rb" or "wb". 

334 buffer_size: int, optional 

335 The buffer size to use when performing I/O. 

336 min_part_size: int, optional 

337 The minimum part size for multipart uploads, in bytes. 

338 

339 When the writebuffer contains this many bytes, smart_open will upload 

340 the bytes to S3 as a single part of a multi-part upload, freeing the 

341 buffer either partially or entirely. When you close the writer, it 

342 will assemble the parts together. 

343 

344 The value determines the upper limit for the writebuffer. If buffer 

345 space is short (e.g. you are buffering to memory), then use a smaller 

346 value for min_part_size, or consider buffering to disk instead (see 

347 the writebuffer option). 

348 

349 The value must be between 5MB and 5GB. If you specify a value outside 

350 of this range, smart_open will adjust it for you, because otherwise the 

351 upload _will_ fail. 

352 

353 For writing only. Does not apply if you set multipart_upload=False. 

354 multipart_upload: bool, optional 

355 Default: `True` 

356 If set to `True`, will use multipart upload for writing to S3. If set 

357 to `False`, S3 upload will use the S3 Single-Part Upload API, which 

358 is better suited to small files. 

359 

360 For writing only. 

361 version_id: str, optional 

362 Version of the object, used when reading the object. 

363 If None, will fetch the most recent version. 

364 defer_seek: boolean, optional 

365 Default: `False` 

366 If set to `True` on a file opened for reading, GetObject will not be 

367 called until the first seek() or read(). 

368 Avoids redundant API queries when seeking before reading. 

369 client: object, optional 

370 The S3 client to use when working with boto3. 

371 If you don't specify this, then smart_open will create a new client for you. 

372 client_kwargs: dict, optional 

373 Additional parameters to pass to the relevant functions of the client. 

374 The keys are fully qualified method names, e.g. `S3.Client.create_multipart_upload`. 

375 The values are kwargs to pass to that method each time it is called. 

376 writebuffer: IO[bytes], optional 

377 By default, this module will buffer data in memory using io.BytesIO 

378 when writing. Pass another binary IO instance here to use it instead. 

379 For example, you may pass a file object to buffer to local disk instead 

380 of in RAM. Use this to keep RAM usage low at the expense of additional 

381 disk IO. If you pass in an open file, then you are responsible for 

382 cleaning it up after writing completes. 

383 """ 

384 logger.debug('%r', locals()) 

385 if mode not in constants.BINARY_MODES: 

386 raise NotImplementedError('bad mode: %r expected one of %r' % (mode, constants.BINARY_MODES)) 

387 

388 if (mode == constants.WRITE_BINARY) and (version_id is not None): 

389 raise ValueError("version_id must be None when writing") 

390 

391 if mode == constants.READ_BINARY: 

392 fileobj = Reader( 

393 bucket_id, 

394 key_id, 

395 version_id=version_id, 

396 buffer_size=buffer_size, 

397 defer_seek=defer_seek, 

398 client=client, 

399 client_kwargs=client_kwargs, 

400 ) 

401 elif mode == constants.WRITE_BINARY: 

402 if multipart_upload: 

403 fileobj = MultipartWriter( 

404 bucket_id, 

405 key_id, 

406 client=client, 

407 client_kwargs=client_kwargs, 

408 writebuffer=writebuffer, 

409 part_size=min_part_size, 

410 ) 

411 else: 

412 fileobj = SinglepartWriter( 

413 bucket_id, 

414 key_id, 

415 client=client, 

416 client_kwargs=client_kwargs, 

417 writebuffer=writebuffer, 

418 ) 

419 else: 

420 assert False, 'unexpected mode: %r' % mode 

421 

422 fileobj.name = key_id 

423 return fileobj 
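
#
# A minimal usage sketch (the bucket and key names are hypothetical).
# Buffering to disk via the writebuffer parameter keeps RAM usage low while
# writing large objects:
#
#   import tempfile
#
#   with tempfile.TemporaryFile() as tmp:
#       with open('my_bucket', 'my_key', 'wb', writebuffer=tmp) as fout:
#           fout.write(b'hello world\n')
#
#   with open('my_bucket', 'my_key', 'rb') as fin:
#       print(fin.read())
#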

424 

425 

426def _get(client, bucket, key, version, range_string): 

427 try: 

428 params = dict(Bucket=bucket, Key=key) 

429 if version: 

430 params["VersionId"] = version 

431 if range_string: 

432 params["Range"] = range_string 

433 

434 return client.get_object(**params) 

435 except botocore.client.ClientError as error: 

436 wrapped_error = IOError( 

437 'unable to access bucket: %r key: %r version: %r error: %s' % ( 

438 bucket, key, version, error 

439 ) 

440 ) 

441 wrapped_error.backend_error = error 

442 raise wrapped_error from error 

443 

444 

445def _unwrap_ioerror(ioe): 

446 """Given an IOError from _get, return the 'Error' dictionary from boto.""" 

447 try: 

448 return ioe.backend_error.response['Error'] 

449 except (AttributeError, KeyError): 

450 return None 

451 

452 

453class _SeekableRawReader(object): 

454 """Read an S3 object. 

455 

456 This class is internal to the S3 submodule. 

457 """ 

458 

459 def __init__( 

460 self, 

461 client, 

462 bucket, 

463 key, 

464 version_id=None, 

465 ): 

466 self._client = client 

467 self._bucket = bucket 

468 self._key = key 

469 self._version_id = version_id 

470 

471 self._content_length = None 

472 self._position = 0 

473 self._body = None 

474 

475 def seek(self, offset, whence=constants.WHENCE_START): 

476 """Seek to the specified position. 

477 

478 :param int offset: The offset in bytes. 

479 :param int whence: Where the offset is from. 

480 

481 :returns: the position after seeking. 

482 :rtype: int 

483 """ 

484 if whence not in constants.WHENCE_CHOICES: 

485 raise ValueError('invalid whence, expected one of %r' % constants.WHENCE_CHOICES) 

486 

487 # 

488 # Close old body explicitly. 

489 # On the first seek() after __init__(), self._body does not exist yet. 

490 # 

491 if self._body is not None: 

492 self._body.close() 

493 self._body = None 

494 

495 start = None 

496 stop = None 

497 if whence == constants.WHENCE_START: 

498 start = max(0, offset) 

499 elif whence == constants.WHENCE_CURRENT: 

500 start = max(0, offset + self._position) 

501 else: 

502 stop = max(0, -offset) 

503 

504 # 

505 # If we can figure out that we've read past the EOF, then we can save 

506 # an extra API call. 

507 # 

508 if self._content_length is None: 

509 reached_eof = False 

510 elif start is not None and start >= self._content_length: 

511 reached_eof = True 

512 elif stop == 0: 

513 reached_eof = True 

514 else: 

515 reached_eof = False 

516 

517 if reached_eof: 

518 self._body = io.BytesIO() 

519 self._position = self._content_length 

520 else: 

521 self._open_body(start, stop) 

522 

523 return self._position 

524 

525 def _open_body(self, start=None, stop=None): 

526 """Open a connection to download the specified range of bytes. Store 

527 the open file handle in self._body. 

528 

529 If no range is specified, start defaults to self._position. 

530 start and stop follow the semantics of the http range header, 

531 so a stop without a start requests the last `stop` bytes of the object. 

532 

533 As a side effect, set self._content_length. Set self._position 

534 to self._content_length if start is past end of file. 

535 """ 

536 if start is None and stop is None: 

537 start = self._position 

538 range_string = smart_open.utils.make_range_string(start, stop) 

539 

540 try: 

541 # Optimistically try to fetch the requested content range. 

542 response = _get( 

543 self._client, 

544 self._bucket, 

545 self._key, 

546 self._version_id, 

547 range_string, 

548 ) 

549 except IOError as ioe: 

550 # Handle requested content range exceeding content size. 

551 error_response = _unwrap_ioerror(ioe) 

552 if error_response is None or error_response.get('Code') != _OUT_OF_RANGE: 

553 raise 

554 try: 

555 self._position = self._content_length = int(error_response['ActualObjectSize']) 

556 self._body = io.BytesIO() 

557 except KeyError: 

558 response = _get( 

559 self._client, 

560 self._bucket, 

561 self._key, 

562 self._version_id, 

563 None, 

564 ) 

565 self._position = self._content_length = response["ContentLength"] 

566 self._body = response["Body"] 

567 else: 

568 # 

569 # Keep track of how many times boto3's built-in retry mechanism 

570 # activated. 

571 # 

572 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#checking-retry-attempts-in-an-aws-service-response 

573 # 

574 logger.debug( 

575 '%s: RetryAttempts: %d', 

576 self, 

577 response['ResponseMetadata']['RetryAttempts'], 

578 ) 

579 # 

580 # range request may not always return partial content, see: 

581 # https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests#partial_request_responses 

582 # 

583 status_code = response['ResponseMetadata']['HTTPStatusCode'] 

584 if status_code == http.HTTPStatus.PARTIAL_CONTENT: 

585 _, start, stop, length = smart_open.utils.parse_content_range(response['ContentRange']) 

586 self._position = start 

587 elif status_code == http.HTTPStatus.OK: 

588 length = response["ContentLength"] 

589 self._content_length = length 

590 self._body = response['Body'] 

591 

592 def read(self, size=-1): 

593 """Read from the continuous connection with the remote peer.""" 

594 if self._body is None: 

595 # This is necessary for the very first read() after __init__(). 

596 self._open_body() 

597 if self._position >= self._content_length: 

598 return b'' 

599 

600 # 

601 # Boto3 has built-in error handling and retry mechanisms: 

602 # 

603 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html 

604 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html 

605 # 

606 # Unfortunately, it isn't always enough. There is still a non-zero 

607 # possibility that an exception will slip past these mechanisms and 

608 # terminate the read prematurely. Luckily, at this stage, it's very 

609 # simple to recover from the problem: wait a little bit, reopen the 

610 # HTTP connection and try again. Usually, a single retry attempt is 

611 # enough to recover, but we try multiple times "just in case". 

612 # 

613 for attempt, seconds in enumerate([1, 2, 4, 8, 16], 1): 

614 try: 

615 if size == -1: 

616 binary = self._body.read() 

617 else: 

618 binary = self._body.read(size) 

619 except ( 

620 ConnectionResetError, 

621 botocore.exceptions.BotoCoreError, 

622 urllib3.exceptions.HTTPError, 

623 ) as err: 

624 logger.warning( 

625 '%s: caught %r while reading %d bytes, sleeping %ds before retry', 

626 self, 

627 err, 

628 size, 

629 seconds, 

630 ) 

631 time.sleep(seconds) 

632 self._open_body() 

633 else: 

634 self._position += len(binary) 

635 return binary 

636 

637 raise IOError('%s: failed to read %d bytes after %d attempts' % (self, size, attempt)) 

638 

639 def __str__(self): 

640 return 'smart_open.s3._SeekableReader(%r, %r)' % (self._bucket, self._key) 

641 

642 

643def _initialize_boto3(rw, client, client_kwargs, bucket, key): 

644 """Created the required objects for accessing S3. Ideally, they have 

645 been already created for us and we can just reuse them.""" 

646 if client_kwargs is None: 

647 client_kwargs = {} 

648 

649 if client is None: 

650 init_kwargs = client_kwargs.get('S3.Client', {}) 

651 client = boto3.client('s3', **init_kwargs) 

652 assert client 

653 

654 rw._client = _ClientWrapper(client, client_kwargs) 

655 rw._bucket = bucket 

656 rw._key = key 
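
#
# For example, to point smart_open at a non-default endpoint, you can either
# build the client yourself or pass client_kwargs (a sketch; the endpoint URL
# is illustrative):
#
#   client = boto3.client('s3', endpoint_url='https://s3.us-east-2.amazonaws.com')
#   fin = open('my_bucket', 'my_key', 'rb', client=client)
#
#   fin = open(
#       'my_bucket',
#       'my_key',
#       'rb',
#       client_kwargs={'S3.Client': {'endpoint_url': 'https://s3.us-east-2.amazonaws.com'}},
#   )
#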

657 

658 

659class Reader(io.BufferedIOBase): 

660 """Reads bytes from S3. 

661 

662 Implements the io.BufferedIOBase interface of the standard library.""" 

663 

664 def __init__( 

665 self, 

666 bucket, 

667 key, 

668 version_id=None, 

669 buffer_size=DEFAULT_BUFFER_SIZE, 

670 line_terminator=constants.BINARY_NEWLINE, 

671 defer_seek=False, 

672 client=None, 

673 client_kwargs=None, 

674 ): 

675 self._version_id = version_id 

676 self._buffer_size = buffer_size 

677 

678 _initialize_boto3(self, client, client_kwargs, bucket, key) 

679 

680 self._raw_reader = _SeekableRawReader( 

681 self._client, 

682 bucket, 

683 key, 

684 self._version_id, 

685 ) 

686 self._current_pos = 0 

687 self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size) 

688 self._eof = False 

689 self._line_terminator = line_terminator 

690 self._seek_initialized = False 

691 

692 # 

693 # This member is part of the io.BufferedIOBase interface. 

694 # 

695 self.raw = None 

696 

697 if not defer_seek: 

698 self.seek(0) 

699 

700 # 

701 # io.BufferedIOBase methods. 

702 # 

703 

704 def close(self): 

705 """Flush and close this stream.""" 

706 pass 

707 

708 def readable(self): 

709 """Return True if the stream can be read from.""" 

710 return True 

711 

712 def read(self, size=-1): 

713 """Read up to size bytes from the object and return them.""" 

714 if size == 0: 

715 return b'' 

716 elif size < 0: 

717 # call read() before setting _current_pos to make sure _content_length is set 

718 out = self._read_from_buffer() + self._raw_reader.read() 

719 self._current_pos = self._raw_reader._content_length 

720 return out 

721 

722 # 

723 # Return unused data first 

724 # 

725 if len(self._buffer) >= size: 

726 return self._read_from_buffer(size) 

727 

728 # 

729 # If the stream is finished, return what we have. 

730 # 

731 if self._eof: 

732 return self._read_from_buffer() 

733 

734 self._fill_buffer(size) 

735 return self._read_from_buffer(size) 

736 

737 def read1(self, size=-1): 

738 """This is the same as read().""" 

739 return self.read(size=size) 

740 

741 def readinto(self, b): 

742 """Read up to len(b) bytes into b, and return the number of bytes 

743 read.""" 

744 data = self.read(len(b)) 

745 if not data: 

746 return 0 

747 b[:len(data)] = data 

748 return len(data) 

749 

750 def readline(self, limit=-1): 

751 """Read up to and including the next newline. Returns the bytes read.""" 

752 if limit != -1: 

753 raise NotImplementedError('limits other than -1 not implemented yet') 

754 

755 # 

756 # A single line may span multiple buffers. 

757 # 

758 line = io.BytesIO() 

759 while not (self._eof and len(self._buffer) == 0): 

760 line_part = self._buffer.readline(self._line_terminator) 

761 line.write(line_part) 

762 self._current_pos += len(line_part) 

763 

764 if line_part.endswith(self._line_terminator): 

765 break 

766 else: 

767 self._fill_buffer() 

768 

769 return line.getvalue() 

770 

771 def seekable(self): 

772 """If False, seek(), tell() and truncate() will raise IOError. 

773 

774 We offer only seek support, and no truncate support.""" 

775 return True 

776 

777 def seek(self, offset, whence=constants.WHENCE_START): 

778 """Seek to the specified position. 

779 

780 :param int offset: The offset in bytes. 

781 :param int whence: Where the offset is from. 

782 

783 Returns the position after seeking.""" 

784 # Convert relative offset to absolute, since self._raw_reader 

785 # doesn't know our current position. 

786 if whence == constants.WHENCE_CURRENT: 

787 whence = constants.WHENCE_START 

788 offset += self._current_pos 

789 

790 if not self._seek_initialized or not ( 

791 whence == constants.WHENCE_START and offset == self._current_pos 

792 ): 

793 self._current_pos = self._raw_reader.seek(offset, whence) 

794 

795 self._buffer.empty() 

796 

797 self._eof = self._current_pos == self._raw_reader._content_length 

798 

799 self._seek_initialized = True 

800 return self._current_pos 

801 

802 def tell(self): 

803 """Return the current position within the file.""" 

804 return self._current_pos 

805 

806 def truncate(self, size=None): 

807 """Unsupported.""" 

808 raise io.UnsupportedOperation 

809 

810 def detach(self): 

811 """Unsupported.""" 

812 raise io.UnsupportedOperation 

813 

814 def terminate(self): 

815 """Do nothing.""" 

816 pass 

817 

818 def to_boto3(self, resource): 

819 """Create an **independent** `boto3.s3.Object` instance that points to 

820 the same S3 object as this instance. 

821 Changes to the returned object will not affect the current instance. 

822 """ 

823 assert resource, 'resource must be a boto3.resource instance' 

824 obj = resource.Object(self._bucket, self._key) 

825 if self._version_id is not None: 

826 return obj.Version(self._version_id) 

827 else: 

828 return obj 

829 

830 # 

831 # Internal methods. 

832 # 

833 def _read_from_buffer(self, size=-1): 

834 """Remove at most size bytes from our buffer and return them.""" 

835 size = size if size >= 0 else len(self._buffer) 

836 part = self._buffer.read(size) 

837 self._current_pos += len(part) 

838 return part 

839 

840 def _fill_buffer(self, size=-1): 

841 size = max(size, self._buffer._chunk_size) 

842 while len(self._buffer) < size and not self._eof: 

843 bytes_read = self._buffer.fill(self._raw_reader) 

844 if bytes_read == 0: 

845 logger.debug('%s: reached EOF while filling buffer', self) 

846 self._eof = True 

847 

848 def __str__(self): 

849 return "smart_open.s3.Reader(%r, %r)" % (self._bucket, self._key) 

850 

851 def __repr__(self): 

852 return ( 

853 "smart_open.s3.Reader(" 

854 "bucket=%r, " 

855 "key=%r, " 

856 "version_id=%r, " 

857 "buffer_size=%r, " 

858 "line_terminator=%r)" 

859 ) % ( 

860 self._bucket, 

861 self._key, 

862 self._version_id, 

863 self._buffer_size, 

864 self._line_terminator, 

865 ) 
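
#
# A small usage sketch (hypothetical bucket/key): the Reader behaves like an
# ordinary binary file object, so seeking and line iteration both work:
#
#   with open('my_bucket', 'my_key', 'rb') as fin:
#       fin.seek(10)
#       chunk = fin.read(64)
#       fin.seek(0)
#       for line in fin:
#           pass
#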

866 

867 

868class MultipartWriter(io.BufferedIOBase): 

869 """Writes bytes to S3 using the multi part API. 

870 

871 Implements the io.BufferedIOBase interface of the standard library.""" 

872 

873 def __init__( 

874 self, 

875 bucket, 

876 key, 

877 part_size=DEFAULT_PART_SIZE, 

878 client=None, 

879 client_kwargs=None, 

880 writebuffer: io.BytesIO | None = None, 

881 ): 

882 adjusted_ps = smart_open.utils.clamp(part_size, MIN_PART_SIZE, MAX_PART_SIZE) 

883 if part_size != adjusted_ps: 

884 logger.warning(f"adjusting part_size from {part_size} to {adjusted_ps}") 

885 part_size = adjusted_ps 

886 self._part_size = part_size 

887 

888 _initialize_boto3(self, client, client_kwargs, bucket, key) 

889 self._client: S3Client 

890 self._bucket: str 

891 self._key: str 

892 

893 try: 

894 partial = functools.partial( 

895 self._client.create_multipart_upload, 

896 Bucket=bucket, 

897 Key=key, 

898 ) 

899 self._upload_id = RETRY._do(partial)['UploadId'] 

900 except botocore.client.ClientError as error: 

901 raise ValueError( 

902 'the bucket %r does not exist, or is forbidden for access (%r)' % ( 

903 bucket, error 

904 ) 

905 ) from error 

906 

907 if writebuffer is None: 

908 self._buf = io.BytesIO() 

909 else: 

910 self._buf = writebuffer 

911 

912 self._total_bytes = 0 

913 self._total_parts = 0 

914 self._parts: list[dict[str, object]] = [] 

915 

916 # 

917 # This member is part of the io.BufferedIOBase interface. 

918 # 

919 self.raw = None # type: ignore[assignment] 

920 

921 def flush(self): 

922 pass 

923 

924 # 

925 # Override some methods from io.IOBase. 

926 # 

927 def close(self): 

928 if self._buf.tell(): 

929 self._upload_next_part() 

930 

931 logger.debug('%s: completing multipart upload', self) 

932 if self._total_bytes and self._upload_id: 

933 partial = functools.partial( 

934 self._client.complete_multipart_upload, 

935 Bucket=self._bucket, 

936 Key=self._key, 

937 UploadId=self._upload_id, 

938 MultipartUpload={'Parts': self._parts}, 

939 ) 

940 RETRY._do(partial) 

941 logger.debug('%s: completed multipart upload', self) 

942 elif self._upload_id: 

943 # 

944 # AWS complains with "The XML you provided was not well-formed or 

945 # did not validate against our published schema" when the input is 

946 # completely empty => abort the upload, no file created. 

947 # 

948 # We work around this by creating an empty file explicitly. 

949 # 

950 self._client.abort_multipart_upload( 

951 Bucket=self._bucket, 

952 Key=self._key, 

953 UploadId=self._upload_id, 

954 ) 

955 self._client.put_object( 

956 Bucket=self._bucket, 

957 Key=self._key, 

958 Body=b'', 

959 ) 

960 logger.debug('%s: wrote 0 bytes to imitate multipart upload', self) 

961 self._upload_id = None 

962 

963 @property 

964 def closed(self): 

965 return self._upload_id is None 

966 

967 def writable(self): 

968 """Return True if the stream supports writing.""" 

969 return True 

970 

971 def seekable(self): 

972 """If False, seek(), tell() and truncate() will raise IOError. 

973 

974 We offer only tell support, and no seek or truncate support.""" 

975 return True 

976 

977 def seek(self, offset, whence=constants.WHENCE_START): 

978 """Unsupported.""" 

979 raise io.UnsupportedOperation 

980 

981 def truncate(self, size=None): 

982 """Unsupported.""" 

983 raise io.UnsupportedOperation 

984 

985 def tell(self): 

986 """Return the current stream position.""" 

987 return self._total_bytes 

988 

989 # 

990 # io.BufferedIOBase methods. 

991 # 

992 def detach(self): 

993 raise io.UnsupportedOperation("detach() not supported") 

994 

995 def write(self, b: Buffer) -> int: 

996 """Write the given buffer (bytes, bytearray, memoryview or any buffer 

997 interface implementation) to the S3 file. 

998 

999 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html 

1000 

1001 There's buffering happening under the covers, so this may not actually 

1002 do any HTTP transfer right away.""" 

1003 offset = 0 

1004 mv = memoryview(b) 

1005 self._total_bytes += len(mv) 

1006 

1007 # 

1008 # botocore does not accept memoryview, so we must copy into the buffer; 

1009 # otherwise the copy would only be needed in cases where b is smaller 

1010 # than part_size 

1011 # 

1012 while offset < len(mv): 

1013 start = offset 

1014 end = offset + self._part_size - self._buf.tell() 

1015 self._buf.write(mv[start:end]) 

1016 if self._buf.tell() < self._part_size: 

1017 # 

1018 # Not enough data to write a new part just yet. The assert 

1019 # ensures that we've consumed all of the input buffer. 

1020 # 

1021 assert end >= len(mv) 

1022 return len(mv) 

1023 

1024 self._upload_next_part() 

1025 offset = end 

1026 return len(mv) 

1027 

1028 def terminate(self): 

1029 """Cancel the underlying multipart upload.""" 

1030 if self._upload_id is None: 

1031 return 

1032 logger.debug('%s: terminating multipart upload', self) 

1033 self._client.abort_multipart_upload( 

1034 Bucket=self._bucket, 

1035 Key=self._key, 

1036 UploadId=self._upload_id, 

1037 ) 

1038 self._upload_id = None 

1039 logger.debug('%s: terminated multipart upload', self) 

1040 

1041 def to_boto3(self, resource): 

1042 """Create an **independent** `boto3.s3.Object` instance that points to 

1043 the same S3 object as this instance. 

1044 Changes to the returned object will not affect the current instance. 

1045 """ 

1046 assert resource, 'resource must be a boto3.resource instance' 

1047 return resource.Object(self._bucket, self._key) 

1048 

1049 # 

1050 # Internal methods. 

1051 # 

1052 def _upload_next_part(self) -> None: 

1053 part_num = self._total_parts + 1 

1054 logger.info( 

1055 "%s: uploading part_num: %i, %i bytes (total %.3fGB)", 

1056 self, 

1057 part_num, 

1058 self._buf.tell(), 

1059 self._total_bytes / 1024.0 ** 3, 

1060 ) 

1061 self._buf.seek(0) 

1062 

1063 # 

1064 # Network problems in the middle of an upload are particularly 

1065 # troublesome. We don't want to abort the entire upload just because 

1066 # of a temporary connection problem, so this part needs to be 

1067 # especially robust. 

1068 # 

1069 upload = RETRY._do( 

1070 functools.partial( 

1071 self._client.upload_part, 

1072 Bucket=self._bucket, 

1073 Key=self._key, 

1074 UploadId=self._upload_id, 

1075 PartNumber=part_num, 

1076 Body=self._buf, 

1077 ) 

1078 ) 

1079 

1080 self._parts.append({'ETag': upload['ETag'], 'PartNumber': part_num}) 

1081 logger.debug("%s: upload of part_num #%i finished", self, part_num) 

1082 

1083 self._total_parts += 1 

1084 

1085 self._buf.seek(0) 

1086 self._buf.truncate(0) 

1087 

1088 def __enter__(self): 

1089 return self 

1090 

1091 def __exit__(self, exc_type, exc_val, exc_tb): 

1092 if exc_type is not None: 

1093 self.terminate() 

1094 else: 

1095 self.close() 

1096 

1097 def __str__(self): 

1098 return "smart_open.s3.MultipartWriter(%r, %r)" % (self._bucket, self._key) 

1099 

1100 def __repr__(self): 

1101 return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, part_size=%r)" % ( 

1102 self._bucket, 

1103 self._key, 

1104 self._part_size, 

1105 ) 
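
#
# A usage sketch (hypothetical names): as a context manager, the writer
# completes the multipart upload on a clean exit and aborts it if an
# exception escapes the block, so no partial object is left behind:
#
#   with MultipartWriter('my_bucket', 'my_key') as fout:
#       fout.write(b'first chunk')
#       fout.write(b'second chunk')
#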

1106 

1107 

1108class SinglepartWriter(io.BufferedIOBase): 

1109 """Writes bytes to S3 using the single part API. 

1110 

1111 Implements the io.BufferedIOBase interface of the standard library. 

1112 

1113 This class buffers all of its input (in memory by default) until its `close` method is called. Only then will 

1114 the data be written to S3 and the buffer released.""" 

1115 

1116 def __init__( 

1117 self, 

1118 bucket, 

1119 key, 

1120 client=None, 

1121 client_kwargs=None, 

1122 writebuffer=None, 

1123 ): 

1124 _initialize_boto3(self, client, client_kwargs, bucket, key) 

1125 

1126 try: 

1127 self._client.head_bucket(Bucket=bucket) 

1128 except botocore.client.ClientError as e: 

1129 raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) from e 

1130 

1131 if writebuffer is None: 

1132 self._buf = io.BytesIO() 

1133 else: 

1134 self._buf = writebuffer 

1135 

1136 self._total_bytes = 0 

1137 

1138 # 

1139 # This member is part of the io.BufferedIOBase interface. 

1140 # 

1141 self.raw = None 

1142 

1143 def flush(self): 

1144 pass 

1145 

1146 # 

1147 # Override some methods from io.IOBase. 

1148 # 

1149 def close(self): 

1150 if self._buf is None: 

1151 return 

1152 

1153 self._buf.seek(0) 

1154 

1155 try: 

1156 self._client.put_object( 

1157 Bucket=self._bucket, 

1158 Key=self._key, 

1159 Body=self._buf, 

1160 ) 

1161 except botocore.client.ClientError as e: 

1162 raise ValueError( 

1163 'the bucket %r does not exist, or is forbidden for access' % self._bucket) from e 

1164 

1165 logger.debug("%s: direct upload finished", self) 

1166 self._buf = None 

1167 

1168 @property 

1169 def closed(self): 

1170 return self._buf is None 

1171 

1172 def writable(self): 

1173 """Return True if the stream supports writing.""" 

1174 return True 

1175 

1176 def seekable(self): 

1177 """If False, seek(), tell() and truncate() will raise IOError. 

1178 

1179 We offer only tell support, and no seek or truncate support.""" 

1180 return True 

1181 

1182 def seek(self, offset, whence=constants.WHENCE_START): 

1183 """Unsupported.""" 

1184 raise io.UnsupportedOperation 

1185 

1186 def truncate(self, size=None): 

1187 """Unsupported.""" 

1188 raise io.UnsupportedOperation 

1189 

1190 def tell(self): 

1191 """Return the current stream position.""" 

1192 return self._total_bytes 

1193 

1194 # 

1195 # io.BufferedIOBase methods. 

1196 # 

1197 def detach(self): 

1198 raise io.UnsupportedOperation("detach() not supported") 

1199 

1200 def write(self, b): 

1201 """Write the given buffer (bytes, bytearray, memoryview or any buffer 

1202 interface implementation) into the buffer. Content of the buffer will be 

1203 written to S3 on close as a single-part upload. 

1204 

1205 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html""" 

1206 

1207 length = self._buf.write(b) 

1208 self._total_bytes += length 

1209 return length 

1210 

1211 def terminate(self): 

1212 """Nothing to cancel in single-part uploads.""" 

1213 return 

1214 

1215 # 

1216 # Internal methods. 

1217 # 

1218 def __enter__(self): 

1219 return self 

1220 

1221 def __exit__(self, exc_type, exc_val, exc_tb): 

1222 if exc_type is not None: 

1223 self.terminate() 

1224 else: 

1225 self.close() 

1226 

1227 def __str__(self): 

1228 return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._bucket, self._key) 

1229 

1230 def __repr__(self): 

1231 return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key) 
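
#
# A usage sketch (hypothetical names): the module-level open() selects this
# writer when you pass multipart_upload=False, which suits small objects:
#
#   with open('my_bucket', 'my_key', 'wb', multipart_upload=False) as fout:
#       fout.write(b'small payload')
#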

1232 

1233 

1234def _accept_all(key): 

1235 return True 

1236 

1237 

1238def iter_bucket( 

1239 bucket_name, 

1240 prefix='', 

1241 accept_key=None, 

1242 key_limit=None, 

1243 workers=16, 

1244 retries=3, 

1245 **session_kwargs): 

1246 """ 

1247 Iterate and download all S3 objects under `s3://bucket_name/prefix`. 

1248 

1249 Parameters 

1250 ---------- 

1251 bucket_name: str 

1252 The name of the bucket. 

1253 prefix: str, optional 

1254 Limits the iteration to keys starting with the prefix. 

1255 accept_key: callable, optional 

1256 This is a function that accepts a key name (unicode string) and 

1257 returns True/False, signalling whether the given key should be downloaded. 

1258 The default behavior is to accept all keys. 

1259 key_limit: int, optional 

1260 If specified, the iterator will stop after yielding this many results. 

1261 workers: int, optional 

1262 The number of subprocesses to use. 

1263 retries: int, optional 

1264 The number of times to retry a failed download. 

1265 session_kwargs: dict, optional 

1266 Keyword arguments to pass when creating a new session. 

1267 For a list of available names and values, see: 

1268 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session 

1269 

1270 

1271 Yields 

1272 ------ 

1273 str 

1274 The full key name (does not include the bucket name). 

1275 bytes 

1276 The full contents of the key. 

1277 

1278 Notes 

1279 ----- 

1280 The keys are processed in parallel, using `workers` processes (default: 16), 

1281 to speed up downloads greatly. If multiprocessing is not available 

1282 (i.e. _MULTIPROCESSING is False), this parameter is ignored. 

1283 

1284 Examples 

1285 -------- 

1286 

1287 >>> # get all JSON files under "mybucket/foo/" 

1288 >>> for key, content in iter_bucket( 

1289 ... bucket_name, prefix='foo/', 

1290 ... accept_key=lambda key: key.endswith('.json')): 

1291 ... print(key, len(content)) 

1292 

1293 >>> # limit to 10k files, using 32 parallel workers (default is 16) 

1294 >>> for key, content in iter_bucket(bucket_name, key_limit=10000, workers=32): 

1295 ... print(key, len(content)) 

1296 """ 

1297 if accept_key is None: 

1298 accept_key = _accept_all 

1299 

1300 # 

1301 # If people insist on giving us bucket instances, silently extract the name 

1302 # before moving on. Works for boto3 as well as boto. 

1303 # 

1304 try: 

1305 bucket_name = bucket_name.name 

1306 except AttributeError: 

1307 pass 

1308 

1309 total_size, key_no = 0, -1 

1310 key_iterator = _list_bucket( 

1311 bucket_name, 

1312 prefix=prefix, 

1313 accept_key=accept_key, 

1314 **session_kwargs) 

1315 download_key = functools.partial( 

1316 _download_key, 

1317 bucket_name=bucket_name, 

1318 retries=retries, 

1319 **session_kwargs) 

1320 

1321 with smart_open.concurrency.create_pool(processes=workers) as pool: 

1322 result_iterator = pool.imap_unordered(download_key, key_iterator) 

1323 key_no = 0 

1324 while True: 

1325 try: 

1326 (key, content) = result_iterator.__next__() 

1327 if key_no % 1000 == 0: 

1328 logger.info( 

1329 "yielding key #%i: %s, size %i (total %.1fMB)", 

1330 key_no, key, len(content), total_size / 1024.0 ** 2 

1331 ) 

1332 yield key, content 

1333 total_size += len(content) 

1334 if key_limit is not None and key_no + 1 >= key_limit: 

1335 # we were asked to output only a limited number of keys => we're done 

1336 break 

1337 except botocore.exceptions.ClientError as err: 

1338 # 

1339 # ignore 404 not found errors: they mean the object was deleted 

1340 # after we listed the contents of the bucket, but before we 

1341 # downloaded the object. 

1342 # 

1343 if not ('Error' in err.response and err.response['Error'].get('Code') == '404'): 

1344 raise err 

1345 except StopIteration: 

1346 break 

1347 key_no += 1 

1348 logger.info("processed %i keys, total size %i" % (key_no + 1, total_size)) 

1349 

1350 

1351def _list_bucket( 

1352 bucket_name, 

1353 prefix='', 

1354 accept_key=lambda k: True, 

1355 **session_kwargs): 

1356 session = boto3.session.Session(**session_kwargs) 

1357 client = session.client('s3') 

1358 ctoken = None 

1359 

1360 while True: 

1361 # list_objects_v2 doesn't like a None value for ContinuationToken 

1362 # so we don't set it if we don't have one. 

1363 if ctoken: 

1364 kwargs = dict(Bucket=bucket_name, Prefix=prefix, ContinuationToken=ctoken) 

1365 else: 

1366 kwargs = dict(Bucket=bucket_name, Prefix=prefix) 

1367 response = client.list_objects_v2(**kwargs) 

1368 try: 

1369 content = response['Contents'] 

1370 except KeyError: 

1371 pass 

1372 else: 

1373 for c in content: 

1374 key = c['Key'] 

1375 if accept_key(key): 

1376 yield key 

1377 ctoken = response.get('NextContinuationToken', None) 

1378 if not ctoken: 

1379 break 

1380 

1381 

1382def _download_key(key_name, bucket_name=None, retries=3, **session_kwargs): 

1383 if bucket_name is None: 

1384 raise ValueError('bucket_name may not be None') 

1385 

1386 # 

1387 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html#multithreading-or-multiprocessing-with-resources 

1388 # 

1389 session = boto3.session.Session(**session_kwargs) 

1390 s3 = session.resource('s3') 

1391 bucket = s3.Bucket(bucket_name) 

1392 

1393 # Sometimes, https://github.com/boto/boto/issues/2409 can happen 

1394 # because of network issues on either side. 

1395 # Retry up to `retries` times to make sure it's not a transient issue. 

1396 for x in range(retries + 1): 

1397 try: 

1398 content_bytes = _download_fileobj(bucket, key_name) 

1399 except botocore.client.ClientError: 

1400 # Actually fail on last pass through the loop 

1401 if x == retries: 

1402 raise 

1403 # Otherwise, try again, as this might be a transient timeout 

1404 pass 

1405 else: 

1406 return key_name, content_bytes 

1407 

1408 

1409def _download_fileobj(bucket, key_name): 

1410 # 

1411 # This is a separate function only because it makes it easier to inject 

1412 # exceptions during tests. 

1413 # 

1414 buf = io.BytesIO() 

1415 bucket.download_fileobj(key_name, buf) 

1416 return buf.getvalue()