Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/smart_open/s3.py: 20%

508 statements  

coverage.py v7.2.7, created at 2023-06-07 06:57 +0000

1# -*- coding: utf-8 -*- 

2# 

3# Copyright (C) 2019 Radim Rehurek <me@radimrehurek.com> 

4# 

5# This code is distributed under the terms and conditions 

6# from the MIT License (MIT). 

7# 

8"""Implements file-like objects for reading and writing from/to AWS S3.""" 

9 

10import io 

11import functools 

12import logging 

13import time 

14import warnings 

15 

16try: 

17 import boto3 

18 import botocore.client 

19 import botocore.exceptions 

20 import urllib3.exceptions 

21except ImportError: 

22 MISSING_DEPS = True 

23 

24import smart_open.bytebuffer 

25import smart_open.concurrency 

26import smart_open.utils 

27 

28from smart_open import constants 

29 

30logger = logging.getLogger(__name__) 

31 

32DEFAULT_MIN_PART_SIZE = 50 * 1024**2 

33"""Default minimum part size for S3 multipart uploads""" 

34MIN_MIN_PART_SIZE = 5 * 1024 ** 2 

35"""The absolute minimum permitted by Amazon.""" 

36 

37SCHEMES = ("s3", "s3n", 's3u', "s3a") 

38DEFAULT_PORT = 443 

39DEFAULT_HOST = 's3.amazonaws.com' 

40 

41DEFAULT_BUFFER_SIZE = 128 * 1024 

42 

43URI_EXAMPLES = ( 

44 's3://my_bucket/my_key', 

45 's3://my_key:my_secret@my_bucket/my_key', 

46 's3://my_key:my_secret@my_server:my_port@my_bucket/my_key', 

47) 

48 

49_UPLOAD_ATTEMPTS = 6 

50_SLEEP_SECONDS = 10 

51 

52# Returned by AWS when we try to seek beyond EOF. 

53_OUT_OF_RANGE = 'InvalidRange' 

54 

55 

56class _ClientWrapper: 

57 """Wraps a client to inject the appropriate keyword args into each method call. 

58 

59 The keyword args are a dictionary keyed by the fully qualified method name. 

60 For example, S3.Client.create_multipart_upload. 

61 

62 See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client 

63 

64 This wrapper behaves identically to the client otherwise. 

65 """ 

66 def __init__(self, client, kwargs): 

67 self.client = client 

68 self.kwargs = kwargs 

69 

70 def __getattr__(self, method_name): 

71 method = getattr(self.client, method_name) 

72 kwargs = self.kwargs.get('S3.Client.%s' % method_name, {}) 

73 return functools.partial(method, **kwargs) 

74 

75 
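#
# Illustrative sketch (not part of the original module): how _ClientWrapper
# injects per-method kwargs. The function, bucket and key names below are
# made up for illustration, and the call needs working AWS credentials.
#
def _example_client_wrapper():
    client = boto3.client('s3')
    wrapped = _ClientWrapper(
        client,
        {'S3.Client.get_object': {'RequestPayer': 'requester'}},
    )
    # Looks up the kwargs registered under 'S3.Client.get_object' and merges
    # them into the call, i.e. client.get_object(..., RequestPayer='requester').
    return wrapped.get_object(Bucket='my-bucket', Key='my-key')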

76def parse_uri(uri_as_string): 

77 # 

78 # Restrictions on bucket names and labels: 

79 # 

80 # - Bucket names must be at least 3 and no more than 63 characters long. 

81 # - Bucket names must be a series of one or more labels. 

82 # - Adjacent labels are separated by a single period (.). 

83 # - Bucket names can contain lowercase letters, numbers, and hyphens. 

84 # - Each label must start and end with a lowercase letter or a number. 

85 # 

86 # We use the above as a guide only, and do not perform any validation. We 

87 # let boto3 take care of that for us. 

88 # 

89 split_uri = smart_open.utils.safe_urlsplit(uri_as_string) 

90 assert split_uri.scheme in SCHEMES 

91 

92 port = DEFAULT_PORT 

93 host = DEFAULT_HOST 

94 ordinary_calling_format = False 

95 # 

96 # These defaults tell boto3 to look for credentials elsewhere 

97 # 

98 access_id, access_secret = None, None 

99 

100 # 

101 # Common URI template [key:secret@][host[:port]@]bucket/object 

102 # 

103 # The urlparse function doesn't handle the above schema, so we have to do 

104 # it ourselves. 

105 # 

106 uri = split_uri.netloc + split_uri.path 

107 

108 if '@' in uri and ':' in uri.split('@')[0]: 

109 auth, uri = uri.split('@', 1) 

110 access_id, access_secret = auth.split(':') 

111 

112 head, key_id = uri.split('/', 1) 

113 if '@' in head and ':' in head: 

114 ordinary_calling_format = True 

115 host_port, bucket_id = head.split('@') 

116 host, port = host_port.split(':', 1) 

117 port = int(port) 

118 elif '@' in head: 

119 ordinary_calling_format = True 

120 host, bucket_id = head.split('@') 

121 else: 

122 bucket_id = head 

123 

124 return dict( 

125 scheme=split_uri.scheme, 

126 bucket_id=bucket_id, 

127 key_id=key_id, 

128 port=port, 

129 host=host, 

130 ordinary_calling_format=ordinary_calling_format, 

131 access_id=access_id, 

132 access_secret=access_secret, 

133 ) 

134 

135 
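#
# Illustrative sketch (not part of the original module): what parse_uri
# returns for a URI carrying credentials and a custom endpoint. The
# credentials and endpoint are hypothetical.
#
# >>> parse_uri('s3://key:secret@play.min.io:9000@my-bucket/my-key.txt')
# {'scheme': 's3', 'bucket_id': 'my-bucket', 'key_id': 'my-key.txt',
#  'port': 9000, 'host': 'play.min.io', 'ordinary_calling_format': True,
#  'access_id': 'key', 'access_secret': 'secret'}
#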

136def _consolidate_params(uri, transport_params): 

137 """Consolidates the parsed Uri with the additional parameters. 

138 

139 This is necessary because the user can pass some of the parameters in 

140 two different ways: 

141 

142 1) Via the URI itself 

143 2) Via the transport parameters 

144 

145 These are not mutually exclusive, but we have to pick one over the other 

146 in a sensible way in order to proceed. 

147 

148 """ 

149 transport_params = dict(transport_params) 

150 

151 def inject(**kwargs): 

152 try: 

153 client_kwargs = transport_params['client_kwargs'] 

154 except KeyError: 

155 client_kwargs = transport_params['client_kwargs'] = {} 

156 

157 try: 

158 init_kwargs = client_kwargs['S3.Client'] 

159 except KeyError: 

160 init_kwargs = client_kwargs['S3.Client'] = {} 

161 

162 init_kwargs.update(**kwargs) 

163 

164 client = transport_params.get('client') 

165 if client is not None and (uri['access_id'] or uri['access_secret']): 

166 logger.warning( 

167 'ignoring credentials parsed from URL because they conflict with ' 

168 'transport_params["client"]. Set transport_params["client"] to None ' 

169 'to suppress this warning.' 

170 ) 

171 uri.update(access_id=None, access_secret=None) 

172 elif (uri['access_id'] and uri['access_secret']): 

173 inject( 

174 aws_access_key_id=uri['access_id'], 

175 aws_secret_access_key=uri['access_secret'], 

176 ) 

177 uri.update(access_id=None, access_secret=None) 

178 

179 if client is not None and uri['host'] != DEFAULT_HOST: 

180 logger.warning( 

181 'ignoring endpoint_url parsed from URL because it conflicts with ' 

182 'transport_params["client"]. Set transport_params["client"] to None ' 

183 'to suppress this warning.' 

184 ) 

185 uri.update(host=None) 

186 elif uri['host'] != DEFAULT_HOST: 

187 inject(endpoint_url='https://%(host)s:%(port)d' % uri) 

188 uri.update(host=None) 

189 

190 return uri, transport_params 

191 

192 
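#
# Illustrative sketch (not part of the original module): _consolidate_params
# moves credentials and a non-default host out of the parsed URI and into
# client_kwargs['S3.Client'], which _initialize_boto3 later passes to
# boto3.client(). The endpoint and credentials are hypothetical.
#
# >>> uri = parse_uri('s3://key:secret@play.min.io:9000@my-bucket/my-key.txt')
# >>> uri, tp = _consolidate_params(uri, {})
# >>> tp['client_kwargs']['S3.Client']
# {'aws_access_key_id': 'key', 'aws_secret_access_key': 'secret',
#  'endpoint_url': 'https://play.min.io:9000'}
#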

193def open_uri(uri, mode, transport_params): 

194 deprecated = ( 

195 'multipart_upload_kwargs', 

196 'object_kwargs', 

197 'resource', 

198 'resource_kwargs', 

199 'session', 

200 'singlepart_upload_kwargs', 

201 ) 

202 detected = [k for k in deprecated if k in transport_params] 

203 if detected: 

204 doc_url = ( 

205 'https://github.com/RaRe-Technologies/smart_open/blob/develop/' 

206 'MIGRATING_FROM_OLDER_VERSIONS.rst' 

207 ) 

208 # 

209 # We use warnings.warn with UserWarning instead of logger.warn here because 

210 # 

211 # 1) Not everyone has logging enabled; and 

212 # 2) check_kwargs (below) already uses logger.warn with a similar message 

213 # 

214 # https://github.com/RaRe-Technologies/smart_open/issues/614 

215 # 

216 message = ( 

217 'ignoring the following deprecated transport parameters: %r. ' 

218 'See <%s> for details' % (detected, doc_url) 

219 ) 

220 warnings.warn(message, UserWarning) 

221 parsed_uri = parse_uri(uri) 

222 parsed_uri, transport_params = _consolidate_params(parsed_uri, transport_params) 

223 kwargs = smart_open.utils.check_kwargs(open, transport_params) 

224 return open(parsed_uri['bucket_id'], parsed_uri['key_id'], mode, **kwargs) 

225 

226 

227def open( 

228 bucket_id, 

229 key_id, 

230 mode, 

231 version_id=None, 

232 buffer_size=DEFAULT_BUFFER_SIZE, 

233 min_part_size=DEFAULT_MIN_PART_SIZE, 

234 multipart_upload=True, 

235 defer_seek=False, 

236 client=None, 

237 client_kwargs=None, 

238 writebuffer=None, 

239): 

240 """Open an S3 object for reading or writing. 

241 

242 Parameters 

243 ---------- 

244 bucket_id: str 

245 The name of the bucket this object resides in. 

246 key_id: str 

247 The name of the key within the bucket. 

248 mode: str 

249 The mode for opening the object. Must be either "rb" or "wb". 

250 buffer_size: int, optional 

251 The buffer size to use when performing I/O. 

252 min_part_size: int, optional 

253 The minimum part size for multipart uploads. For writing only. 

254 multipart_upload: bool, optional 

255 Default: `True` 

256 If set to `True`, will use multipart upload for writing to S3. If set 

257 to `False`, the upload will use the S3 Single-Part Upload API, which 

258 is better suited to small files. 

259 For writing only. 

260 version_id: str, optional 

261 Version of the object, used when reading object. 

262 If None, will fetch the most recent version. 

263 defer_seek: boolean, optional 

264 Default: `False` 

265 If set to `True` on a file opened for reading, GetObject will not be 

266 called until the first seek() or read(). 

267 Avoids redundant API queries when seeking before reading. 

268 client: object, optional 

269 The S3 client to use when working with boto3. 

270 If you don't specify this, then smart_open will create a new client for you. 

271 client_kwargs: dict, optional 

272 Additional parameters to pass to the relevant functions of the client. 

273 The keys are fully qualified method names, e.g. `S3.Client.create_multipart_upload`. 

274 The values are kwargs to pass to that method each time it is called. 

275 writebuffer: IO[bytes], optional 

276 By default, this module will buffer data in memory using io.BytesIO 

277 when writing. Pass another binary IO instance here to use it instead. 

278 For example, you may pass a file object to buffer to local disk instead 

279 of in RAM. Use this to keep RAM usage low at the expense of additional 

280 disk IO. If you pass in an open file, then you are responsible for 

281 cleaning it up after writing completes. 

282 """ 

283 logger.debug('%r', locals()) 

284 if mode not in constants.BINARY_MODES: 

285 raise NotImplementedError('bad mode: %r expected one of %r' % (mode, constants.BINARY_MODES)) 

286 

287 if (mode == constants.WRITE_BINARY) and (version_id is not None): 

288 raise ValueError("version_id must be None when writing") 

289 

290 if mode == constants.READ_BINARY: 

291 fileobj = Reader( 

292 bucket_id, 

293 key_id, 

294 version_id=version_id, 

295 buffer_size=buffer_size, 

296 defer_seek=defer_seek, 

297 client=client, 

298 client_kwargs=client_kwargs, 

299 ) 

300 elif mode == constants.WRITE_BINARY: 

301 if multipart_upload: 

302 fileobj = MultipartWriter( 

303 bucket_id, 

304 key_id, 

305 min_part_size=min_part_size, 

306 client=client, 

307 client_kwargs=client_kwargs, 

308 writebuffer=writebuffer, 

309 ) 

310 else: 

311 fileobj = SinglepartWriter( 

312 bucket_id, 

313 key_id, 

314 client=client, 

315 client_kwargs=client_kwargs, 

316 writebuffer=writebuffer, 

317 ) 

318 else: 

319 assert False, 'unexpected mode: %r' % mode 

320 

321 fileobj.name = key_id 

322 return fileobj 

323 

324 
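#
# Illustrative sketch (not part of the original module): typical use of the
# module-level open() helper. The bucket and key are hypothetical and the
# calls require working AWS credentials.
#
def _example_open():
    # Read in binary mode, deferring the first GetObject call until the
    # initial read()/seek().
    with open('my-bucket', 'my-key.txt', 'rb', defer_seek=True) as fin:
        header = fin.read(64)

    # Write using the multipart API with 5MB parts (the S3 minimum).
    with open('my-bucket', 'my-copy.txt', 'wb', min_part_size=MIN_MIN_PART_SIZE) as fout:
        fout.write(header)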

325def _get(client, bucket, key, version, range_string): 

326 try: 

327 if version: 

328 return client.get_object(Bucket=bucket, Key=key, VersionId=version, Range=range_string) 

329 else: 

330 return client.get_object(Bucket=bucket, Key=key, Range=range_string) 

331 except botocore.client.ClientError as error: 

332 wrapped_error = IOError( 

333 'unable to access bucket: %r key: %r version: %r error: %s' % ( 

334 bucket, key, version, error 

335 ) 

336 ) 

337 wrapped_error.backend_error = error 

338 raise wrapped_error from error 

339 

340 

341def _unwrap_ioerror(ioe): 

342 """Given an IOError from _get, return the 'Error' dictionary from boto.""" 

343 try: 

344 return ioe.backend_error.response['Error'] 

345 except (AttributeError, KeyError): 

346 return None 

347 

348 
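#
# Illustrative sketch (not part of the original module): how _get and
# _unwrap_ioerror work together to detect a read past EOF. The client,
# bucket, key and range are hypothetical.
#
def _example_out_of_range(client):
    try:
        return _get(client, 'my-bucket', 'my-key.txt', None, 'bytes=1000000-')
    except IOError as ioe:
        error = _unwrap_ioerror(ioe)
        if error is not None and error.get('Code') == _OUT_OF_RANGE:
            # The range starts past EOF; AWS reports the object's real size.
            return int(error['ActualObjectSize'])
        raise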

349class _SeekableRawReader(object): 

350 """Read an S3 object. 

351 

352 This class is internal to the S3 submodule. 

353 """ 

354 

355 def __init__( 

356 self, 

357 client, 

358 bucket, 

359 key, 

360 version_id=None, 

361 ): 

362 self._client = client 

363 self._bucket = bucket 

364 self._key = key 

365 self._version_id = version_id 

366 

367 self._content_length = None 

368 self._position = 0 

369 self._body = None 

370 

371 def seek(self, offset, whence=constants.WHENCE_START): 

372 """Seek to the specified position. 

373 

374 :param int offset: The offset in bytes. 

375 :param int whence: Where the offset is from. 

376 

377 :returns: the position after seeking. 

378 :rtype: int 

379 """ 

380 if whence not in constants.WHENCE_CHOICES: 

381 raise ValueError('invalid whence, expected one of %r' % constants.WHENCE_CHOICES) 

382 

383 # 

384 # Close old body explicitly. 

385 # On the first seek() after __init__(), self._body does not exist yet. 

386 # 

387 if self._body is not None: 

388 self._body.close() 

389 self._body = None 

390 

391 start = None 

392 stop = None 

393 if whence == constants.WHENCE_START: 

394 start = max(0, offset) 

395 elif whence == constants.WHENCE_CURRENT: 

396 start = max(0, offset + self._position) 

397 else: 

398 stop = max(0, -offset) 

399 

400 # 

401 # If we can figure out that we've read past the EOF, then we can save 

402 # an extra API call. 

403 # 

404 if self._content_length is None: 

405 reached_eof = False 

406 elif start is not None and start >= self._content_length: 

407 reached_eof = True 

408 elif stop == 0: 

409 reached_eof = True 

410 else: 

411 reached_eof = False 

412 

413 if reached_eof: 

414 self._body = io.BytesIO() 

415 self._position = self._content_length 

416 else: 

417 self._open_body(start, stop) 

418 

419 return self._position 

420 

421 def _open_body(self, start=None, stop=None): 

422 """Open a connection to download the specified range of bytes. Store 

423 the open file handle in self._body. 

424 

425 If no range is specified, start defaults to self._position. 

426 start and stop follow the semantics of the HTTP range header, 

427 so a stop without a start reads the last stop bytes of the object. 

428 

429 As a side effect, set self._content_length. Set self._position 

430 to self._content_length if start is past end of file. 

431 """ 

432 if start is None and stop is None: 

433 start = self._position 

434 range_string = smart_open.utils.make_range_string(start, stop) 

435 

436 try: 

437 # Optimistically try to fetch the requested content range. 

438 response = _get( 

439 self._client, 

440 self._bucket, 

441 self._key, 

442 self._version_id, 

443 range_string, 

444 ) 

445 except IOError as ioe: 

446 # Handle requested content range exceeding content size. 

447 error_response = _unwrap_ioerror(ioe) 

448 if error_response is None or error_response.get('Code') != _OUT_OF_RANGE: 

449 raise 

450 self._position = self._content_length = int(error_response['ActualObjectSize']) 

451 self._body = io.BytesIO() 

452 else: 

453 # 

454 # Keep track of how many times boto3's built-in retry mechanism 

455 # activated. 

456 # 

457 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#checking-retry-attempts-in-an-aws-service-response 

458 # 

459 logger.debug( 

460 '%s: RetryAttempts: %d', 

461 self, 

462 response['ResponseMetadata']['RetryAttempts'], 

463 ) 

464 units, start, stop, length = smart_open.utils.parse_content_range(response['ContentRange']) 

465 self._content_length = length 

466 self._position = start 

467 self._body = response['Body'] 

468 

469 def read(self, size=-1): 

470 """Read from the continuous connection with the remote peer.""" 

471 if self._body is None: 

472 # This is necessary for the very first read() after __init__(). 

473 self._open_body() 

474 if self._position >= self._content_length: 

475 return b'' 

476 

477 # 

478 # Boto3 has built-in error handling and retry mechanisms: 

479 # 

480 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html 

481 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html 

482 # 

483 # Unfortunately, it isn't always enough. There is still a non-zero 

484 # possibility that an exception will slip past these mechanisms and 

485 # terminate the read prematurely. Luckily, at this stage, it's very 

486 # simple to recover from the problem: wait a little bit, reopen the 

487 # HTTP connection and try again. Usually, a single retry attempt is 

488 # enough to recover, but we try multiple times "just in case". 

489 # 

490 for attempt, seconds in enumerate([1, 2, 4, 8, 16], 1): 

491 try: 

492 if size == -1: 

493 binary = self._body.read() 

494 else: 

495 binary = self._body.read(size) 

496 except ( 

497 ConnectionResetError, 

498 botocore.exceptions.BotoCoreError, 

499 urllib3.exceptions.HTTPError, 

500 ) as err: 

501 logger.warning( 

502 '%s: caught %r while reading %d bytes, sleeping %ds before retry', 

503 self, 

504 err, 

505 size, 

506 seconds, 

507 ) 

508 time.sleep(seconds) 

509 self._open_body() 

510 else: 

511 self._position += len(binary) 

512 return binary 

513 

514 raise IOError('%s: failed to read %d bytes after %d attempts' % (self, size, attempt)) 

515 

516 def __str__(self): 

517 return 'smart_open.s3._SeekableRawReader(%r, %r)' % (self._bucket, self._key) 

518 

519 
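#
# Illustrative sketch (not part of the original module): _SeekableRawReader
# issues a fresh ranged GetObject on each seek and then streams from the
# response body. The client, bucket and key are hypothetical.
#
def _example_raw_reader(client):
    raw = _SeekableRawReader(client, 'my-bucket', 'my-key.txt')
    raw.seek(10)                         # GetObject with Range: bytes=10-
    chunk = raw.read(100)                # reads from the open response body
    raw.seek(-5, constants.WHENCE_END)   # GetObject with Range: bytes=-5
    tail = raw.read()
    return chunk, tail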

520def _initialize_boto3(rw, client, client_kwargs, bucket, key): 

521 """Created the required objects for accessing S3. Ideally, they have 

522 been already created for us and we can just reuse them.""" 

523 if client_kwargs is None: 

524 client_kwargs = {} 

525 

526 if client is None: 

527 init_kwargs = client_kwargs.get('S3.Client', {}) 

528 client = boto3.client('s3', **init_kwargs) 

529 assert client 

530 

531 rw._client = _ClientWrapper(client, client_kwargs) 

532 rw._bucket = bucket 

533 rw._key = key 

534 

535 
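#
# Illustrative sketch (not part of the original module): the 'S3.Client' key
# of client_kwargs holds kwargs for boto3.client('s3', ...) itself, while
# fully qualified method names hold per-call kwargs. The endpoint, bucket
# and key are hypothetical.
#
# client_kwargs = {
#     'S3.Client': {'endpoint_url': 'https://play.min.io:9000'},
#     'S3.Client.get_object': {'RequestPayer': 'requester'},
# }
# reader = Reader('my-bucket', 'my-key.txt', client_kwargs=client_kwargs)
#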

536class Reader(io.BufferedIOBase): 

537 """Reads bytes from S3. 

538 

539 Implements the io.BufferedIOBase interface of the standard library.""" 

540 

541 def __init__( 

542 self, 

543 bucket, 

544 key, 

545 version_id=None, 

546 buffer_size=DEFAULT_BUFFER_SIZE, 

547 line_terminator=constants.BINARY_NEWLINE, 

548 defer_seek=False, 

549 client=None, 

550 client_kwargs=None, 

551 ): 

552 self._version_id = version_id 

553 self._buffer_size = buffer_size 

554 

555 _initialize_boto3(self, client, client_kwargs, bucket, key) 

556 

557 self._raw_reader = _SeekableRawReader( 

558 self._client, 

559 bucket, 

560 key, 

561 self._version_id, 

562 ) 

563 self._current_pos = 0 

564 self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size) 

565 self._eof = False 

566 self._line_terminator = line_terminator 

567 

568 # 

569 # This member is part of the io.BufferedIOBase interface. 

570 # 

571 self.raw = None 

572 

573 if not defer_seek: 

574 self.seek(0) 

575 

576 # 

577 # io.BufferedIOBase methods. 

578 # 

579 

580 def close(self): 

581 """Flush and close this stream.""" 

582 pass 

583 

584 def readable(self): 

585 """Return True if the stream can be read from.""" 

586 return True 

587 

588 def read(self, size=-1): 

589 """Read up to size bytes from the object and return them.""" 

590 if size == 0: 

591 return b'' 

592 elif size < 0: 

593 # call read() before setting _current_pos to make sure _content_length is set 

594 out = self._read_from_buffer() + self._raw_reader.read() 

595 self._current_pos = self._raw_reader._content_length 

596 return out 

597 

598 # 

599 # Return unused data first 

600 # 

601 if len(self._buffer) >= size: 

602 return self._read_from_buffer(size) 

603 

604 # 

605 # If the stream is finished, return what we have. 

606 # 

607 if self._eof: 

608 return self._read_from_buffer() 

609 

610 self._fill_buffer(size) 

611 return self._read_from_buffer(size) 

612 

613 def read1(self, size=-1): 

614 """This is the same as read().""" 

615 return self.read(size=size) 

616 

617 def readinto(self, b): 

618 """Read up to len(b) bytes into b, and return the number of bytes 

619 read.""" 

620 data = self.read(len(b)) 

621 if not data: 

622 return 0 

623 b[:len(data)] = data 

624 return len(data) 

625 

626 def readline(self, limit=-1): 

627 """Read up to and including the next newline. Returns the bytes read.""" 

628 if limit != -1: 

629 raise NotImplementedError('limits other than -1 not implemented yet') 

630 

631 # 

632 # A single line may span multiple buffers. 

633 # 

634 line = io.BytesIO() 

635 while not (self._eof and len(self._buffer) == 0): 

636 line_part = self._buffer.readline(self._line_terminator) 

637 line.write(line_part) 

638 self._current_pos += len(line_part) 

639 

640 if line_part.endswith(self._line_terminator): 

641 break 

642 else: 

643 self._fill_buffer() 

644 

645 return line.getvalue() 

646 

647 def seekable(self): 

648 """If False, seek(), tell() and truncate() will raise IOError. 

649 

650 We offer only seek support, and no truncate support.""" 

651 return True 

652 

653 def seek(self, offset, whence=constants.WHENCE_START): 

654 """Seek to the specified position. 

655 

656 :param int offset: The offset in bytes. 

657 :param int whence: Where the offset is from. 

658 

659 Returns the position after seeking.""" 

660 # Convert relative offset to absolute, since self._raw_reader 

661 # doesn't know our current position. 

662 if whence == constants.WHENCE_CURRENT: 

663 whence = constants.WHENCE_START 

664 offset += self._current_pos 

665 

666 self._current_pos = self._raw_reader.seek(offset, whence) 

667 

668 self._buffer.empty() 

669 self._eof = self._current_pos == self._raw_reader._content_length 

670 return self._current_pos 

671 

672 def tell(self): 

673 """Return the current position within the file.""" 

674 return self._current_pos 

675 

676 def truncate(self, size=None): 

677 """Unsupported.""" 

678 raise io.UnsupportedOperation 

679 

680 def detach(self): 

681 """Unsupported.""" 

682 raise io.UnsupportedOperation 

683 

684 def terminate(self): 

685 """Do nothing.""" 

686 pass 

687 

688 def to_boto3(self, resource): 

689 """Create an **independent** `boto3.s3.Object` instance that points to 

690 the same S3 object as this instance. 

691 Changes to the returned object will not affect the current instance. 

692 """ 

693 assert resource, 'resource must be a boto3.resource instance' 

694 obj = resource.Object(self._bucket, self._key) 

695 if self._version_id is not None: 

696 return obj.Version(self._version_id) 

697 else: 

698 return obj 

699 

700 # 

701 # Internal methods. 

702 # 

703 def _read_from_buffer(self, size=-1): 

704 """Remove at most size bytes from our buffer and return them.""" 

705 size = size if size >= 0 else len(self._buffer) 

706 part = self._buffer.read(size) 

707 self._current_pos += len(part) 

708 return part 

709 

710 def _fill_buffer(self, size=-1): 

711 size = max(size, self._buffer._chunk_size) 

712 while len(self._buffer) < size and not self._eof: 

713 bytes_read = self._buffer.fill(self._raw_reader) 

714 if bytes_read == 0: 

715 logger.debug('%s: reached EOF while filling buffer', self) 

716 self._eof = True 

717 

718 def __str__(self): 

719 return "smart_open.s3.Reader(%r, %r)" % (self._bucket, self._key) 

720 

721 def __repr__(self): 

722 return ( 

723 "smart_open.s3.Reader(" 

724 "bucket=%r, " 

725 "key=%r, " 

726 "version_id=%r, " 

727 "buffer_size=%r, " 

728 "line_terminator=%r)" 

729 ) % ( 

730 self._bucket, 

731 self._key, 

732 self._version_id, 

733 self._buffer_size, 

734 self._line_terminator, 

735 ) 

736 

737 
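#
# Illustrative sketch (not part of the original module): Reader behaves like
# a read-only, seekable binary file object. The bucket and key are
# hypothetical and the calls require working AWS credentials.
#
def _example_reader():
    fin = Reader('my-bucket', 'my-key.txt', defer_seek=True)
    first_line = fin.readline()           # read up to and including b'\n'
    fin.seek(0, constants.WHENCE_START)   # rewind; re-issues GetObject
    everything = fin.read()
    fin.close()
    return first_line, everything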

738class MultipartWriter(io.BufferedIOBase): 

739 """Writes bytes to S3 using the multi part API. 

740 

741 Implements the io.BufferedIOBase interface of the standard library.""" 

742 

743 def __init__( 

744 self, 

745 bucket, 

746 key, 

747 min_part_size=DEFAULT_MIN_PART_SIZE, 

748 client=None, 

749 client_kwargs=None, 

750 writebuffer=None, 

751 ): 

752 if min_part_size < MIN_MIN_PART_SIZE: 

753 logger.warning("S3 requires minimum part size >= 5MB; \ 

754multipart upload may fail") 

755 self._min_part_size = min_part_size 

756 

757 _initialize_boto3(self, client, client_kwargs, bucket, key) 

758 

759 try: 

760 partial = functools.partial( 

761 self._client.create_multipart_upload, 

762 Bucket=bucket, 

763 Key=key, 

764 ) 

765 self._upload_id = _retry_if_failed(partial)['UploadId'] 

766 except botocore.client.ClientError as error: 

767 raise ValueError( 

768 'the bucket %r does not exist, or is forbidden for access (%r)' % ( 

769 bucket, error 

770 ) 

771 ) from error 

772 

773 if writebuffer is None: 

774 self._buf = io.BytesIO() 

775 else: 

776 self._buf = writebuffer 

777 

778 self._total_bytes = 0 

779 self._total_parts = 0 

780 self._parts = [] 

781 

782 # 

783 # This member is part of the io.BufferedIOBase interface. 

784 # 

785 self.raw = None 

786 

787 def flush(self): 

788 pass 

789 

790 # 

791 # Override some methods from io.IOBase. 

792 # 

793 def close(self): 

794 if self._buf.tell(): 

795 self._upload_next_part() 

796 

797 if self._total_bytes and self._upload_id: 

798 partial = functools.partial( 

799 self._client.complete_multipart_upload, 

800 Bucket=self._bucket, 

801 Key=self._key, 

802 UploadId=self._upload_id, 

803 MultipartUpload={'Parts': self._parts}, 

804 ) 

805 _retry_if_failed(partial) 

806 logger.debug('%s: completed multipart upload', self) 

807 elif self._upload_id: 

808 # 

809 # AWS complains with "The XML you provided was not well-formed or 

810 # did not validate against our published schema" when the input is 

811 # completely empty => abort the upload, no file created. 

812 # 

813 # We work around this by creating an empty file explicitly. 

814 # 

815 assert self._upload_id, "no multipart upload in progress" 

816 self._client.abort_multipart_upload( 

817 Bucket=self._bucket, 

818 Key=self._key, 

819 UploadId=self._upload_id, 

820 ) 

821 self._client.put_object( 

822 Bucket=self._bucket, 

823 Key=self._key, 

824 Body=b'', 

825 ) 

826 logger.debug('%s: wrote 0 bytes to imitate multipart upload', self) 

827 self._upload_id = None 

828 

829 @property 

830 def closed(self): 

831 return self._upload_id is None 

832 

833 def writable(self): 

834 """Return True if the stream supports writing.""" 

835 return True 

836 

837 def seekable(self): 

838 """If False, seek(), tell() and truncate() will raise IOError. 

839 

840 We offer only tell support, and no seek or truncate support.""" 

841 return True 

842 

843 def seek(self, offset, whence=constants.WHENCE_START): 

844 """Unsupported.""" 

845 raise io.UnsupportedOperation 

846 

847 def truncate(self, size=None): 

848 """Unsupported.""" 

849 raise io.UnsupportedOperation 

850 

851 def tell(self): 

852 """Return the current stream position.""" 

853 return self._total_bytes 

854 

855 # 

856 # io.BufferedIOBase methods. 

857 # 

858 def detach(self): 

859 raise io.UnsupportedOperation("detach() not supported") 

860 

861 def write(self, b): 

862 """Write the given buffer (bytes, bytearray, memoryview or any buffer 

863 interface implementation) to the S3 file. 

864 

865 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html 

866 

867 There's buffering happening under the covers, so this may not actually 

868 do any HTTP transfer right away.""" 

869 

870 length = self._buf.write(b) 

871 self._total_bytes += length 

872 

873 if self._buf.tell() >= self._min_part_size: 

874 self._upload_next_part() 

875 

876 return length 

877 

878 def terminate(self): 

879 """Cancel the underlying multipart upload.""" 

880 assert self._upload_id, "no multipart upload in progress" 

881 self._client.abort_multipart_upload( 

882 Bucket=self._bucket, 

883 Key=self._key, 

884 UploadId=self._upload_id, 

885 ) 

886 self._upload_id = None 

887 

888 def to_boto3(self, resource): 

889 """Create an **independent** `boto3.s3.Object` instance that points to 

890 the same S3 object as this instance. 

891 Changes to the returned object will not affect the current instance. 

892 """ 

893 assert resource, 'resource must be a boto3.resource instance' 

894 return resource.Object(self._bucket, self._key) 

895 

896 # 

897 # Internal methods. 

898 # 

899 def _upload_next_part(self): 

900 part_num = self._total_parts + 1 

901 logger.info( 

902 "%s: uploading part_num: %i, %i bytes (total %.3fGB)", 

903 self, 

904 part_num, 

905 self._buf.tell(), 

906 self._total_bytes / 1024.0 ** 3, 

907 ) 

908 self._buf.seek(0) 

909 

910 # 

911 # Network problems in the middle of an upload are particularly 

912 # troublesome. We don't want to abort the entire upload just because 

913 # of a temporary connection problem, so this part needs to be 

914 # especially robust. 

915 # 

916 upload = _retry_if_failed( 

917 functools.partial( 

918 self._client.upload_part, 

919 Bucket=self._bucket, 

920 Key=self._key, 

921 UploadId=self._upload_id, 

922 PartNumber=part_num, 

923 Body=self._buf, 

924 ) 

925 ) 

926 

927 self._parts.append({'ETag': upload['ETag'], 'PartNumber': part_num}) 

928 logger.debug("%s: upload of part_num #%i finished", self, part_num) 

929 

930 self._total_parts += 1 

931 

932 self._buf.seek(0) 

933 self._buf.truncate(0) 

934 

935 def __enter__(self): 

936 return self 

937 

938 def __exit__(self, exc_type, exc_val, exc_tb): 

939 if exc_type is not None: 

940 self.terminate() 

941 else: 

942 self.close() 

943 

944 def __str__(self): 

945 return "smart_open.s3.MultipartWriter(%r, %r)" % (self._bucket, self._key) 

946 

947 def __repr__(self): 

948 return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, min_part_size=%r)" % ( 

949 self._bucket, 

950 self._key, 

951 self._min_part_size, 

952 ) 

953 

954 
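#
# Illustrative sketch (not part of the original module): MultipartWriter
# buffers writes and uploads a part every time the buffer reaches
# min_part_size; close() completes the upload. Names are hypothetical.
#
def _example_multipart_writer():
    with MultipartWriter('my-bucket', 'big-key.bin', min_part_size=MIN_MIN_PART_SIZE) as fout:
        for _ in range(100):
            fout.write(b'x' * 1024 * 1024)  # a part is flushed at every 5MB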

955class SinglepartWriter(io.BufferedIOBase): 

956 """Writes bytes to S3 using the single part API. 

957 

958 Implements the io.BufferedIOBase interface of the standard library. 

959 

960 This class buffers all of its input in memory until its `close` method is called. Only then will 

961 the data be written to S3 and the buffer released.""" 

962 

963 def __init__( 

964 self, 

965 bucket, 

966 key, 

967 client=None, 

968 client_kwargs=None, 

969 writebuffer=None, 

970 ): 

971 _initialize_boto3(self, client, client_kwargs, bucket, key) 

972 

973 try: 

974 self._client.head_bucket(Bucket=bucket) 

975 except botocore.client.ClientError as e: 

976 raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) from e 

977 

978 if writebuffer is None: 

979 self._buf = io.BytesIO() 

980 else: 

981 self._buf = writebuffer 

982 

983 self._total_bytes = 0 

984 

985 # 

986 # This member is part of the io.BufferedIOBase interface. 

987 # 

988 self.raw = None 

989 

990 def flush(self): 

991 pass 

992 

993 # 

994 # Override some methods from io.IOBase. 

995 # 

996 def close(self): 

997 if self._buf is None: 

998 return 

999 

1000 self._buf.seek(0) 

1001 

1002 try: 

1003 self._client.put_object( 

1004 Bucket=self._bucket, 

1005 Key=self._key, 

1006 Body=self._buf, 

1007 ) 

1008 except botocore.client.ClientError as e: 

1009 raise ValueError( 

1010 'the bucket %r does not exist, or is forbidden for access' % self._bucket) from e 

1011 

1012 logger.debug("%s: direct upload finished", self) 

1013 self._buf = None 

1014 

1015 @property 

1016 def closed(self): 

1017 return self._buf is None 

1018 

1019 def writable(self): 

1020 """Return True if the stream supports writing.""" 

1021 return True 

1022 

1023 def seekable(self): 

1024 """If False, seek(), tell() and truncate() will raise IOError. 

1025 

1026 We offer only tell support, and no seek or truncate support.""" 

1027 return True 

1028 

1029 def seek(self, offset, whence=constants.WHENCE_START): 

1030 """Unsupported.""" 

1031 raise io.UnsupportedOperation 

1032 

1033 def truncate(self, size=None): 

1034 """Unsupported.""" 

1035 raise io.UnsupportedOperation 

1036 

1037 def tell(self): 

1038 """Return the current stream position.""" 

1039 return self._total_bytes 

1040 

1041 # 

1042 # io.BufferedIOBase methods. 

1043 # 

1044 def detach(self): 

1045 raise io.UnsupportedOperation("detach() not supported") 

1046 

1047 def write(self, b): 

1048 """Write the given buffer (bytes, bytearray, memoryview or any buffer 

1049 interface implementation) into the buffer. Content of the buffer will be 

1050 written to S3 on close as a single-part upload. 

1051 

1052 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html""" 

1053 

1054 length = self._buf.write(b) 

1055 self._total_bytes += length 

1056 return length 

1057 

1058 def terminate(self): 

1059 """Nothing to cancel in single-part uploads.""" 

1060 return 

1061 

1062 # 

1063 # Internal methods. 

1064 # 

1065 def __enter__(self): 

1066 return self 

1067 

1068 def __exit__(self, exc_type, exc_val, exc_tb): 

1069 if exc_type is not None: 

1070 self.terminate() 

1071 else: 

1072 self.close() 

1073 

1074 def __str__(self): 

1075 return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._object.bucket_name, self._object.key) 

1076 

1077 def __repr__(self): 

1078 return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key) 

1079 

1080 
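#
# Illustrative sketch (not part of the original module): SinglepartWriter
# with a disk-backed writebuffer, so nothing large is held in RAM before
# the single put_object call in close(). Paths and names are hypothetical.
#
def _example_singlepart_writer():
    import builtins  # the module-level open() shadows the builtin here
    with builtins.open('/tmp/s3-staging.bin', 'wb+') as staging:
        with SinglepartWriter('my-bucket', 'small-key.bin', writebuffer=staging) as fout:
            fout.write(b'hello world')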

1081def _retry_if_failed( 

1082 partial, 

1083 attempts=_UPLOAD_ATTEMPTS, 

1084 sleep_seconds=_SLEEP_SECONDS, 

1085 exceptions=None): 

1086 if exceptions is None: 

1087 exceptions = (botocore.exceptions.EndpointConnectionError, ) 

1088 for attempt in range(attempts): 

1089 try: 

1090 return partial() 

1091 except exceptions: 

1092 logger.critical( 

1093 'Unable to connect to the endpoint. Check your network connection. ' 

1094 'Sleeping and retrying %d more times ' 

1095 'before giving up.' % (attempts - attempt - 1) 

1096 ) 

1097 time.sleep(sleep_seconds) 

1098 else: 

1099 logger.critical('Unable to connect to the endpoint. Giving up.') 

1100 raise IOError('Unable to connect to the endpoint after %d attempts' % attempts) 

1101 

1102 
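#
# Illustrative sketch (not part of the original module): _retry_if_failed
# wraps a zero-argument callable (usually a functools.partial over a boto3
# client method) and retries it on endpoint connection errors. The client,
# bucket and key are hypothetical.
#
# partial = functools.partial(client.put_object, Bucket='my-bucket', Key='k', Body=b'')
# response = _retry_if_failed(partial, attempts=3, sleep_seconds=1)
#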

1103def _accept_all(key): 

1104 return True 

1105 

1106 

1107def iter_bucket( 

1108 bucket_name, 

1109 prefix='', 

1110 accept_key=None, 

1111 key_limit=None, 

1112 workers=16, 

1113 retries=3, 

1114 **session_kwargs): 

1115 """ 

1116 Iterate and download all S3 objects under `s3://bucket_name/prefix`. 

1117 

1118 Parameters 

1119 ---------- 

1120 bucket_name: str 

1121 The name of the bucket. 

1122 prefix: str, optional 

1123 Limits the iteration to keys starting with the prefix. 

1124 accept_key: callable, optional 

1125 This is a function that accepts a key name (unicode string) and 

1126 returns True/False, signalling whether the given key should be downloaded. 

1127 The default behavior is to accept all keys. 

1128 key_limit: int, optional 

1129 If specified, the iterator will stop after yielding this many results. 

1130 workers: int, optional 

1131 The number of subprocesses to use. 

1132 retries: int, optional 

1133 The number of times to retry a failed download. 

1134 session_kwargs: dict, optional 

1135 Keyword arguments to pass when creating a new session. 

1136 For a list of available names and values, see: 

1137 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session 

1138 

1139 

1140 Yields 

1141 ------ 

1142 str 

1143 The full key name (does not include the bucket name). 

1144 bytes 

1145 The full contents of the key. 

1146 

1147 Notes 

1148 ----- 

1149 The keys are processed in parallel, using `workers` processes (default: 16), 

1150 to speed up downloads greatly. If multiprocessing is not available (that is, 

1151 _MULTIPROCESSING is False), this parameter is ignored. 

1152 

1153 Examples 

1154 -------- 

1155 

1156 >>> # get all JSON files under "mybucket/foo/" 

1157 >>> for key, content in iter_bucket( 

1158 ... bucket_name, prefix='foo/', 

1159 ... accept_key=lambda key: key.endswith('.json')): 

1160 ... print(key, len(content)) 

1161 

1162 >>> # limit to 10k files, using 32 parallel workers (default is 16) 

1163 >>> for key, content in iter_bucket(bucket_name, key_limit=10000, workers=32): 

1164 ... print(key, len(content)) 

1165 """ 

1166 if accept_key is None: 

1167 accept_key = _accept_all 

1168 

1169 # 

1170 # If people insist on giving us bucket instances, silently extract the name 

1171 # before moving on. Works for boto3 as well as boto. 

1172 # 

1173 try: 

1174 bucket_name = bucket_name.name 

1175 except AttributeError: 

1176 pass 

1177 

1178 total_size, key_no = 0, -1 

1179 key_iterator = _list_bucket( 

1180 bucket_name, 

1181 prefix=prefix, 

1182 accept_key=accept_key, 

1183 **session_kwargs) 

1184 download_key = functools.partial( 

1185 _download_key, 

1186 bucket_name=bucket_name, 

1187 retries=retries, 

1188 **session_kwargs) 

1189 

1190 with smart_open.concurrency.create_pool(processes=workers) as pool: 

1191 result_iterator = pool.imap_unordered(download_key, key_iterator) 

1192 key_no = 0 

1193 while True: 

1194 try: 

1195 (key, content) = result_iterator.__next__() 

1196 if key_no % 1000 == 0: 

1197 logger.info( 

1198 "yielding key #%i: %s, size %i (total %.1fMB)", 

1199 key_no, key, len(content), total_size / 1024.0 ** 2 

1200 ) 

1201 yield key, content 

1202 total_size += len(content) 

1203 if key_limit is not None and key_no + 1 >= key_limit: 

1204 # we were asked to output only a limited number of keys => we're done 

1205 break 

1206 except botocore.exceptions.ClientError as err: 

1207 # 

1208 # ignore 404 not found errors: they mean the object was deleted 

1209 # after we listed the contents of the bucket, but before we 

1210 # downloaded the object. 

1211 # 

1212 if not ('Error' in err.response and err.response['Error'].get('Code') == '404'): 

1213 raise err 

1214 except StopIteration: 

1215 break 

1216 key_no += 1 

1217 logger.info("processed %i keys, total size %i" % (key_no + 1, total_size)) 

1218 

1219 

1220def _list_bucket( 

1221 bucket_name, 

1222 prefix='', 

1223 accept_key=lambda k: True, 

1224 **session_kwargs): 

1225 session = boto3.session.Session(**session_kwargs) 

1226 client = session.client('s3') 

1227 ctoken = None 

1228 

1229 while True: 

1230 # list_objects_v2 doesn't like a None value for ContinuationToken 

1231 # so we don't set it if we don't have one. 

1232 if ctoken: 

1233 kwargs = dict(Bucket=bucket_name, Prefix=prefix, ContinuationToken=ctoken) 

1234 else: 

1235 kwargs = dict(Bucket=bucket_name, Prefix=prefix) 

1236 response = client.list_objects_v2(**kwargs) 

1237 try: 

1238 content = response['Contents'] 

1239 except KeyError: 

1240 pass 

1241 else: 

1242 for c in content: 

1243 key = c['Key'] 

1244 if accept_key(key): 

1245 yield key 

1246 ctoken = response.get('NextContinuationToken', None) 

1247 if not ctoken: 

1248 break 

1249 

1250 

1251def _download_key(key_name, bucket_name=None, retries=3, **session_kwargs): 

1252 if bucket_name is None: 

1253 raise ValueError('bucket_name may not be None') 

1254 

1255 # 

1256 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html#multithreading-or-multiprocessing-with-resources 

1257 # 

1258 session = boto3.session.Session(**session_kwargs) 

1259 s3 = session.resource('s3') 

1260 bucket = s3.Bucket(bucket_name) 

1261 

1262 # Sometimes, https://github.com/boto/boto/issues/2409 can happen 

1263 # because of network issues on either side. 

1264 # Retry up to `retries` times (default 3) to ensure it's not a transient issue. 

1265 for x in range(retries + 1): 

1266 try: 

1267 content_bytes = _download_fileobj(bucket, key_name) 

1268 except botocore.client.ClientError: 

1269 # Actually fail on last pass through the loop 

1270 if x == retries: 

1271 raise 

1272 # Otherwise, try again, as this might be a transient timeout 

1273 pass 

1274 else: 

1275 return key_name, content_bytes 

1276 

1277 

1278def _download_fileobj(bucket, key_name): 

1279 # 

1280 # This is a separate function only because it makes it easier to inject 

1281 # exceptions during tests. 

1282 # 

1283 buf = io.BytesIO() 

1284 bucket.download_fileobj(key_name, buf) 

1285 return buf.getvalue()