Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/smart_open/s3.py: 19%
1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2019 Radim Rehurek <me@radimrehurek.com>
4#
5# This code is distributed under the terms and conditions
6# from the MIT License (MIT).
7#
8"""Implements file-like objects for reading and writing from/to AWS S3."""
9from __future__ import annotations
11import http
12import io
13import functools
14import logging
15import time
16import warnings
18from typing import (
19 Callable,
20 List,
21 TYPE_CHECKING,
22)
24try:
25 import boto3
26 import botocore.client
27 import botocore.exceptions
28 import urllib3.exceptions
29except ImportError:
30 MISSING_DEPS = True
32import smart_open.bytebuffer
33import smart_open.concurrency
34import smart_open.utils
36from smart_open import constants
39if TYPE_CHECKING:
40 from mypy_boto3_s3.client import S3Client
41 from typing_extensions import Buffer
43logger = logging.getLogger(__name__)
45#
46# AWS puts restrictions on the part size for multipart uploads.
47# Each part must be at least 5MB and at most 5GB (the last part may be smaller).
48#
49# On top of that, our MultipartWriter has a min_part_size option.
50# In retrospect, it's an unfortunate name, because it conflicts with the
51# minimum allowable part size (5MB), but it's too late to change it, because
52# people are using that parameter (unlike the MIN, DEFAULT, MAX constants).
53# It really just means "part size": as soon as you have this many bytes,
54# write a part to S3 (see the MultipartWriter.write method).
55#
57MIN_PART_SIZE = 5 * 1024 ** 2
58"""The absolute minimum permitted by Amazon."""
60DEFAULT_PART_SIZE = 50 * 1024**2
61"""The default part size for S3 multipart uploads, chosen carefully by smart_open"""
63MAX_PART_SIZE = 5 * 1024 ** 3
64"""The absolute maximum permitted by Amazon."""
66SCHEMES = ("s3", "s3n", "s3u", "s3a")
67DEFAULT_PORT = 443
68DEFAULT_HOST = 's3.amazonaws.com'
70DEFAULT_BUFFER_SIZE = 128 * 1024
72URI_EXAMPLES = (
73 's3://my_bucket/my_key',
74 's3://my_key:my_secret@my_bucket/my_key',
75 's3://my_key:my_secret@my_server:my_port@my_bucket/my_key',
76)
78# Returned by AWS when we try to seek beyond EOF.
79_OUT_OF_RANGE = 'InvalidRange'
82class Retry:
83 def __init__(self):
84 self.attempts: int = 6
85 self.sleep_seconds: int = 10
86 self.exceptions: List[type[Exception]] = [botocore.exceptions.EndpointConnectionError]
87 self.client_error_codes: List[str] = ['NoSuchUpload']
89 def _do(self, fn: Callable):
90 for attempt in range(self.attempts):
91 try:
92 return fn()
93 except tuple(self.exceptions) as err:
94 logger.critical(
95 'Caught non-fatal %s, retrying %d more times',
96 err,
97 self.attempts - attempt - 1,
98 )
99 logger.exception(err)
100 time.sleep(self.sleep_seconds)
101 except botocore.exceptions.ClientError as err:
102 error_code = err.response['Error'].get('Code')
103 if error_code not in self.client_error_codes:
104 raise
105 logger.critical(
106 'Caught non-fatal ClientError (%s), retrying %d more times',
107 error_code,
108 self.attempts - attempt - 1,
109 )
110 logger.exception(err)
111 time.sleep(self.sleep_seconds)
112 else:
113 logger.critical('encountered too many non-fatal errors, giving up')
114 raise IOError('%s failed after %d attempts' % (fn.func, self.attempts))
117#
118# The retry mechanism for this submodule. Client code may modify it, e.g. by
119# updating RETRY.sleep_seconds and friends.
120#
121if 'MISSING_DEPS' not in locals():
122 RETRY = Retry()
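#
# For example (a sketch): code that wants to fail faster, or treat additional
# throttling codes as retriable, can tweak the shared RETRY instance in place:
#
#   >>> from smart_open import s3
#   >>> s3.RETRY.attempts = 3
#   >>> s3.RETRY.sleep_seconds = 1
#   >>> s3.RETRY.client_error_codes.append('SlowDown')  # extra code, shown for illustration
#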
125class _ClientWrapper:
126 """Wraps a client to inject the appropriate keyword args into each method call.
128 The keyword args are a dictionary keyed by the fully qualified method name.
129 For example, S3.Client.create_multipart_upload.
131 See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client
133 This wrapper behaves identically to the client otherwise.
134 """
135 def __init__(self, client, kwargs):
136 self.client = client
137 self.kwargs = kwargs
139 def __getattr__(self, method_name):
140 method = getattr(self.client, method_name)
141 kwargs = self.kwargs.get('S3.Client.%s' % method_name, {})
142 return functools.partial(method, **kwargs)
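#
# For example (a sketch; the encryption settings are illustrative): to request
# server-side encryption on uploads, inject kwargs for the relevant client
# method by its fully qualified name and pass them via transport parameters:
#
#   >>> import smart_open
#   >>> tp = {
#   ...     'client_kwargs': {
#   ...         'S3.Client.create_multipart_upload': {'ServerSideEncryption': 'aws:kms'},
#   ...     },
#   ... }
#   >>> with smart_open.open('s3://my_bucket/my_key', 'wb', transport_params=tp) as fout:
#   ...     fout.write(b'hello')
#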
145def parse_uri(uri_as_string):
146 #
147 # Restrictions on bucket names and labels:
148 #
149 # - Bucket names must be at least 3 and no more than 63 characters long.
150 # - Bucket names must be a series of one or more labels.
151 # - Adjacent labels are separated by a single period (.).
152 # - Bucket names can contain lowercase letters, numbers, and hyphens.
153 # - Each label must start and end with a lowercase letter or a number.
154 #
155 # We use the above as a guide only, and do not perform any validation. We
156 # let boto3 take care of that for us.
157 #
158 split_uri = smart_open.utils.safe_urlsplit(uri_as_string)
159 assert split_uri.scheme in SCHEMES
161 port = DEFAULT_PORT
162 host = DEFAULT_HOST
163 ordinary_calling_format = False
164 #
165 # These defaults tell boto3 to look for credentials elsewhere
166 #
167 access_id, access_secret = None, None
169 #
170 # Common URI template [secret:key@][host[:port]@]bucket/object
171 #
172 # The urlparse function doesn't handle the above schema, so we have to do
173 # it ourselves.
174 #
175 uri = split_uri.netloc + split_uri.path
177 #
178 # Attempt to extract edge-case authentication details from the URL.
179 #
180 # See:
181 # 1. https://summitroute.com/blog/2018/06/20/aws_security_credential_formats/
182 # 2. test_s3_uri_with_credentials* in test_smart_open.py for example edge cases
183 #
184 if '@' in uri:
185 maybe_auth, rest = uri.split('@', 1)
186 if ':' in maybe_auth:
187 maybe_id, maybe_secret = maybe_auth.split(':', 1)
188 if '/' not in maybe_id:
189 access_id, access_secret = maybe_id, maybe_secret
190 uri = rest
192 head, key_id = uri.split('/', 1)
193 if '@' in head and ':' in head:
194 ordinary_calling_format = True
195 host_port, bucket_id = head.split('@')
196 host, port = host_port.split(':', 1)
197 port = int(port)
198 elif '@' in head:
199 ordinary_calling_format = True
200 host, bucket_id = head.split('@')
201 else:
202 bucket_id = head
204 return dict(
205 scheme=split_uri.scheme,
206 bucket_id=bucket_id,
207 key_id=key_id,
208 port=port,
209 host=host,
210 ordinary_calling_format=ordinary_calling_format,
211 access_id=access_id,
212 access_secret=access_secret,
213 )
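#
# For example (illustrative, following the URI templates above; the host and
# port are made up):
#
#   >>> parse_uri('s3://my_key:my_secret@my_server:9000@my_bucket/my_key')
#   {'scheme': 's3', 'bucket_id': 'my_bucket', 'key_id': 'my_key',
#    'port': 9000, 'host': 'my_server', 'ordinary_calling_format': True,
#    'access_id': 'my_key', 'access_secret': 'my_secret'}
#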
216def _consolidate_params(uri, transport_params):
217 """Consolidates the parsed Uri with the additional parameters.
219 This is necessary because the user can pass some of the parameters can in
220 two different ways:
222 1) Via the URI itself
223 2) Via the transport parameters
225 These are not mutually exclusive, but we have to pick one over the other
226 in a sensible way in order to proceed.
228 """
229 transport_params = dict(transport_params)
231 def inject(**kwargs):
232 try:
233 client_kwargs = transport_params['client_kwargs']
234 except KeyError:
235 client_kwargs = transport_params['client_kwargs'] = {}
237 try:
238 init_kwargs = client_kwargs['S3.Client']
239 except KeyError:
240 init_kwargs = client_kwargs['S3.Client'] = {}
242 init_kwargs.update(**kwargs)
244 client = transport_params.get('client')
245 if client is not None and (uri['access_id'] or uri['access_secret']):
246 logger.warning(
247 'ignoring credentials parsed from URL because they conflict with '
248 'transport_params["client"]. Set transport_params["client"] to None '
249 'to suppress this warning.'
250 )
251 uri.update(access_id=None, access_secret=None)
252 elif (uri['access_id'] and uri['access_secret']):
253 inject(
254 aws_access_key_id=uri['access_id'],
255 aws_secret_access_key=uri['access_secret'],
256 )
257 uri.update(access_id=None, access_secret=None)
259 if client is not None and uri['host'] != DEFAULT_HOST:
260 logger.warning(
261 'ignoring endpoint_url parsed from URL because it conflicts with '
262 'transport_params["client"]. Set transport_params["client"] to None '
263 'to suppress this warning.'
264 )
265 uri.update(host=None)
266 elif uri['host'] != DEFAULT_HOST:
267 if uri['scheme'] == 's3u':
268 scheme = 'http'
269 else:
270 scheme = 'https'
271 inject(endpoint_url=scheme + '://%(host)s:%(port)d' % uri)
272 uri.update(host=None)
274 return uri, transport_params
277def open_uri(uri, mode, transport_params):
278 deprecated = (
279 'multipart_upload_kwargs',
280 'object_kwargs',
281 'resource',
282 'resource_kwargs',
283 'session',
284 'singlepart_upload_kwargs',
285 )
286 detected = [k for k in deprecated if k in transport_params]
287 if detected:
288 doc_url = (
289 'https://github.com/RaRe-Technologies/smart_open/blob/develop/'
290 'MIGRATING_FROM_OLDER_VERSIONS.rst'
291 )
292 #
293 # We use warnings.warn with UserWarning instead of logger.warn here because
294 #
295 # 1) Not everyone has logging enabled; and
296 # 2) check_kwargs (below) already uses logger.warn with a similar message
297 #
298 # https://github.com/RaRe-Technologies/smart_open/issues/614
299 #
300 message = (
301 'ignoring the following deprecated transport parameters: %r. '
302 'See <%s> for details' % (detected, doc_url)
303 )
304 warnings.warn(message, UserWarning)
305 parsed_uri = parse_uri(uri)
306 parsed_uri, transport_params = _consolidate_params(parsed_uri, transport_params)
307 kwargs = smart_open.utils.check_kwargs(open, transport_params)
308 return open(parsed_uri['bucket_id'], parsed_uri['key_id'], mode, **kwargs)
311def open(
312 bucket_id,
313 key_id,
314 mode,
315 version_id=None,
316 buffer_size=DEFAULT_BUFFER_SIZE,
317 min_part_size=DEFAULT_PART_SIZE,
318 multipart_upload=True,
319 defer_seek=False,
320 client=None,
321 client_kwargs=None,
322 writebuffer=None,
323):
324 """Open an S3 object for reading or writing.
326 Parameters
327 ----------
328 bucket_id: str
329 The name of the bucket this object resides in.
330 key_id: str
331 The name of the key within the bucket.
332 mode: str
333 The mode for opening the object. Must be either "rb" or "wb".
334 buffer_size: int, optional
335 The buffer size to use when performing I/O.
336 min_part_size: int, optional
337 The minimum part size for multipart uploads, in bytes.
339 When the writebuffer contains this many bytes, smart_open will upload
340 the bytes to S3 as a single part of a multi-part upload, freeing the
341 buffer either partially or entirely. When you close the writer, it
342 will assemble the parts together.
344 The value determines the upper limit for the writebuffer. If buffer
345 space is short (e.g. you are buffering to memory), then use a smaller
346 value for min_part_size, or consider buffering to disk instead (see
347 the writebuffer option).
349 The value must be between 5MB and 5GB. If you specify a value outside
350 of this range, smart_open will adjust it for you, because otherwise the
351 upload _will_ fail.
353 For writing only. Does not apply if you set multipart_upload=False.
354 multipart_upload: bool, optional
355 Default: `True`
356 If set to `True`, will use multipart upload for writing to S3. If set
357 to `False`, the upload will use the S3 single-part (PutObject) API, which
358 is better suited to small files.
360 For writing only.
361 version_id: str, optional
362 Version of the object, used when reading the object.
363 If None, will fetch the most recent version.
364 defer_seek: boolean, optional
365 Default: `False`
366 If set to `True` on a file opened for reading, GetObject will not be
367 called until the first seek() or read().
368 Avoids redundant API queries when seeking before reading.
369 client: object, optional
370 The S3 client to use when working with boto3.
371 If you don't specify this, then smart_open will create a new client for you.
372 client_kwargs: dict, optional
373 Additional parameters to pass to the relevant functions of the client.
374 The keys are fully qualified method names, e.g. `S3.Client.create_multipart_upload`.
375 The values are kwargs to pass to that method each time it is called.
376 writebuffer: IO[bytes], optional
377 By default, this module will buffer data in memory using io.BytesIO
378 when writing. Pass another binary IO instance here to use it instead.
379 For example, you may pass a file object to buffer to local disk instead
380 of in RAM. Use this to keep RAM usage low at the expense of additional
381 disk IO. If you pass in an open file, then you are responsible for
382 cleaning it up after writing completes.
383 """
384 logger.debug('%r', locals())
385 if mode not in constants.BINARY_MODES:
386 raise NotImplementedError('bad mode: %r expected one of %r' % (mode, constants.BINARY_MODES))
388 if (mode == constants.WRITE_BINARY) and (version_id is not None):
389 raise ValueError("version_id must be None when writing")
391 if mode == constants.READ_BINARY:
392 fileobj = Reader(
393 bucket_id,
394 key_id,
395 version_id=version_id,
396 buffer_size=buffer_size,
397 defer_seek=defer_seek,
398 client=client,
399 client_kwargs=client_kwargs,
400 )
401 elif mode == constants.WRITE_BINARY:
402 if multipart_upload:
403 fileobj = MultipartWriter(
404 bucket_id,
405 key_id,
406 client=client,
407 client_kwargs=client_kwargs,
408 writebuffer=writebuffer,
409 part_size=min_part_size,
410 )
411 else:
412 fileobj = SinglepartWriter(
413 bucket_id,
414 key_id,
415 client=client,
416 client_kwargs=client_kwargs,
417 writebuffer=writebuffer,
418 )
419 else:
420 assert False, 'unexpected mode: %r' % mode
422 fileobj.name = key_id
423 return fileobj
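#
# Usage sketch (assumes the bucket exists and credentials are configured in the
# environment). Reading, and writing with the multipart buffer spooled to disk
# via the writebuffer parameter to keep RAM usage low:
#
#   >>> with open('my_bucket', 'my_key', 'rb') as fin:
#   ...     header = fin.read(1024)
#
#   >>> import tempfile
#   >>> with tempfile.TemporaryFile() as tmp:
#   ...     with open('my_bucket', 'my_key', 'wb', writebuffer=tmp) as fout:
#   ...         fout.write(b'hello world')
#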
426def _get(client, bucket, key, version, range_string):
427 try:
428 params = dict(Bucket=bucket, Key=key)
429 if version:
430 params["VersionId"] = version
431 if range_string:
432 params["Range"] = range_string
434 return client.get_object(**params)
435 except botocore.client.ClientError as error:
436 wrapped_error = IOError(
437 'unable to access bucket: %r key: %r version: %r error: %s' % (
438 bucket, key, version, error
439 )
440 )
441 wrapped_error.backend_error = error
442 raise wrapped_error from error
445def _unwrap_ioerror(ioe):
446 """Given an IOError from _get, return the 'Error' dictionary from boto."""
447 try:
448 return ioe.backend_error.response['Error']
449 except (AttributeError, KeyError):
450 return None
453class _SeekableRawReader(object):
454 """Read an S3 object.
456 This class is internal to the S3 submodule.
457 """
459 def __init__(
460 self,
461 client,
462 bucket,
463 key,
464 version_id=None,
465 ):
466 self._client = client
467 self._bucket = bucket
468 self._key = key
469 self._version_id = version_id
471 self._content_length = None
472 self._position = 0
473 self._body = None
475 def seek(self, offset, whence=constants.WHENCE_START):
476 """Seek to the specified position.
478 :param int offset: The offset in bytes.
479 :param int whence: Where the offset is from.
481 :returns: the position after seeking.
482 :rtype: int
483 """
484 if whence not in constants.WHENCE_CHOICES:
485 raise ValueError('invalid whence, expected one of %r' % constants.WHENCE_CHOICES)
487 #
488 # Close old body explicitly.
489 # On the first seek() after __init__(), self._body does not exist yet.
490 #
491 if self._body is not None:
492 self._body.close()
493 self._body = None
495 start = None
496 stop = None
497 if whence == constants.WHENCE_START:
498 start = max(0, offset)
499 elif whence == constants.WHENCE_CURRENT:
500 start = max(0, offset + self._position)
501 else:
502 stop = max(0, -offset)
504 #
505 # If we can figure out that we've read past the EOF, then we can save
506 # an extra API call.
507 #
508 if self._content_length is None:
509 reached_eof = False
510 elif start is not None and start >= self._content_length:
511 reached_eof = True
512 elif stop == 0:
513 reached_eof = True
514 else:
515 reached_eof = False
517 if reached_eof:
518 self._body = io.BytesIO()
519 self._position = self._content_length
520 else:
521 self._open_body(start, stop)
523 return self._position
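#
# For example (illustrative; `reader` is a hypothetical _SeekableRawReader
# instance): seeking relative to the end of the object produces a suffix
# range, so only the tail of the object is requested:
#
#   >>> reader.seek(-10, constants.WHENCE_END)
#   # stop becomes 10 and _open_body() asks S3 for roughly 'bytes=-10',
#   # i.e. the last 10 bytes of the object.
#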
525 def _open_body(self, start=None, stop=None):
526 """Open a connection to download the specified range of bytes. Store
527 the open file handle in self._body.
529 If no range is specified, start defaults to self._position.
530 start and stop follow the semantics of the HTTP Range header:
531 a stop without a start is a suffix range, i.e. it reads the last stop bytes.
533 As a side effect, set self._content_length. Set self._position
534 to self._content_length if start is past end of file.
535 """
536 if start is None and stop is None:
537 start = self._position
538 range_string = smart_open.utils.make_range_string(start, stop)
540 try:
541 # Optimistically try to fetch the requested content range.
542 response = _get(
543 self._client,
544 self._bucket,
545 self._key,
546 self._version_id,
547 range_string,
548 )
549 except IOError as ioe:
550 # Handle requested content range exceeding content size.
551 error_response = _unwrap_ioerror(ioe)
552 if error_response is None or error_response.get('Code') != _OUT_OF_RANGE:
553 raise
554 try:
555 self._position = self._content_length = int(error_response['ActualObjectSize'])
556 self._body = io.BytesIO()
557 except KeyError:
558 response = _get(
559 self._client,
560 self._bucket,
561 self._key,
562 self._version_id,
563 None,
564 )
565 self._position = self._content_length = response["ContentLength"]
566 self._body = response["Body"]
567 else:
568 #
569 # Keep track of how many times boto3's built-in retry mechanism
570 # activated.
571 #
572 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#checking-retry-attempts-in-an-aws-service-response
573 #
574 logger.debug(
575 '%s: RetryAttempts: %d',
576 self,
577 response['ResponseMetadata']['RetryAttempts'],
578 )
579 #
580 # range request may not always return partial content, see:
581 # https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests#partial_request_responses
582 #
583 status_code = response['ResponseMetadata']['HTTPStatusCode']
584 if status_code == http.HTTPStatus.PARTIAL_CONTENT:
585 _, start, stop, length = smart_open.utils.parse_content_range(response['ContentRange'])
586 self._position = start
587 elif status_code == http.HTTPStatus.OK:
588 length = response["ContentLength"]
589 self._content_length = length
590 self._body = response['Body']
592 def read(self, size=-1):
593 """Read from the continuous connection with the remote peer."""
594 if self._body is None:
595 # This is necessary for the very first read() after __init__().
596 self._open_body()
597 if self._position >= self._content_length:
598 return b''
600 #
601 # Boto3 has built-in error handling and retry mechanisms:
602 #
603 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
604 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
605 #
606 # Unfortunately, it isn't always enough. There is still a non-zero
607 # possibility that an exception will slip past these mechanisms and
608 # terminate the read prematurely. Luckily, at this stage, it's very
609 # simple to recover from the problem: wait a little bit, reopen the
610 # HTTP connection and try again. Usually, a single retry attempt is
611 # enough to recover, but we try multiple times "just in case".
612 #
613 for attempt, seconds in enumerate([1, 2, 4, 8, 16], 1):
614 try:
615 if size == -1:
616 binary = self._body.read()
617 else:
618 binary = self._body.read(size)
619 except (
620 ConnectionResetError,
621 botocore.exceptions.BotoCoreError,
622 urllib3.exceptions.HTTPError,
623 ) as err:
624 logger.warning(
625 '%s: caught %r while reading %d bytes, sleeping %ds before retry',
626 self,
627 err,
628 size,
629 seconds,
630 )
631 time.sleep(seconds)
632 self._open_body()
633 else:
634 self._position += len(binary)
635 return binary
637 raise IOError('%s: failed to read %d bytes after %d attempts' % (self, size, attempt))
639 def __str__(self):
640 return 'smart_open.s3._SeekableRawReader(%r, %r)' % (self._bucket, self._key)
643def _initialize_boto3(rw, client, client_kwargs, bucket, key):
644 """Created the required objects for accessing S3. Ideally, they have
645 been already created for us and we can just reuse them."""
646 if client_kwargs is None:
647 client_kwargs = {}
649 if client is None:
650 init_kwargs = client_kwargs.get('S3.Client', {})
651 client = boto3.client('s3', **init_kwargs)
652 assert client
654 rw._client = _ClientWrapper(client, client_kwargs)
655 rw._bucket = bucket
656 rw._key = key
659class Reader(io.BufferedIOBase):
660 """Reads bytes from S3.
662 Implements the io.BufferedIOBase interface of the standard library."""
664 def __init__(
665 self,
666 bucket,
667 key,
668 version_id=None,
669 buffer_size=DEFAULT_BUFFER_SIZE,
670 line_terminator=constants.BINARY_NEWLINE,
671 defer_seek=False,
672 client=None,
673 client_kwargs=None,
674 ):
675 self._version_id = version_id
676 self._buffer_size = buffer_size
678 _initialize_boto3(self, client, client_kwargs, bucket, key)
680 self._raw_reader = _SeekableRawReader(
681 self._client,
682 bucket,
683 key,
684 self._version_id,
685 )
686 self._current_pos = 0
687 self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size)
688 self._eof = False
689 self._line_terminator = line_terminator
690 self._seek_initialized = False
692 #
693 # This member is part of the io.BufferedIOBase interface.
694 #
695 self.raw = None
697 if not defer_seek:
698 self.seek(0)
700 #
701 # io.BufferedIOBase methods.
702 #
704 def close(self):
705 """Flush and close this stream."""
706 pass
708 def readable(self):
709 """Return True if the stream can be read from."""
710 return True
712 def read(self, size=-1):
713 """Read up to size bytes from the object and return them."""
714 if size == 0:
715 return b''
716 elif size < 0:
717 # call read() before setting _current_pos to make sure _content_length is set
718 out = self._read_from_buffer() + self._raw_reader.read()
719 self._current_pos = self._raw_reader._content_length
720 return out
722 #
723 # Return unused data first
724 #
725 if len(self._buffer) >= size:
726 return self._read_from_buffer(size)
728 #
729 # If the stream is finished, return what we have.
730 #
731 if self._eof:
732 return self._read_from_buffer()
734 self._fill_buffer(size)
735 return self._read_from_buffer(size)
737 def read1(self, size=-1):
738 """This is the same as read()."""
739 return self.read(size=size)
741 def readinto(self, b):
742 """Read up to len(b) bytes into b, and return the number of bytes
743 read."""
744 data = self.read(len(b))
745 if not data:
746 return 0
747 b[:len(data)] = data
748 return len(data)
750 def readline(self, limit=-1):
751 """Read up to and including the next newline. Returns the bytes read."""
752 if limit != -1:
753 raise NotImplementedError('limits other than -1 not implemented yet')
755 #
756 # A single line may span multiple buffers.
757 #
758 line = io.BytesIO()
759 while not (self._eof and len(self._buffer) == 0):
760 line_part = self._buffer.readline(self._line_terminator)
761 line.write(line_part)
762 self._current_pos += len(line_part)
764 if line_part.endswith(self._line_terminator):
765 break
766 else:
767 self._fill_buffer()
769 return line.getvalue()
771 def seekable(self):
772 """If False, seek(), tell() and truncate() will raise IOError.
774 We offer only seek support, and no truncate support."""
775 return True
777 def seek(self, offset, whence=constants.WHENCE_START):
778 """Seek to the specified position.
780 :param int offset: The offset in bytes.
781 :param int whence: Where the offset is from.
783 Returns the position after seeking."""
784 # Convert relative offset to absolute, since self._raw_reader
785 # doesn't know our current position.
786 if whence == constants.WHENCE_CURRENT:
787 whence = constants.WHENCE_START
788 offset += self._current_pos
790 if not self._seek_initialized or not (
791 whence == constants.WHENCE_START and offset == self._current_pos
792 ):
793 self._current_pos = self._raw_reader.seek(offset, whence)
795 self._buffer.empty()
797 self._eof = self._current_pos == self._raw_reader._content_length
799 self._seek_initialized = True
800 return self._current_pos
802 def tell(self):
803 """Return the current position within the file."""
804 return self._current_pos
806 def truncate(self, size=None):
807 """Unsupported."""
808 raise io.UnsupportedOperation
810 def detach(self):
811 """Unsupported."""
812 raise io.UnsupportedOperation
814 def terminate(self):
815 """Do nothing."""
816 pass
818 def to_boto3(self, resource):
819 """Create an **independent** `boto3.s3.Object` instance that points to
820 the same S3 object as this instance.
821 Changes to the returned object will not affect the current instance.
822 """
823 assert resource, 'resource must be a boto3.resource instance'
824 obj = resource.Object(self._bucket, self._key)
825 if self._version_id is not None:
826 return obj.Version(self._version_id)
827 else:
828 return obj
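#
# For example (a sketch; `reader` is a hypothetical open Reader instance):
# drop down to the raw boto3 object when you need metadata this class does
# not expose:
#
#   >>> import boto3
#   >>> obj = reader.to_boto3(boto3.resource('s3'))
#   >>> obj.last_modified, obj.e_tag
#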
830 #
831 # Internal methods.
832 #
833 def _read_from_buffer(self, size=-1):
834 """Remove at most size bytes from our buffer and return them."""
835 size = size if size >= 0 else len(self._buffer)
836 part = self._buffer.read(size)
837 self._current_pos += len(part)
838 return part
840 def _fill_buffer(self, size=-1):
841 size = max(size, self._buffer._chunk_size)
842 while len(self._buffer) < size and not self._eof:
843 bytes_read = self._buffer.fill(self._raw_reader)
844 if bytes_read == 0:
845 logger.debug('%s: reached EOF while filling buffer', self)
846 self._eof = True
848 def __str__(self):
849 return "smart_open.s3.Reader(%r, %r)" % (self._bucket, self._key)
851 def __repr__(self):
852 return (
853 "smart_open.s3.Reader("
854 "bucket=%r, "
855 "key=%r, "
856 "version_id=%r, "
857 "buffer_size=%r, "
858 "line_terminator=%r)"
859 ) % (
860 self._bucket,
861 self._key,
862 self._version_id,
863 self._buffer_size,
864 self._line_terminator,
865 )
868class MultipartWriter(io.BufferedIOBase):
869 """Writes bytes to S3 using the multi part API.
871 Implements the io.BufferedIOBase interface of the standard library."""
873 def __init__(
874 self,
875 bucket,
876 key,
877 part_size=DEFAULT_PART_SIZE,
878 client=None,
879 client_kwargs=None,
880 writebuffer: io.BytesIO | None = None,
881 ):
882 adjusted_ps = smart_open.utils.clamp(part_size, MIN_PART_SIZE, MAX_PART_SIZE)
883 if part_size != adjusted_ps:
884 logger.warning(f"adjusting part_size from {part_size} to {adjusted_ps}")
885 part_size = adjusted_ps
886 self._part_size = part_size
888 _initialize_boto3(self, client, client_kwargs, bucket, key)
889 self._client: S3Client
890 self._bucket: str
891 self._key: str
893 try:
894 partial = functools.partial(
895 self._client.create_multipart_upload,
896 Bucket=bucket,
897 Key=key,
898 )
899 self._upload_id = RETRY._do(partial)['UploadId']
900 except botocore.client.ClientError as error:
901 raise ValueError(
902 'the bucket %r does not exist, or is forbidden for access (%r)' % (
903 bucket, error
904 )
905 ) from error
907 if writebuffer is None:
908 self._buf = io.BytesIO()
909 else:
910 self._buf = writebuffer
912 self._total_bytes = 0
913 self._total_parts = 0
914 self._parts: list[dict[str, object]] = []
916 #
917 # This member is part of the io.BufferedIOBase interface.
918 #
919 self.raw = None # type: ignore[assignment]
921 def flush(self):
922 pass
924 #
925 # Override some methods from io.IOBase.
926 #
927 def close(self):
928 if self._buf.tell():
929 self._upload_next_part()
931 logger.debug('%s: completing multipart upload', self)
932 if self._total_bytes and self._upload_id:
933 partial = functools.partial(
934 self._client.complete_multipart_upload,
935 Bucket=self._bucket,
936 Key=self._key,
937 UploadId=self._upload_id,
938 MultipartUpload={'Parts': self._parts},
939 )
940 RETRY._do(partial)
941 logger.debug('%s: completed multipart upload', self)
942 elif self._upload_id:
943 #
944 # AWS complains with "The XML you provided was not well-formed or
945 # did not validate against our published schema" when the input is
946 # completely empty => abort the upload, no file created.
947 #
948 # We work around this by creating an empty file explicitly.
949 #
950 self._client.abort_multipart_upload(
951 Bucket=self._bucket,
952 Key=self._key,
953 UploadId=self._upload_id,
954 )
955 self._client.put_object(
956 Bucket=self._bucket,
957 Key=self._key,
958 Body=b'',
959 )
960 logger.debug('%s: wrote 0 bytes to imitate multipart upload', self)
961 self._upload_id = None
963 @property
964 def closed(self):
965 return self._upload_id is None
967 def writable(self):
968 """Return True if the stream supports writing."""
969 return True
971 def seekable(self):
972 """If False, seek(), tell() and truncate() will raise IOError.
974 We offer only tell support, and no seek or truncate support."""
975 return True
977 def seek(self, offset, whence=constants.WHENCE_START):
978 """Unsupported."""
979 raise io.UnsupportedOperation
981 def truncate(self, size=None):
982 """Unsupported."""
983 raise io.UnsupportedOperation
985 def tell(self):
986 """Return the current stream position."""
987 return self._total_bytes
989 #
990 # io.BufferedIOBase methods.
991 #
992 def detach(self):
993 raise io.UnsupportedOperation("detach() not supported")
995 def write(self, b: Buffer) -> int:
996 """Write the given buffer (bytes, bytearray, memoryview or any buffer
997 interface implementation) to the S3 file.
999 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html
1001 There's buffering happening under the covers, so this may not actually
1002 do any HTTP transfer right away."""
1003 offset = 0
1004 mv = memoryview(b)
1005 self._total_bytes += len(mv)
1007 #
1008 # botocore does not accept memoryview, otherwise we could have avoided
1009 # copying the data into our buffer, except in cases where b is smaller
1010 # than part_size.
1011 #
1012 while offset < len(mv):
1013 start = offset
1014 end = offset + self._part_size - self._buf.tell()
1015 self._buf.write(mv[start:end])
1016 if self._buf.tell() < self._part_size:
1017 #
1018 # Not enough data to write a new part just yet. The assert
1019 # ensures that we've consumed all of the input buffer.
1020 #
1021 assert end >= len(mv)
1022 return len(mv)
1024 self._upload_next_part()
1025 offset = end
1026 return len(mv)
1028 def terminate(self):
1029 """Cancel the underlying multipart upload."""
1030 if self._upload_id is None:
1031 return
1032 logger.debug('%s: terminating multipart upload', self)
1033 self._client.abort_multipart_upload(
1034 Bucket=self._bucket,
1035 Key=self._key,
1036 UploadId=self._upload_id,
1037 )
1038 self._upload_id = None
1039 logger.debug('%s: terminated multipart upload', self)
1041 def to_boto3(self, resource):
1042 """Create an **independent** `boto3.s3.Object` instance that points to
1043 the same S3 object as this instance.
1044 Changes to the returned object will not affect the current instance.
1045 """
1046 assert resource, 'resource must be a boto3.resource instance'
1047 return resource.Object(self._bucket, self._key)
1049 #
1050 # Internal methods.
1051 #
1052 def _upload_next_part(self) -> None:
1053 part_num = self._total_parts + 1
1054 logger.info(
1055 "%s: uploading part_num: %i, %i bytes (total %.3fGB)",
1056 self,
1057 part_num,
1058 self._buf.tell(),
1059 self._total_bytes / 1024.0 ** 3,
1060 )
1061 self._buf.seek(0)
1063 #
1064 # Network problems in the middle of an upload are particularly
1065 # troublesome. We don't want to abort the entire upload just because
1066 # of a temporary connection problem, so this part needs to be
1067 # especially robust.
1068 #
1069 upload = RETRY._do(
1070 functools.partial(
1071 self._client.upload_part,
1072 Bucket=self._bucket,
1073 Key=self._key,
1074 UploadId=self._upload_id,
1075 PartNumber=part_num,
1076 Body=self._buf,
1077 )
1078 )
1080 self._parts.append({'ETag': upload['ETag'], 'PartNumber': part_num})
1081 logger.debug("%s: upload of part_num #%i finished", self, part_num)
1083 self._total_parts += 1
1085 self._buf.seek(0)
1086 self._buf.truncate(0)
1088 def __enter__(self):
1089 return self
1091 def __exit__(self, exc_type, exc_val, exc_tb):
1092 if exc_type is not None:
1093 self.terminate()
1094 else:
1095 self.close()
1097 def __str__(self):
1098 return "smart_open.s3.MultipartWriter(%r, %r)" % (self._bucket, self._key)
1100 def __repr__(self):
1101 return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, part_size=%r)" % (
1102 self._bucket,
1103 self._key,
1104 self._part_size,
1105 )
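#
# Usage sketch (assumes the bucket exists and credentials are configured):
# the context manager completes the multipart upload on a clean exit and
# aborts it via terminate() if an exception escapes the block:
#
#   >>> with MultipartWriter('my_bucket', 'my_key') as fout:
#   ...     for chunk in produce_chunks():  # hypothetical data source
#   ...         fout.write(chunk)
#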
1108class SinglepartWriter(io.BufferedIOBase):
1109 """Writes bytes to S3 using the single part API.
1111 Implements the io.BufferedIOBase interface of the standard library.
1113 This class buffers all of its input in memory until its `close` method is called. Only then will
1114 the data be written to S3 and the buffer released."""
1116 def __init__(
1117 self,
1118 bucket,
1119 key,
1120 client=None,
1121 client_kwargs=None,
1122 writebuffer=None,
1123 ):
1124 _initialize_boto3(self, client, client_kwargs, bucket, key)
1126 try:
1127 self._client.head_bucket(Bucket=bucket)
1128 except botocore.client.ClientError as e:
1129 raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) from e
1131 if writebuffer is None:
1132 self._buf = io.BytesIO()
1133 else:
1134 self._buf = writebuffer
1136 self._total_bytes = 0
1138 #
1139 # This member is part of the io.BufferedIOBase interface.
1140 #
1141 self.raw = None
1143 def flush(self):
1144 pass
1146 #
1147 # Override some methods from io.IOBase.
1148 #
1149 def close(self):
1150 if self._buf is None:
1151 return
1153 self._buf.seek(0)
1155 try:
1156 self._client.put_object(
1157 Bucket=self._bucket,
1158 Key=self._key,
1159 Body=self._buf,
1160 )
1161 except botocore.client.ClientError as e:
1162 raise ValueError(
1163 'the bucket %r does not exist, or is forbidden for access' % self._bucket) from e
1165 logger.debug("%s: direct upload finished", self)
1166 self._buf = None
1168 @property
1169 def closed(self):
1170 return self._buf is None
1172 def writable(self):
1173 """Return True if the stream supports writing."""
1174 return True
1176 def seekable(self):
1177 """If False, seek(), tell() and truncate() will raise IOError.
1179 We offer only tell support, and no seek or truncate support."""
1180 return True
1182 def seek(self, offset, whence=constants.WHENCE_START):
1183 """Unsupported."""
1184 raise io.UnsupportedOperation
1186 def truncate(self, size=None):
1187 """Unsupported."""
1188 raise io.UnsupportedOperation
1190 def tell(self):
1191 """Return the current stream position."""
1192 return self._total_bytes
1194 #
1195 # io.BufferedIOBase methods.
1196 #
1197 def detach(self):
1198 raise io.UnsupportedOperation("detach() not supported")
1200 def write(self, b):
1201 """Write the given buffer (bytes, bytearray, memoryview or any buffer
1202 interface implementation) into the buffer. Content of the buffer will be
1203 written to S3 on close as a single-part upload.
1205 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html"""
1207 length = self._buf.write(b)
1208 self._total_bytes += length
1209 return length
1211 def terminate(self):
1212 """Nothing to cancel in single-part uploads."""
1213 return
1215 #
1216 # Internal methods.
1217 #
1218 def __enter__(self):
1219 return self
1221 def __exit__(self, exc_type, exc_val, exc_tb):
1222 if exc_type is not None:
1223 self.terminate()
1224 else:
1225 self.close()
1227 def __str__(self):
1228 return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._bucket, self._key)
1230 def __repr__(self):
1231 return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key)
1234def _accept_all(key):
1235 return True
1238def iter_bucket(
1239 bucket_name,
1240 prefix='',
1241 accept_key=None,
1242 key_limit=None,
1243 workers=16,
1244 retries=3,
1245 **session_kwargs):
1246 """
1247 Iterate and download all S3 objects under `s3://bucket_name/prefix`.
1249 Parameters
1250 ----------
1251 bucket_name: str
1252 The name of the bucket.
1253 prefix: str, optional
1254 Limits the iteration to keys starting with the prefix.
1255 accept_key: callable, optional
1256 This is a function that accepts a key name (unicode string) and
1257 returns True/False, signalling whether the given key should be downloaded.
1258 The default behavior is to accept all keys.
1259 key_limit: int, optional
1260 If specified, the iterator will stop after yielding this many results.
1261 workers: int, optional
1262 The number of subprocesses to use.
1263 retries: int, optional
1264 The number of times to retry a failed download.
1265 session_kwargs: dict, optional
1266 Keyword arguments to pass when creating a new session.
1267 For a list of available names and values, see:
1268 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session
1271 Yields
1272 ------
1273 str
1274 The full key name (does not include the bucket name).
1275 bytes
1276 The full contents of the key.
1278 Notes
1279 -----
1280 The keys are processed in parallel, using `workers` processes (default: 16),
1281 to speed up downloads greatly. If multiprocessing is not available
1282 (i.e. _MULTIPROCESSING is False), this parameter is ignored.
1284 Examples
1285 --------
1287 >>> # get all JSON files under "mybucket/foo/"
1288 >>> for key, content in iter_bucket(
1289 ... bucket_name, prefix='foo/',
1290 ... accept_key=lambda key: key.endswith('.json')):
1291 ... print(key, len(content))
1293 >>> # limit to 10k files, using 32 parallel workers (default is 16)
1294 >>> for key, content in iter_bucket(bucket_name, key_limit=10000, workers=32):
1295 ... print(key, len(content))
1296 """
1297 if accept_key is None:
1298 accept_key = _accept_all
1300 #
1301 # If people insist on giving us bucket instances, silently extract the name
1302 # before moving on. Works for boto3 as well as boto.
1303 #
1304 try:
1305 bucket_name = bucket_name.name
1306 except AttributeError:
1307 pass
1309 total_size, key_no = 0, -1
1310 key_iterator = _list_bucket(
1311 bucket_name,
1312 prefix=prefix,
1313 accept_key=accept_key,
1314 **session_kwargs)
1315 download_key = functools.partial(
1316 _download_key,
1317 bucket_name=bucket_name,
1318 retries=retries,
1319 **session_kwargs)
1321 with smart_open.concurrency.create_pool(processes=workers) as pool:
1322 result_iterator = pool.imap_unordered(download_key, key_iterator)
1323 key_no = 0
1324 while True:
1325 try:
1326 (key, content) = result_iterator.__next__()
1327 if key_no % 1000 == 0:
1328 logger.info(
1329 "yielding key #%i: %s, size %i (total %.1fMB)",
1330 key_no, key, len(content), total_size / 1024.0 ** 2
1331 )
1332 yield key, content
1333 total_size += len(content)
1334 if key_limit is not None and key_no + 1 >= key_limit:
1335 # we were asked to output only a limited number of keys => we're done
1336 break
1337 except botocore.exceptions.ClientError as err:
1338 #
1339 # ignore 404 not found errors: they mean the object was deleted
1340 # after we listed the contents of the bucket, but before we
1341 # downloaded the object.
1342 #
1343 if not ('Error' in err.response and err.response['Error'].get('Code') == '404'):
1344 raise err
1345 except StopIteration:
1346 break
1347 key_no += 1
1348 logger.info("processed %i keys, total size %i" % (key_no + 1, total_size))
1351def _list_bucket(
1352 bucket_name,
1353 prefix='',
1354 accept_key=lambda k: True,
1355 **session_kwargs):
1356 session = boto3.session.Session(**session_kwargs)
1357 client = session.client('s3')
1358 ctoken = None
1360 while True:
1361 # list_objects_v2 doesn't like a None value for ContinuationToken
1362 # so we don't set it if we don't have one.
1363 if ctoken:
1364 kwargs = dict(Bucket=bucket_name, Prefix=prefix, ContinuationToken=ctoken)
1365 else:
1366 kwargs = dict(Bucket=bucket_name, Prefix=prefix)
1367 response = client.list_objects_v2(**kwargs)
1368 try:
1369 content = response['Contents']
1370 except KeyError:
1371 pass
1372 else:
1373 for c in content:
1374 key = c['Key']
1375 if accept_key(key):
1376 yield key
1377 ctoken = response.get('NextContinuationToken', None)
1378 if not ctoken:
1379 break
1382def _download_key(key_name, bucket_name=None, retries=3, **session_kwargs):
1383 if bucket_name is None:
1384 raise ValueError('bucket_name may not be None')
1386 #
1387 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html#multithreading-or-multiprocessing-with-resources
1388 #
1389 session = boto3.session.Session(**session_kwargs)
1390 s3 = session.resource('s3')
1391 bucket = s3.Bucket(bucket_name)
1393 # Sometimes, https://github.com/boto/boto/issues/2409 can happen
1394 # because of network issues on either side.
1395 # Retry up to `retries` times (default: 3) in case it's a transient issue.
1396 for x in range(retries + 1):
1397 try:
1398 content_bytes = _download_fileobj(bucket, key_name)
1399 except botocore.client.ClientError:
1400 # Actually fail on last pass through the loop
1401 if x == retries:
1402 raise
1403 # Otherwise, try again, as this might be a transient timeout
1404 pass
1405 else:
1406 return key_name, content_bytes
1409def _download_fileobj(bucket, key_name):
1410 #
1411 # This is a separate function only because it makes it easier to inject
1412 # exceptions during tests.
1413 #
1414 buf = io.BytesIO()
1415 bucket.download_fileobj(key_name, buf)
1416 return buf.getvalue()