# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Radim Rehurek <me@radimrehurek.com>
#
# This code is distributed under the terms and conditions
# from the MIT License (MIT).
#
"""Implements file-like objects for reading and writing from/to AWS S3."""

import io
import functools
import logging
import time
import warnings

try:
    import boto3
    import botocore.client
    import botocore.exceptions
    import urllib3.exceptions
except ImportError:
    MISSING_DEPS = True

import smart_open.bytebuffer
import smart_open.concurrency
import smart_open.utils

from smart_open import constants

logger = logging.getLogger(__name__)

DEFAULT_MIN_PART_SIZE = 50 * 1024**2
"""Default minimum part size for S3 multipart uploads"""
MIN_MIN_PART_SIZE = 5 * 1024 ** 2
"""The absolute minimum permitted by Amazon."""

SCHEMES = ("s3", "s3n", 's3u', "s3a")
DEFAULT_PORT = 443
DEFAULT_HOST = 's3.amazonaws.com'

DEFAULT_BUFFER_SIZE = 128 * 1024

URI_EXAMPLES = (
    's3://my_bucket/my_key',
    's3://my_key:my_secret@my_bucket/my_key',
    's3://my_key:my_secret@my_server:my_port@my_bucket/my_key',
)

_UPLOAD_ATTEMPTS = 6
_SLEEP_SECONDS = 10

# Returned by AWS when we try to seek beyond EOF.
_OUT_OF_RANGE = 'InvalidRange'


class _ClientWrapper:
    """Wraps a client to inject the appropriate keyword args into each method call.

    The keyword args are a dictionary keyed by the fully qualified method name.
    For example, S3.Client.create_multipart_upload.

    See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client

    This wrapper behaves identically to the client otherwise.
    """
    def __init__(self, client, kwargs):
        self.client = client
        self.kwargs = kwargs

    def __getattr__(self, method_name):
        method = getattr(self.client, method_name)
        kwargs = self.kwargs.get('S3.Client.%s' % method_name, {})
        return functools.partial(method, **kwargs)
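
#
# Illustrative sketch, not part of the library: how the kwargs dict consumed by
# _ClientWrapper is keyed by fully qualified method name. The bucket name and
# the ServerSideEncryption value are placeholders chosen for the example.
#
# >>> import boto3
# >>> wrapped = _ClientWrapper(
# ...     boto3.client('s3'),
# ...     {'S3.Client.create_multipart_upload': {'ServerSideEncryption': 'aws:kms'}},
# ... )
# >>> # every call below now carries ServerSideEncryption='aws:kms' automatically
# >>> # wrapped.create_multipart_upload(Bucket='my-bucket', Key='my-key')
#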


def parse_uri(uri_as_string):
    #
    # Restrictions on bucket names and labels:
    #
    # - Bucket names must be at least 3 and no more than 63 characters long.
    # - Bucket names must be a series of one or more labels.
    # - Adjacent labels are separated by a single period (.).
    # - Bucket names can contain lowercase letters, numbers, and hyphens.
    # - Each label must start and end with a lowercase letter or a number.
    #
    # We use the above as a guide only, and do not perform any validation. We
    # let boto3 take care of that for us.
    #
    split_uri = smart_open.utils.safe_urlsplit(uri_as_string)
    assert split_uri.scheme in SCHEMES

    port = DEFAULT_PORT
    host = DEFAULT_HOST
    ordinary_calling_format = False
    #
    # These defaults tell boto3 to look for credentials elsewhere
    #
    access_id, access_secret = None, None

    #
    # Common URI template [secret:key@][host[:port]@]bucket/object
    #
    # The urlparse function doesn't handle the above schema, so we have to do
    # it ourselves.
    #
    uri = split_uri.netloc + split_uri.path

    if '@' in uri and ':' in uri.split('@')[0]:
        auth, uri = uri.split('@', 1)
        access_id, access_secret = auth.split(':')

    head, key_id = uri.split('/', 1)
    if '@' in head and ':' in head:
        ordinary_calling_format = True
        host_port, bucket_id = head.split('@')
        host, port = host_port.split(':', 1)
        port = int(port)
    elif '@' in head:
        ordinary_calling_format = True
        host, bucket_id = head.split('@')
    else:
        bucket_id = head

    return dict(
        scheme=split_uri.scheme,
        bucket_id=bucket_id,
        key_id=key_id,
        port=port,
        host=host,
        ordinary_calling_format=ordinary_calling_format,
        access_id=access_id,
        access_secret=access_secret,
    )
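
#
# Illustrative sketch, not part of the library: the dictionary parse_uri
# returns for the first entry in URI_EXAMPLES, given the defaults above.
#
# >>> parse_uri('s3://my_bucket/my_key')
# {'scheme': 's3', 'bucket_id': 'my_bucket', 'key_id': 'my_key', 'port': 443,
#  'host': 's3.amazonaws.com', 'ordinary_calling_format': False,
#  'access_id': None, 'access_secret': None}
#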


def _consolidate_params(uri, transport_params):
    """Consolidates the parsed URI with the additional parameters.

    This is necessary because the user can pass some of the parameters in
    two different ways:

    1) Via the URI itself
    2) Via the transport parameters

    These are not mutually exclusive, but we have to pick one over the other
    in a sensible way in order to proceed.

    """
    transport_params = dict(transport_params)

    def inject(**kwargs):
        try:
            client_kwargs = transport_params['client_kwargs']
        except KeyError:
            client_kwargs = transport_params['client_kwargs'] = {}

        try:
            init_kwargs = client_kwargs['S3.Client']
        except KeyError:
            init_kwargs = client_kwargs['S3.Client'] = {}

        init_kwargs.update(**kwargs)

    client = transport_params.get('client')
    if client is not None and (uri['access_id'] or uri['access_secret']):
        logger.warning(
            'ignoring credentials parsed from URL because they conflict with '
            'transport_params["client"]. Set transport_params["client"] to None '
            'to suppress this warning.'
        )
        uri.update(access_id=None, access_secret=None)
    elif (uri['access_id'] and uri['access_secret']):
        inject(
            aws_access_key_id=uri['access_id'],
            aws_secret_access_key=uri['access_secret'],
        )
        uri.update(access_id=None, access_secret=None)

    if client is not None and uri['host'] != DEFAULT_HOST:
        logger.warning(
            'ignoring endpoint_url parsed from URL because it conflicts with '
            'transport_params["client"]. Set transport_params["client"] to None '
            'to suppress this warning.'
        )
        uri.update(host=None)
    elif uri['host'] != DEFAULT_HOST:
        inject(endpoint_url='https://%(host)s:%(port)d' % uri)
        uri.update(host=None)

    return uri, transport_params
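
#
# Illustrative sketch, not part of the library: credentials parsed from a URI
# end up under client_kwargs['S3.Client'], which _initialize_boto3 later passes
# to boto3.client('s3', ...). The key/secret values are placeholders.
#
# >>> uri, tp = _consolidate_params(
# ...     parse_uri('s3://my_key:my_secret@my_bucket/my_key'), {})
# >>> tp['client_kwargs']['S3.Client']
# {'aws_access_key_id': 'my_key', 'aws_secret_access_key': 'my_secret'}
#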


def open_uri(uri, mode, transport_params):
    deprecated = (
        'multipart_upload_kwargs',
        'object_kwargs',
        'resource',
        'resource_kwargs',
        'session',
        'singlepart_upload_kwargs',
    )
    detected = [k for k in deprecated if k in transport_params]
    if detected:
        doc_url = (
            'https://github.com/RaRe-Technologies/smart_open/blob/develop/'
            'MIGRATING_FROM_OLDER_VERSIONS.rst'
        )
        #
        # We use warnings.warn /w UserWarning instead of logger.warn here because
        #
        # 1) Not everyone has logging enabled; and
        # 2) check_kwargs (below) already uses logger.warn with a similar message
        #
        # https://github.com/RaRe-Technologies/smart_open/issues/614
        #
        message = (
            'ignoring the following deprecated transport parameters: %r. '
            'See <%s> for details' % (detected, doc_url)
        )
        warnings.warn(message, UserWarning)
    parsed_uri = parse_uri(uri)
    parsed_uri, transport_params = _consolidate_params(parsed_uri, transport_params)
    kwargs = smart_open.utils.check_kwargs(open, transport_params)
    return open(parsed_uri['bucket_id'], parsed_uri['key_id'], mode, **kwargs)


def open(
    bucket_id,
    key_id,
    mode,
    version_id=None,
    buffer_size=DEFAULT_BUFFER_SIZE,
    min_part_size=DEFAULT_MIN_PART_SIZE,
    multipart_upload=True,
    defer_seek=False,
    client=None,
    client_kwargs=None,
    writebuffer=None,
):
    """Open an S3 object for reading or writing.

    Parameters
    ----------
    bucket_id: str
        The name of the bucket this object resides in.
    key_id: str
        The name of the key within the bucket.
    mode: str
        The mode for opening the object. Must be either "rb" or "wb".
    buffer_size: int, optional
        The buffer size to use when performing I/O.
    min_part_size: int, optional
        The minimum part size for multipart uploads. For writing only.
    multipart_upload: bool, optional
        Default: `True`
        If set to `True`, will use multipart upload for writing to S3. If set
        to `False`, S3 upload will use the S3 Single-Part Upload API, which
        is better suited to small files.
        For writing only.
    version_id: str, optional
        Version of the object, used when reading object.
        If None, will fetch the most recent version.
    defer_seek: boolean, optional
        Default: `False`
        If set to `True` on a file opened for reading, GetObject will not be
        called until the first seek() or read().
        Avoids redundant API queries when seeking before reading.
    client: object, optional
        The S3 client to use when working with boto3.
        If you don't specify this, then smart_open will create a new client for you.
    client_kwargs: dict, optional
        Additional parameters to pass to the relevant functions of the client.
        The keys are fully qualified method names, e.g. `S3.Client.create_multipart_upload`.
        The values are kwargs to pass to that method each time it is called.
    writebuffer: IO[bytes], optional
        By default, this module will buffer data in memory using io.BytesIO
        when writing. Pass another binary IO instance here to use it instead.
        For example, you may pass a file object to buffer to local disk instead
        of in RAM. Use this to keep RAM usage low at the expense of additional
        disk IO. If you pass in an open file, then you are responsible for
        cleaning it up after writing completes.
    """
    logger.debug('%r', locals())

    if mode not in constants.BINARY_MODES:
        raise NotImplementedError('bad mode: %r expected one of %r' % (mode, constants.BINARY_MODES))

    if (mode == constants.WRITE_BINARY) and (version_id is not None):
        raise ValueError("version_id must be None when writing")

    if mode == constants.READ_BINARY:
        fileobj = Reader(
            bucket_id,
            key_id,
            version_id=version_id,
            buffer_size=buffer_size,
            defer_seek=defer_seek,
            client=client,
            client_kwargs=client_kwargs,
        )
    elif mode == constants.WRITE_BINARY:
        if multipart_upload:
            fileobj = MultipartWriter(
                bucket_id,
                key_id,
                min_part_size=min_part_size,
                client=client,
                client_kwargs=client_kwargs,
                writebuffer=writebuffer,
            )
        else:
            fileobj = SinglepartWriter(
                bucket_id,
                key_id,
                client=client,
                client_kwargs=client_kwargs,
                writebuffer=writebuffer,
            )
    else:
        assert False, 'unexpected mode: %r' % mode

    fileobj.name = key_id
    return fileobj
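
#
# Illustrative sketch, not part of the library: typical calls to this module's
# open(). The bucket and key names are placeholders, and the tempfile-backed
# writebuffer is just one way of keeping RAM usage low while writing.
#
# >>> import tempfile
# >>> with open('my-bucket', 'logs/big.bin', 'wb',
# ...           writebuffer=tempfile.TemporaryFile()) as fout:
# ...     fout.write(b'hello world')
# >>> with open('my-bucket', 'logs/big.bin', 'rb', defer_seek=True) as fin:
# ...     data = fin.read()
#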


def _get(client, bucket, key, version, range_string):
    try:
        if version:
            return client.get_object(Bucket=bucket, Key=key, VersionId=version, Range=range_string)
        else:
            return client.get_object(Bucket=bucket, Key=key, Range=range_string)
    except botocore.client.ClientError as error:
        wrapped_error = IOError(
            'unable to access bucket: %r key: %r version: %r error: %s' % (
                bucket, key, version, error
            )
        )
        wrapped_error.backend_error = error
        raise wrapped_error from error


def _unwrap_ioerror(ioe):
    """Given an IOError from _get, return the 'Error' dictionary from boto."""
    try:
        return ioe.backend_error.response['Error']
    except (AttributeError, KeyError):
        return None


class _SeekableRawReader(object):
    """Read an S3 object.

    This class is internal to the S3 submodule.
    """

    def __init__(
        self,
        client,
        bucket,
        key,
        version_id=None,
    ):
        self._client = client
        self._bucket = bucket
        self._key = key
        self._version_id = version_id

        self._content_length = None
        self._position = 0
        self._body = None

    def seek(self, offset, whence=constants.WHENCE_START):
        """Seek to the specified position.

        :param int offset: The offset in bytes.
        :param int whence: Where the offset is from.

        :returns: the position after seeking.
        :rtype: int
        """
        if whence not in constants.WHENCE_CHOICES:
            raise ValueError('invalid whence, expected one of %r' % constants.WHENCE_CHOICES)

        #
        # Close the old body explicitly.
        # On the first seek() after __init__(), self._body does not exist yet.
        #
        if self._body is not None:
            self._body.close()
        self._body = None

        start = None
        stop = None
        if whence == constants.WHENCE_START:
            start = max(0, offset)
        elif whence == constants.WHENCE_CURRENT:
            start = max(0, offset + self._position)
        else:
            stop = max(0, -offset)

        #
        # If we can figure out that we've read past the EOF, then we can save
        # an extra API call.
        #
        if self._content_length is None:
            reached_eof = False
        elif start is not None and start >= self._content_length:
            reached_eof = True
        elif stop == 0:
            reached_eof = True
        else:
            reached_eof = False

        if reached_eof:
            self._body = io.BytesIO()
            self._position = self._content_length
        else:
            self._open_body(start, stop)

        return self._position

    def _open_body(self, start=None, stop=None):
        """Open a connection to download the specified range of bytes. Store
        the open file handle in self._body.

        If no range is specified, start defaults to self._position.
        start and stop follow the semantics of the http range header,
        so a stop without a start will read bytes beginning at stop.

        As a side effect, set self._content_length. Set self._position
        to self._content_length if start is past end of file.
        """
        if start is None and stop is None:
            start = self._position
        range_string = smart_open.utils.make_range_string(start, stop)

        try:
            # Optimistically try to fetch the requested content range.
            response = _get(
                self._client,
                self._bucket,
                self._key,
                self._version_id,
                range_string,
            )
        except IOError as ioe:
            # Handle requested content range exceeding content size.
            error_response = _unwrap_ioerror(ioe)
            if error_response is None or error_response.get('Code') != _OUT_OF_RANGE:
                raise
            self._position = self._content_length = int(error_response['ActualObjectSize'])
            self._body = io.BytesIO()
        else:
            #
            # Keep track of how many times boto3's built-in retry mechanism
            # activated.
            #
            # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#checking-retry-attempts-in-an-aws-service-response
            #
            logger.debug(
                '%s: RetryAttempts: %d',
                self,
                response['ResponseMetadata']['RetryAttempts'],
            )
            units, start, stop, length = smart_open.utils.parse_content_range(response['ContentRange'])
            self._content_length = length
            self._position = start
            self._body = response['Body']

    def read(self, size=-1):
        """Read from the continuous connection with the remote peer."""
        if self._body is None:
            # This is necessary for the very first read() after __init__().
            self._open_body()
        if self._position >= self._content_length:
            return b''

        #
        # Boto3 has built-in error handling and retry mechanisms:
        #
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
        #
        # Unfortunately, it isn't always enough. There is still a non-zero
        # possibility that an exception will slip past these mechanisms and
        # terminate the read prematurely. Luckily, at this stage, it's very
        # simple to recover from the problem: wait a little bit, reopen the
        # HTTP connection and try again. Usually, a single retry attempt is
        # enough to recover, but we try multiple times "just in case".
        #
        for attempt, seconds in enumerate([1, 2, 4, 8, 16], 1):
            try:
                if size == -1:
                    binary = self._body.read()
                else:
                    binary = self._body.read(size)
            except (
                ConnectionResetError,
                botocore.exceptions.BotoCoreError,
                urllib3.exceptions.HTTPError,
            ) as err:
                logger.warning(
                    '%s: caught %r while reading %d bytes, sleeping %ds before retry',
                    self,
                    err,
                    size,
                    seconds,
                )
                time.sleep(seconds)
                self._open_body()
            else:
                self._position += len(binary)
                return binary

        raise IOError('%s: failed to read %d bytes after %d attempts' % (self, size, attempt))

    def __str__(self):
        return 'smart_open.s3._SeekableRawReader(%r, %r)' % (self._bucket, self._key)


def _initialize_boto3(rw, client, client_kwargs, bucket, key):
    """Create the required objects for accessing S3. Ideally, they have
    been already created for us and we can just reuse them."""
    if client_kwargs is None:
        client_kwargs = {}

    if client is None:
        init_kwargs = client_kwargs.get('S3.Client', {})
        client = boto3.client('s3', **init_kwargs)
    assert client

    rw._client = _ClientWrapper(client, client_kwargs)
    rw._bucket = bucket
    rw._key = key


class Reader(io.BufferedIOBase):
    """Reads bytes from S3.

    Implements the io.BufferedIOBase interface of the standard library."""

    def __init__(
        self,
        bucket,
        key,
        version_id=None,
        buffer_size=DEFAULT_BUFFER_SIZE,
        line_terminator=constants.BINARY_NEWLINE,
        defer_seek=False,
        client=None,
        client_kwargs=None,
    ):
        self._version_id = version_id
        self._buffer_size = buffer_size

        _initialize_boto3(self, client, client_kwargs, bucket, key)

        self._raw_reader = _SeekableRawReader(
            self._client,
            bucket,
            key,
            self._version_id,
        )
        self._current_pos = 0
        self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size)
        self._eof = False
        self._line_terminator = line_terminator

        #
        # This member is part of the io.BufferedIOBase interface.
        #
        self.raw = None

        if not defer_seek:
            self.seek(0)

    #
    # io.BufferedIOBase methods.
    #

    def close(self):
        """Flush and close this stream."""
        pass

    def readable(self):
        """Return True if the stream can be read from."""
        return True

    def read(self, size=-1):
        """Read up to size bytes from the object and return them."""
        if size == 0:
            return b''
        elif size < 0:
            # call read() before setting _current_pos to make sure _content_length is set
            out = self._read_from_buffer() + self._raw_reader.read()
            self._current_pos = self._raw_reader._content_length
            return out

        #
        # Return unused data first
        #
        if len(self._buffer) >= size:
            return self._read_from_buffer(size)

        #
        # If the stream is finished, return what we have.
        #
        if self._eof:
            return self._read_from_buffer()

        self._fill_buffer(size)
        return self._read_from_buffer(size)

    def read1(self, size=-1):
        """This is the same as read()."""
        return self.read(size=size)

    def readinto(self, b):
        """Read up to len(b) bytes into b, and return the number of bytes
        read."""
        data = self.read(len(b))
        if not data:
            return 0
        b[:len(data)] = data
        return len(data)

    def readline(self, limit=-1):
        """Read up to and including the next newline. Returns the bytes read."""
        if limit != -1:
            raise NotImplementedError('limits other than -1 not implemented yet')

        #
        # A single line may span multiple buffers.
        #
        line = io.BytesIO()
        while not (self._eof and len(self._buffer) == 0):
            line_part = self._buffer.readline(self._line_terminator)
            line.write(line_part)
            self._current_pos += len(line_part)

            if line_part.endswith(self._line_terminator):
                break
            else:
                self._fill_buffer()

        return line.getvalue()

    def seekable(self):
        """If False, seek(), tell() and truncate() will raise IOError.

        We offer only seek support, and no truncate support."""
        return True

    def seek(self, offset, whence=constants.WHENCE_START):
        """Seek to the specified position.

        :param int offset: The offset in bytes.
        :param int whence: Where the offset is from.

        Returns the position after seeking."""
        # Convert relative offset to absolute, since self._raw_reader
        # doesn't know our current position.
        if whence == constants.WHENCE_CURRENT:
            whence = constants.WHENCE_START
            offset += self._current_pos

        self._current_pos = self._raw_reader.seek(offset, whence)

        self._buffer.empty()
        self._eof = self._current_pos == self._raw_reader._content_length
        return self._current_pos

    def tell(self):
        """Return the current position within the file."""
        return self._current_pos

    def truncate(self, size=None):
        """Unsupported."""
        raise io.UnsupportedOperation

    def detach(self):
        """Unsupported."""
        raise io.UnsupportedOperation

    def terminate(self):
        """Do nothing."""
        pass

    def to_boto3(self, resource):
        """Create an **independent** `boto3.s3.Object` instance that points to
        the same S3 object as this instance.
        Changes to the returned object will not affect the current instance.
        """
        assert resource, 'resource must be a boto3.resource instance'
        obj = resource.Object(self._bucket, self._key)
        if self._version_id is not None:
            return obj.Version(self._version_id)
        else:
            return obj

    #
    # Internal methods.
    #
    def _read_from_buffer(self, size=-1):
        """Remove at most size bytes from our buffer and return them."""
        size = size if size >= 0 else len(self._buffer)
        part = self._buffer.read(size)
        self._current_pos += len(part)
        return part

    def _fill_buffer(self, size=-1):
        size = max(size, self._buffer._chunk_size)
        while len(self._buffer) < size and not self._eof:
            bytes_read = self._buffer.fill(self._raw_reader)
            if bytes_read == 0:
                logger.debug('%s: reached EOF while filling buffer', self)
                self._eof = True

    def __str__(self):
        return "smart_open.s3.Reader(%r, %r)" % (self._bucket, self._key)

    def __repr__(self):
        return (
            "smart_open.s3.Reader("
            "bucket=%r, "
            "key=%r, "
            "version_id=%r, "
            "buffer_size=%r, "
            "line_terminator=%r)"
        ) % (
            self._bucket,
            self._key,
            self._version_id,
            self._buffer_size,
            self._line_terminator,
        )
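
#
# Illustrative sketch, not part of the library: Reader.to_boto3() hands back an
# independent boto3 Object for the same key. The resource, bucket and key are
# placeholders supplied by the caller.
#
# >>> import boto3
# >>> resource = boto3.resource('s3')
# >>> with open('my-bucket', 'data/file.bin', 'rb') as fin:
# ...     obj = fin.to_boto3(resource)
# ...     print(obj.bucket_name, obj.key)
#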


class MultipartWriter(io.BufferedIOBase):
    """Writes bytes to S3 using the multi part API.

    Implements the io.BufferedIOBase interface of the standard library."""

    def __init__(
        self,
        bucket,
        key,
        min_part_size=DEFAULT_MIN_PART_SIZE,
        client=None,
        client_kwargs=None,
        writebuffer=None,
    ):
        if min_part_size < MIN_MIN_PART_SIZE:
            logger.warning("S3 requires minimum part size >= 5MB; \
multipart upload may fail")
        self._min_part_size = min_part_size

        _initialize_boto3(self, client, client_kwargs, bucket, key)

        try:
            partial = functools.partial(
                self._client.create_multipart_upload,
                Bucket=bucket,
                Key=key,
            )
            self._upload_id = _retry_if_failed(partial)['UploadId']
        except botocore.client.ClientError as error:
            raise ValueError(
                'the bucket %r does not exist, or is forbidden for access (%r)' % (
                    bucket, error
                )
            ) from error

        if writebuffer is None:
            self._buf = io.BytesIO()
        else:
            self._buf = writebuffer

        self._total_bytes = 0
        self._total_parts = 0
        self._parts = []

        #
        # This member is part of the io.BufferedIOBase interface.
        #
        self.raw = None

    def flush(self):
        pass

    #
    # Override some methods from io.IOBase.
    #
    def close(self):
        if self._buf.tell():
            self._upload_next_part()

        if self._total_bytes and self._upload_id:
            partial = functools.partial(
                self._client.complete_multipart_upload,
                Bucket=self._bucket,
                Key=self._key,
                UploadId=self._upload_id,
                MultipartUpload={'Parts': self._parts},
            )
            _retry_if_failed(partial)
            logger.debug('%s: completed multipart upload', self)
        elif self._upload_id:
            #
            # AWS complains with "The XML you provided was not well-formed or
            # did not validate against our published schema" when the input is
            # completely empty => abort the upload, no file created.
            #
            # We work around this by creating an empty file explicitly.
            #
            assert self._upload_id, "no multipart upload in progress"
            self._client.abort_multipart_upload(
                Bucket=self._bucket,
                Key=self._key,
                UploadId=self._upload_id,
            )
            self._client.put_object(
                Bucket=self._bucket,
                Key=self._key,
                Body=b'',
            )
            logger.debug('%s: wrote 0 bytes to imitate multipart upload', self)
        self._upload_id = None

    @property
    def closed(self):
        return self._upload_id is None

    def writable(self):
        """Return True if the stream supports writing."""
        return True

    def seekable(self):
        """If False, seek(), tell() and truncate() will raise IOError.

        We offer only tell support, and no seek or truncate support."""
        return True

    def seek(self, offset, whence=constants.WHENCE_START):
        """Unsupported."""
        raise io.UnsupportedOperation

    def truncate(self, size=None):
        """Unsupported."""
        raise io.UnsupportedOperation

    def tell(self):
        """Return the current stream position."""
        return self._total_bytes

    #
    # io.BufferedIOBase methods.
    #
    def detach(self):
        raise io.UnsupportedOperation("detach() not supported")

    def write(self, b):
        """Write the given buffer (bytes, bytearray, memoryview or any buffer
        interface implementation) to the S3 file.

        For more information about buffers, see https://docs.python.org/3/c-api/buffer.html

        There's buffering happening under the covers, so this may not actually
        do any HTTP transfer right away."""

        length = self._buf.write(b)
        self._total_bytes += length

        if self._buf.tell() >= self._min_part_size:
            self._upload_next_part()

        return length

    def terminate(self):
        """Cancel the underlying multipart upload."""
        assert self._upload_id, "no multipart upload in progress"
        self._client.abort_multipart_upload(
            Bucket=self._bucket,
            Key=self._key,
            UploadId=self._upload_id,
        )
        self._upload_id = None

    def to_boto3(self, resource):
        """Create an **independent** `boto3.s3.Object` instance that points to
        the same S3 object as this instance.
        Changes to the returned object will not affect the current instance.
        """
        assert resource, 'resource must be a boto3.resource instance'
        return resource.Object(self._bucket, self._key)

    #
    # Internal methods.
    #
    def _upload_next_part(self):
        part_num = self._total_parts + 1
        logger.info(
            "%s: uploading part_num: %i, %i bytes (total %.3fGB)",
            self,
            part_num,
            self._buf.tell(),
            self._total_bytes / 1024.0 ** 3,
        )
        self._buf.seek(0)

        #
        # Network problems in the middle of an upload are particularly
        # troublesome. We don't want to abort the entire upload just because
        # of a temporary connection problem, so this part needs to be
        # especially robust.
        #
        upload = _retry_if_failed(
            functools.partial(
                self._client.upload_part,
                Bucket=self._bucket,
                Key=self._key,
                UploadId=self._upload_id,
                PartNumber=part_num,
                Body=self._buf,
            )
        )

        self._parts.append({'ETag': upload['ETag'], 'PartNumber': part_num})
        logger.debug("%s: upload of part_num #%i finished", self, part_num)

        self._total_parts += 1

        self._buf.seek(0)
        self._buf.truncate(0)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.terminate()
        else:
            self.close()

    def __str__(self):
        return "smart_open.s3.MultipartWriter(%r, %r)" % (self._bucket, self._key)

    def __repr__(self):
        return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, min_part_size=%r)" % (
            self._bucket,
            self._key,
            self._min_part_size,
        )
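
#
# Illustrative sketch, not part of the library: the context manager behaviour
# defined above. A clean exit flushes the buffer and completes the multipart
# upload; an exception inside the block calls terminate(), which aborts the
# upload so no partial object is left behind. Names are placeholders.
#
# >>> with MultipartWriter('my-bucket', 'out/archive.bin') as fout:
# ...     fout.write(b'x' * 1024)   # buffered; uploaded as a part once min_part_size is reached
# ...     fout.write(b'tail')       # whatever remains is uploaded by close()
#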


class SinglepartWriter(io.BufferedIOBase):
    """Writes bytes to S3 using the single part API.

    Implements the io.BufferedIOBase interface of the standard library.

    This class buffers all of its input in memory until its `close` method is called. Only then will
    the data be written to S3 and the buffer is released."""

    def __init__(
        self,
        bucket,
        key,
        client=None,
        client_kwargs=None,
        writebuffer=None,
    ):
        _initialize_boto3(self, client, client_kwargs, bucket, key)

        try:
            self._client.head_bucket(Bucket=bucket)
        except botocore.client.ClientError as e:
            raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) from e

        if writebuffer is None:
            self._buf = io.BytesIO()
        else:
            self._buf = writebuffer

        self._total_bytes = 0

        #
        # This member is part of the io.BufferedIOBase interface.
        #
        self.raw = None

    def flush(self):
        pass

    #
    # Override some methods from io.IOBase.
    #
    def close(self):
        if self._buf is None:
            return

        self._buf.seek(0)

        try:
            self._client.put_object(
                Bucket=self._bucket,
                Key=self._key,
                Body=self._buf,
            )
        except botocore.client.ClientError as e:
            raise ValueError(
                'the bucket %r does not exist, or is forbidden for access' % self._bucket) from e

        logger.debug("%s: direct upload finished", self)
        self._buf = None

    @property
    def closed(self):
        return self._buf is None

    def writable(self):
        """Return True if the stream supports writing."""
        return True

    def seekable(self):
        """If False, seek(), tell() and truncate() will raise IOError.

        We offer only tell support, and no seek or truncate support."""
        return True

    def seek(self, offset, whence=constants.WHENCE_START):
        """Unsupported."""
        raise io.UnsupportedOperation

    def truncate(self, size=None):
        """Unsupported."""
        raise io.UnsupportedOperation

    def tell(self):
        """Return the current stream position."""
        return self._total_bytes

    #
    # io.BufferedIOBase methods.
    #
    def detach(self):
        raise io.UnsupportedOperation("detach() not supported")

    def write(self, b):
        """Write the given buffer (bytes, bytearray, memoryview or any buffer
        interface implementation) into the buffer. Content of the buffer will be
        written to S3 on close as a single-part upload.

        For more information about buffers, see https://docs.python.org/3/c-api/buffer.html"""

        length = self._buf.write(b)
        self._total_bytes += length
        return length

    def terminate(self):
        """Nothing to cancel in single-part uploads."""
        return

    #
    # Internal methods.
    #
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.terminate()
        else:
            self.close()

    def __str__(self):
        return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._bucket, self._key)

    def __repr__(self):
        return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key)


def _retry_if_failed(
        partial,
        attempts=_UPLOAD_ATTEMPTS,
        sleep_seconds=_SLEEP_SECONDS,
        exceptions=None):
    if exceptions is None:
        exceptions = (botocore.exceptions.EndpointConnectionError, )
    for attempt in range(attempts):
        try:
            return partial()
        except exceptions:
            logger.critical(
                'Unable to connect to the endpoint. Check your network connection. '
                'Sleeping and retrying %d more times '
                'before giving up.' % (attempts - attempt - 1)
            )
            time.sleep(sleep_seconds)
    else:
        logger.critical('Unable to connect to the endpoint. Giving up.')
        raise IOError('Unable to connect to the endpoint after %d attempts' % attempts)
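
#
# Illustrative sketch, not part of the library: _retry_if_failed expects a
# zero-argument callable, which is why the writers above wrap client methods in
# functools.partial. The bucket and key names are placeholders.
#
# >>> client = boto3.client('s3')
# >>> response = _retry_if_failed(
# ...     functools.partial(client.put_object, Bucket='my-bucket', Key='my-key', Body=b''))
#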


def _accept_all(key):
    return True


def iter_bucket(
        bucket_name,
        prefix='',
        accept_key=None,
        key_limit=None,
        workers=16,
        retries=3,
        **session_kwargs):
    """
    Iterate and download all S3 objects under `s3://bucket_name/prefix`.

    Parameters
    ----------
    bucket_name: str
        The name of the bucket.
    prefix: str, optional
        Limits the iteration to keys starting with the prefix.
    accept_key: callable, optional
        This is a function that accepts a key name (unicode string) and
        returns True/False, signalling whether the given key should be downloaded.
        The default behavior is to accept all keys.
    key_limit: int, optional
        If specified, the iterator will stop after yielding this many results.
    workers: int, optional
        The number of subprocesses to use.
    retries: int, optional
        The number of times to retry a failed download.
    session_kwargs: dict, optional
        Keyword arguments to pass when creating a new session.
        For a list of available names and values, see:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session


    Yields
    ------
    str
        The full key name (does not include the bucket name).
    bytes
        The full contents of the key.

    Notes
    -----
    The keys are processed in parallel, using `workers` processes (default: 16),
    to speed up downloads greatly. If multiprocessing is not available
    (i.e. _MULTIPROCESSING is False), this parameter is ignored.

    Examples
    --------

      >>> # get all JSON files under "mybucket/foo/"
      >>> for key, content in iter_bucket(
      ...         bucket_name, prefix='foo/',
      ...         accept_key=lambda key: key.endswith('.json')):
      ...     print(key, len(content))

      >>> # limit to 10k files, using 32 parallel workers (default is 16)
      >>> for key, content in iter_bucket(bucket_name, key_limit=10000, workers=32):
      ...     print(key, len(content))
    """
    if accept_key is None:
        accept_key = _accept_all

    #
    # If people insist on giving us bucket instances, silently extract the name
    # before moving on. Works for boto3 as well as boto.
    #
    try:
        bucket_name = bucket_name.name
    except AttributeError:
        pass

    total_size, key_no = 0, -1
    key_iterator = _list_bucket(
        bucket_name,
        prefix=prefix,
        accept_key=accept_key,
        **session_kwargs)
    download_key = functools.partial(
        _download_key,
        bucket_name=bucket_name,
        retries=retries,
        **session_kwargs)

    with smart_open.concurrency.create_pool(processes=workers) as pool:
        result_iterator = pool.imap_unordered(download_key, key_iterator)
        key_no = 0
        while True:
            try:
                (key, content) = result_iterator.__next__()
                if key_no % 1000 == 0:
                    logger.info(
                        "yielding key #%i: %s, size %i (total %.1fMB)",
                        key_no, key, len(content), total_size / 1024.0 ** 2
                    )
                yield key, content
                total_size += len(content)
                if key_limit is not None and key_no + 1 >= key_limit:
                    # we were asked to output only a limited number of keys => we're done
                    break
            except botocore.exceptions.ClientError as err:
                #
                # ignore 404 not found errors: they mean the object was deleted
                # after we listed the contents of the bucket, but before we
                # downloaded the object.
                #
                if not ('Error' in err.response and err.response['Error'].get('Code') == '404'):
                    raise err
            except StopIteration:
                break
            key_no += 1
    logger.info("processed %i keys, total size %i" % (key_no + 1, total_size))


def _list_bucket(
        bucket_name,
        prefix='',
        accept_key=lambda k: True,
        **session_kwargs):
    session = boto3.session.Session(**session_kwargs)
    client = session.client('s3')
    ctoken = None

    while True:
        # list_objects_v2 doesn't like a None value for ContinuationToken
        # so we don't set it if we don't have one.
        if ctoken:
            kwargs = dict(Bucket=bucket_name, Prefix=prefix, ContinuationToken=ctoken)
        else:
            kwargs = dict(Bucket=bucket_name, Prefix=prefix)
        response = client.list_objects_v2(**kwargs)
        try:
            content = response['Contents']
        except KeyError:
            pass
        else:
            for c in content:
                key = c['Key']
                if accept_key(key):
                    yield key
        ctoken = response.get('NextContinuationToken', None)
        if not ctoken:
            break


def _download_key(key_name, bucket_name=None, retries=3, **session_kwargs):
    if bucket_name is None:
        raise ValueError('bucket_name may not be None')

    #
    # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html#multithreading-or-multiprocessing-with-resources
    #
    session = boto3.session.Session(**session_kwargs)
    s3 = session.resource('s3')
    bucket = s3.Bucket(bucket_name)

    # Sometimes, https://github.com/boto/boto/issues/2409 can happen
    # because of network issues on either side.
    # Retry up to 3 times to ensure it's not a transient issue.
    for x in range(retries + 1):
        try:
            content_bytes = _download_fileobj(bucket, key_name)
        except botocore.client.ClientError:
            # Actually fail on last pass through the loop
            if x == retries:
                raise
            # Otherwise, try again, as this might be a transient timeout
            pass
        else:
            return key_name, content_bytes


def _download_fileobj(bucket, key_name):
    #
    # This is a separate function only because it makes it easier to inject
    # exceptions during tests.
    #
    buf = io.BytesIO()
    bucket.download_fileobj(key_name, buf)
    return buf.getvalue()