Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/smart_open/s3.py: 20%
1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2019 Radim Rehurek <me@radimrehurek.com>
4#
5# This code is distributed under the terms and conditions
6# from the MIT License (MIT).
7#
8"""Implements file-like objects for reading and writing from/to AWS S3."""
9from __future__ import annotations
11import http
12import io
13import functools
14import logging
15import time
16import warnings
18from typing import (
19 Callable,
20 List,
21 TYPE_CHECKING,
22)
24try:
25 import boto3
26 import botocore.client
27 import botocore.exceptions
28 import urllib3.exceptions
29except ImportError:
30 MISSING_DEPS = True
32import smart_open.bytebuffer
33import smart_open.concurrency
34import smart_open.utils
36from smart_open import constants
39if TYPE_CHECKING:
40 from mypy_boto3_s3.client import S3Client
41 from typing_extensions import Buffer
43logger = logging.getLogger(__name__)
45#
46# AWS puts restrictions on the part size for multipart uploads.
47# Each part must be at least 5MB and at most 5GB.
48#
49# On top of that, our MultipartWriter has a min_part_size option.
50# In retrospect, it's an unfortunate name, because it conflicts with the
51# minimum allowable part size (5MB), but it's too late to change it, because
52# people are using that parameter (unlike the MIN, DEFAULT, MAX constants).
53# It really just means "part size": as soon as you have this many bytes,
54# write a part to S3 (see the MultipartWriter.write method).
55#
57MIN_PART_SIZE = 5 * 1024 ** 2
58"""The absolute minimum permitted by Amazon."""
60DEFAULT_PART_SIZE = 50 * 1024**2
61"""The default part size for S3 multipart uploads, chosen carefully by smart_open"""
63MAX_PART_SIZE = 5 * 1024 ** 3
64"""The absolute maximum permitted by Amazon."""
66SCHEMES = ("s3", "s3n", 's3u', "s3a")
67DEFAULT_PORT = 443
68DEFAULT_HOST = 's3.amazonaws.com'
70DEFAULT_BUFFER_SIZE = 128 * 1024
72URI_EXAMPLES = (
73 's3://my_bucket/my_key',
74 's3://my_key:my_secret@my_bucket/my_key',
75 's3://my_key:my_secret@my_server:my_port@my_bucket/my_key',
76)
78# Returned by AWS when we try to seek beyond EOF.
79_OUT_OF_RANGE = 'InvalidRange'
82class Retry:
83 def __init__(self):
84 self.attempts: int = 6
85 self.sleep_seconds: int = 10
86 self.exceptions: List[Exception] = [botocore.exceptions.EndpointConnectionError]
87 self.client_error_codes: List[str] = ['NoSuchUpload']
89 def _do(self, fn: Callable):
90 for attempt in range(self.attempts):
91 try:
92 return fn()
93 except tuple(self.exceptions) as err:
94 logger.critical(
95 'Caught non-fatal %s, retrying %d more times',
96 err,
97 self.attempts - attempt - 1,
98 )
99 logger.exception(err)
100 time.sleep(self.sleep_seconds)
101 except botocore.exceptions.ClientError as err:
102 error_code = err.response['Error'].get('Code')
103 if error_code not in self.client_error_codes:
104 raise
105 logger.critical(
106 'Caught non-fatal ClientError (%s), retrying %d more times',
107 error_code,
108 self.attempts - attempt - 1,
109 )
110 logger.exception(err)
111 time.sleep(self.sleep_seconds)
112 else:
113 logger.critical('encountered too many non-fatal errors, giving up')
114 raise IOError('%s failed after %d attempts' % (fn.func, self.attempts))
117#
118# The retry mechanism for this submodule. Client code may modify it, e.g. by
119# updating RETRY.sleep_seconds and friends.
120#
121if 'MISSING_DEPS' not in locals():
122 RETRY = Retry()
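#
# A minimal sketch of tuning the retry mechanism from client code; the
# attribute names are the ones defined on Retry above:
#
#   import smart_open.s3
#
#   smart_open.s3.RETRY.attempts = 3           # give up sooner
#   smart_open.s3.RETRY.sleep_seconds = 30     # wait longer between attempts
#   smart_open.s3.RETRY.exceptions.append(ConnectionResetError)
#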
125class _ClientWrapper:
126 """Wraps a client to inject the appropriate keyword args into each method call.
128 The keyword args are a dictionary keyed by the fully qualified method name.
129 For example, S3.Client.create_multipart_upload.
131 See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client
133 This wrapper behaves identically to the client otherwise.
134 """
135 def __init__(self, client, kwargs):
136 self.client = client
137 self.kwargs = kwargs
139 def __getattr__(self, method_name):
140 method = getattr(self.client, method_name)
141 kwargs = self.kwargs.get('S3.Client.%s' % method_name, {})
142 return functools.partial(method, **kwargs)
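#
# Hedged example of the wrapper in action (the bucket, key and kwargs below
# are illustrative): kwargs keyed by the fully qualified method name are
# injected into every call of that method on the wrapped client.
#
#   wrapped = _ClientWrapper(
#       boto3.client('s3'),
#       {'S3.Client.create_multipart_upload': {'ServerSideEncryption': 'aws:kms'}},
#   )
#   # behaves like client.create_multipart_upload(..., ServerSideEncryption='aws:kms')
#   wrapped.create_multipart_upload(Bucket='my-bucket', Key='my-key')
#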
145def parse_uri(uri_as_string):
146 #
147 # Restrictions on bucket names and labels:
148 #
149 # - Bucket names must be at least 3 and no more than 63 characters long.
150 # - Bucket names must be a series of one or more labels.
151 # - Adjacent labels are separated by a single period (.).
152 # - Bucket names can contain lowercase letters, numbers, and hyphens.
153 # - Each label must start and end with a lowercase letter or a number.
154 #
155 # We use the above as a guide only, and do not perform any validation. We
156 # let boto3 take care of that for us.
157 #
158 split_uri = smart_open.utils.safe_urlsplit(uri_as_string)
159 assert split_uri.scheme in SCHEMES
161 port = DEFAULT_PORT
162 host = DEFAULT_HOST
163 ordinary_calling_format = False
164 #
165 # These defaults tell boto3 to look for credentials elsewhere
166 #
167 access_id, access_secret = None, None
169 #
170 # Common URI template [secret:key@][host[:port]@]bucket/object
171 #
172 # The urlparse function doesn't handle the above schema, so we have to do
173 # it ourselves.
174 #
175 uri = split_uri.netloc + split_uri.path
177 #
178 # Attempt to extract edge-case authentication details from the URL.
179 #
180 # See:
181 # 1. https://summitroute.com/blog/2018/06/20/aws_security_credential_formats/
182 # 2. test_s3_uri_with_credentials* in test_smart_open.py for example edge cases
183 #
184 if '@' in uri:
185 maybe_auth, rest = uri.split('@', 1)
186 if ':' in maybe_auth:
187 maybe_id, maybe_secret = maybe_auth.split(':', 1)
188 if '/' not in maybe_id:
189 access_id, access_secret = maybe_id, maybe_secret
190 uri = rest
192 head, key_id = uri.split('/', 1)
193 if '@' in head and ':' in head:
194 ordinary_calling_format = True
195 host_port, bucket_id = head.split('@')
196 host, port = host_port.split(':', 1)
197 port = int(port)
198 elif '@' in head:
199 ordinary_calling_format = True
200 host, bucket_id = head.split('@')
201 else:
202 bucket_id = head
204 return dict(
205 scheme=split_uri.scheme,
206 bucket_id=bucket_id,
207 key_id=key_id,
208 port=port,
209 host=host,
210 ordinary_calling_format=ordinary_calling_format,
211 access_id=access_id,
212 access_secret=access_secret,
213 )
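#
# A rough sketch of what parse_uri returns for one of the URI_EXAMPLES above
# (the bucket/key names are the placeholders from those examples):
#
#   parse_uri('s3://my_key:my_secret@my_bucket/my_key')
#   # -> {'scheme': 's3', 'bucket_id': 'my_bucket', 'key_id': 'my_key',
#   #     'port': 443, 'host': 's3.amazonaws.com',
#   #     'ordinary_calling_format': False,
#   #     'access_id': 'my_key', 'access_secret': 'my_secret'}
#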
216def _consolidate_params(uri, transport_params):
217 """Consolidates the parsed Uri with the additional parameters.
219 This is necessary because the user can pass some of the parameters can in
220 two different ways:
222 1) Via the URI itself
223 2) Via the transport parameters
225 These are not mutually exclusive, but we have to pick one over the other
226 in a sensible way in order to proceed.
228 """
229 transport_params = dict(transport_params)
231 def inject(**kwargs):
232 try:
233 client_kwargs = transport_params['client_kwargs']
234 except KeyError:
235 client_kwargs = transport_params['client_kwargs'] = {}
237 try:
238 init_kwargs = client_kwargs['S3.Client']
239 except KeyError:
240 init_kwargs = client_kwargs['S3.Client'] = {}
242 init_kwargs.update(**kwargs)
244 client = transport_params.get('client')
245 if client is not None and (uri['access_id'] or uri['access_secret']):
246 logger.warning(
247 'ignoring credentials parsed from URL because they conflict with '
248 'transport_params["client"]. Set transport_params["client"] to None '
249 'to suppress this warning.'
250 )
251 uri.update(access_id=None, access_secret=None)
252 elif (uri['access_id'] and uri['access_secret']):
253 inject(
254 aws_access_key_id=uri['access_id'],
255 aws_secret_access_key=uri['access_secret'],
256 )
257 uri.update(access_id=None, access_secret=None)
259 if client is not None and uri['host'] != DEFAULT_HOST:
260 logger.warning(
261 'ignoring endpoint_url parsed from URL because it conflicts with '
262 'transport_params["client"]. Set transport_params["client"] to None '
263 'to suppress this warning.'
264 )
265 uri.update(host=None)
266 elif uri['host'] != DEFAULT_HOST:
267 if uri['scheme'] == 's3u':
268 scheme = 'http'
269 else:
270 scheme = 'https'
271 inject(endpoint_url=scheme + '://%(host)s:%(port)d' % uri)
272 uri.update(host=None)
274 return uri, transport_params
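#
# Sketch of the consolidation above, continuing the parse_uri example
# (illustrative values only):
#
#   uri = parse_uri('s3://my_key:my_secret@my_bucket/my_key')
#   uri, tp = _consolidate_params(uri, {})
#   # tp['client_kwargs']['S3.Client'] now holds
#   # {'aws_access_key_id': 'my_key', 'aws_secret_access_key': 'my_secret'}
#   # and uri['access_id'] / uri['access_secret'] are reset to None.
#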
277def open_uri(uri, mode, transport_params):
278 deprecated = (
279 'multipart_upload_kwargs',
280 'object_kwargs',
281 'resource',
282 'resource_kwargs',
283 'session',
284 'singlepart_upload_kwargs',
285 )
286 detected = [k for k in deprecated if k in transport_params]
287 if detected:
288 doc_url = (
289 'https://github.com/piskvorky/smart_open/blob/develop/'
290 'MIGRATING_FROM_OLDER_VERSIONS.rst'
291 )
292 #
293 # We use warnings.warn with UserWarning instead of logger.warn here because
294 #
295 # 1) Not everyone has logging enabled; and
296 # 2) check_kwargs (below) already uses logger.warn with a similar message
297 #
298 # https://github.com/piskvorky/smart_open/issues/614
299 #
300 message = (
301 'ignoring the following deprecated transport parameters: %r. '
302 'See <%s> for details' % (detected, doc_url)
303 )
304 warnings.warn(message, UserWarning)
305 parsed_uri = parse_uri(uri)
306 parsed_uri, transport_params = _consolidate_params(parsed_uri, transport_params)
307 kwargs = smart_open.utils.check_kwargs(open, transport_params)
308 return open(parsed_uri['bucket_id'], parsed_uri['key_id'], mode, **kwargs)
311def open(
312 bucket_id,
313 key_id,
314 mode,
315 version_id=None,
316 buffer_size=DEFAULT_BUFFER_SIZE,
317 min_part_size=DEFAULT_PART_SIZE,
318 multipart_upload=True,
319 defer_seek=False,
320 client=None,
321 client_kwargs=None,
322 writebuffer=None,
323):
324 """Open an S3 object for reading or writing.
326 Parameters
327 ----------
328 bucket_id: str
329 The name of the bucket this object resides in.
330 key_id: str
331 The name of the key within the bucket.
332 mode: str
333 The mode for opening the object. Must be either "rb" or "wb".
334 buffer_size: int, optional
335 The buffer size to use when performing I/O.
336 min_part_size: int, optional
337 The minimum part size for multipart uploads, in bytes.
338 When the writebuffer contains this many bytes, smart_open will upload
339 the bytes to S3 as a single part of a multi-part upload, freeing the
340 buffer either partially or entirely. When you close the writer, it
341 will assemble the parts together.
342 The value determines the upper limit for the writebuffer. If buffer
343 space is short (e.g. you are buffering to memory), then use a smaller
344 value for min_part_size, or consider buffering to disk instead (see
345 the writebuffer option).
346 The value must be between 5MB and 5GB. If you specify a value outside
347 of this range, smart_open will adjust it for you, because otherwise the
348 upload _will_ fail.
349 For writing only. Does not apply if you set multipart_upload=False.
350 multipart_upload: bool, optional
351 Default: `True`
352 If set to `True`, will use multipart upload for writing to S3. If set
353 to `False`, S3 upload will use the S3 Single-Part Upload API, which
354 is better suited to small files.
355 For writing only.
356 version_id: str, optional
357 Version of the object, used when reading the object.
358 If None, will fetch the most recent version.
359 defer_seek: boolean, optional
360 Default: `False`
361 If set to `True` on a file opened for reading, GetObject will not be
362 called until the first seek() or read().
363 Avoids redundant API queries when seeking before reading.
364 client: object, optional
365 The S3 client to use when working with boto3.
366 If you don't specify this, then smart_open will create a new client for you.
367 client_kwargs: dict, optional
368 Additional parameters to pass to the relevant functions of the client.
369 The keys are fully qualified method names, e.g. `S3.Client.create_multipart_upload`.
370 The values are kwargs to pass to that method each time it is called.
371 writebuffer: IO[bytes], optional
372 By default, this module will buffer data in memory using io.BytesIO
373 when writing. Pass another binary IO instance here to use it instead.
374 For example, you may pass a file object to buffer to local disk instead
375 of in RAM. Use this to keep RAM usage low at the expense of additional
376 disk IO. If you pass in an open file, then you are responsible for
377 cleaning it up after writing completes.
378 """
379 logger.debug('%r', locals())
380 if mode not in constants.BINARY_MODES:
381 raise NotImplementedError('bad mode: %r expected one of %r' % (mode, constants.BINARY_MODES))
383 if (mode == constants.WRITE_BINARY) and (version_id is not None):
384 raise ValueError("version_id must be None when writing")
386 if mode == constants.READ_BINARY:
387 fileobj = Reader(
388 bucket_id,
389 key_id,
390 version_id=version_id,
391 buffer_size=buffer_size,
392 defer_seek=defer_seek,
393 client=client,
394 client_kwargs=client_kwargs,
395 )
396 elif mode == constants.WRITE_BINARY:
397 if multipart_upload:
398 fileobj = MultipartWriter(
399 bucket_id,
400 key_id,
401 client=client,
402 client_kwargs=client_kwargs,
403 writebuffer=writebuffer,
404 part_size=min_part_size,
405 )
406 else:
407 fileobj = SinglepartWriter(
408 bucket_id,
409 key_id,
410 client=client,
411 client_kwargs=client_kwargs,
412 writebuffer=writebuffer,
413 )
414 else:
415 assert False, 'unexpected mode: %r' % mode
417 fileobj.name = key_id
418 return fileobj
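#
# Minimal usage sketch for the module-level open() above (bucket and key names
# are placeholders):
#
#   with open('my-bucket', 'hello.txt', 'wb') as fout:
#       fout.write(b'hello world!')
#
#   with open('my-bucket', 'hello.txt', 'rb') as fin:
#       print(fin.read())
#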
421def _get(client, bucket, key, version, range_string):
422 try:
423 params = dict(Bucket=bucket, Key=key)
424 if version:
425 params["VersionId"] = version
426 if range_string:
427 params["Range"] = range_string
429 return client.get_object(**params)
430 except botocore.client.ClientError as error:
431 wrapped_error = IOError(
432 'unable to access bucket: %r key: %r version: %r error: %s' % (
433 bucket, key, version, error
434 )
435 )
436 wrapped_error.backend_error = error
437 raise wrapped_error from error
440def _unwrap_ioerror(ioe):
441 """Given an IOError from _get, return the 'Error' dictionary from boto."""
442 try:
443 return ioe.backend_error.response['Error']
444 except (AttributeError, KeyError):
445 return None
448class _SeekableRawReader(object):
449 """Read an S3 object.
451 This class is internal to the S3 submodule.
452 """
454 def __init__(
455 self,
456 client,
457 bucket,
458 key,
459 version_id=None,
460 ):
461 self._client = client
462 self._bucket = bucket
463 self._key = key
464 self._version_id = version_id
466 self._content_length = None
467 self._position = 0
468 self._body = None
470 def seek(self, offset, whence=constants.WHENCE_START):
471 """Seek to the specified position.
473 :param int offset: The offset in bytes.
474 :param int whence: Where the offset is from.
476 :returns: the position after seeking.
477 :rtype: int
478 """
479 if whence not in constants.WHENCE_CHOICES:
480 raise ValueError('invalid whence, expected one of %r' % constants.WHENCE_CHOICES)
482 #
483 # Close old body explicitly.
484 # On the first seek() after __init__(), self._body does not exist yet.
485 #
486 if self._body is not None:
487 self._body.close()
488 self._body = None
490 start = None
491 stop = None
492 if whence == constants.WHENCE_START:
493 start = max(0, offset)
494 elif whence == constants.WHENCE_CURRENT:
495 start = max(0, offset + self._position)
496 else:
497 stop = max(0, -offset)
499 #
500 # If we can figure out that we've read past the EOF, then we can save
501 # an extra API call.
502 #
503 if self._content_length is None:
504 reached_eof = False
505 elif start is not None and start >= self._content_length:
506 reached_eof = True
507 elif stop == 0:
508 reached_eof = True
509 else:
510 reached_eof = False
512 if reached_eof:
513 self._body = io.BytesIO()
514 self._position = self._content_length
515 else:
516 self._open_body(start, stop)
518 return self._position
520 def _open_body(self, start=None, stop=None):
521 """Open a connection to download the specified range of bytes. Store
522 the open file handle in self._body.
524 If no range is specified, start defaults to self._position.
525 start and stop follow the semantics of the HTTP Range header,
526 so a stop without a start requests a suffix range, i.e. the last `stop` bytes.
528 As a side effect, set self._content_length. Set self._position
529 to self._content_length if start is past end of file.
530 """
531 if start is None and stop is None:
532 start = self._position
533 range_string = smart_open.utils.make_range_string(start, stop)
535 try:
536 # Optimistically try to fetch the requested content range.
537 response = _get(
538 self._client,
539 self._bucket,
540 self._key,
541 self._version_id,
542 range_string,
543 )
544 except IOError as ioe:
545 # Handle requested content range exceeding content size.
546 error_response = _unwrap_ioerror(ioe)
547 if error_response is None or error_response.get('Code') != _OUT_OF_RANGE:
548 raise
549 try:
550 self._position = self._content_length = int(error_response['ActualObjectSize'])
551 self._body = io.BytesIO()
552 except KeyError:
553 response = _get(
554 self._client,
555 self._bucket,
556 self._key,
557 self._version_id,
558 None,
559 )
560 self._position = self._content_length = response["ContentLength"]
561 self._body = response["Body"]
562 else:
563 #
564 # Keep track of how many times boto3's built-in retry mechanism
565 # activated.
566 #
567 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#checking-retry-attempts-in-an-aws-service-response
568 #
569 logger.debug(
570 '%s: RetryAttempts: %d',
571 self,
572 response['ResponseMetadata']['RetryAttempts'],
573 )
574 #
575 # range request may not always return partial content, see:
576 # https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests#partial_request_responses
577 #
578 status_code = response['ResponseMetadata']['HTTPStatusCode']
579 if status_code == http.HTTPStatus.PARTIAL_CONTENT:
580 _, start, stop, length = smart_open.utils.parse_content_range(response['ContentRange'])
581 self._position = start
582 elif status_code == http.HTTPStatus.OK:
583 length = response["ContentLength"]
584 self._content_length = length
585 self._body = response['Body']
587 def read(self, size=-1):
588 """Read from the continuous connection with the remote peer."""
589 if self._body is None:
590 # This is necessary for the very first read() after __init__().
591 self._open_body()
592 if self._position >= self._content_length:
593 return b''
595 #
596 # Boto3 has built-in error handling and retry mechanisms:
597 #
598 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
599 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
600 #
601 # Unfortunately, it isn't always enough. There is still a non-zero
602 # possibility that an exception will slip past these mechanisms and
603 # terminate the read prematurely. Luckily, at this stage, it's very
604 # simple to recover from the problem: wait a little bit, reopen the
605 # HTTP connection and try again. Usually, a single retry attempt is
606 # enough to recover, but we try multiple times "just in case".
607 #
608 for attempt, seconds in enumerate([1, 2, 4, 8, 16], 1):
609 try:
610 if size == -1:
611 binary = self._body.read()
612 else:
613 binary = self._body.read(size)
614 except (
615 ConnectionResetError,
616 botocore.exceptions.BotoCoreError,
617 urllib3.exceptions.HTTPError,
618 ) as err:
619 logger.warning(
620 '%s: caught %r while reading %d bytes, sleeping %ds before retry',
621 self,
622 err,
623 size,
624 seconds,
625 )
626 time.sleep(seconds)
627 self._open_body()
628 else:
629 self._position += len(binary)
630 return binary
632 raise IOError('%s: failed to read %d bytes after %d attempts' % (self, size, attempt))
634 def __str__(self):
635 return 'smart_open.s3._SeekableRawReader(%r, %r)' % (self._bucket, self._key)
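#
# For reference, a sketch of the Range headers _open_body builds with
# smart_open.utils.make_range_string (HTTP Range semantics, hence the
# suffix-range case mentioned in its docstring):
#
#   make_range_string(10, None)   # 'bytes=10-'    from offset 10 to EOF
#   make_range_string(10, 100)    # 'bytes=10-100'
#   make_range_string(None, 100)  # 'bytes=-100'   the last 100 bytes
#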
638def _initialize_boto3(rw, client, client_kwargs, bucket, key):
639 """Created the required objects for accessing S3. Ideally, they have
640 been already created for us and we can just reuse them."""
641 if client_kwargs is None:
642 client_kwargs = {}
644 if client is None:
645 init_kwargs = client_kwargs.get('S3.Client', {})
646 client = boto3.client('s3', **init_kwargs)
647 assert client
649 rw._client = _ClientWrapper(client, client_kwargs)
650 rw._bucket = bucket
651 rw._key = key
654class Reader(io.BufferedIOBase):
655 """Reads bytes from S3.
657 Implements the io.BufferedIOBase interface of the standard library."""
659 def __init__(
660 self,
661 bucket,
662 key,
663 version_id=None,
664 buffer_size=DEFAULT_BUFFER_SIZE,
665 line_terminator=constants.BINARY_NEWLINE,
666 defer_seek=False,
667 client=None,
668 client_kwargs=None,
669 ):
670 self._version_id = version_id
671 self._buffer_size = buffer_size
673 _initialize_boto3(self, client, client_kwargs, bucket, key)
675 self._raw_reader = _SeekableRawReader(
676 self._client,
677 bucket,
678 key,
679 self._version_id,
680 )
681 self._current_pos = 0
682 self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size)
683 self._eof = False
684 self._line_terminator = line_terminator
685 self._seek_initialized = False
687 #
688 # This member is part of the io.BufferedIOBase interface.
689 #
690 self.raw = None
692 if not defer_seek:
693 self.seek(0)
695 #
696 # io.BufferedIOBase methods.
697 #
699 def close(self):
700 """Flush and close this stream."""
701 logger.debug("close: called")
702 pass
704 def readable(self):
705 """Return True if the stream can be read from."""
706 return True
708 def read(self, size=-1):
709 """Read up to size bytes from the object and return them."""
710 if size == 0:
711 return b''
712 elif size < 0:
713 # call read() before setting _current_pos to make sure _content_length is set
714 out = self._read_from_buffer() + self._raw_reader.read()
715 self._current_pos = self._raw_reader._content_length
716 return out
718 #
719 # Return unused data first
720 #
721 if len(self._buffer) >= size:
722 return self._read_from_buffer(size)
724 #
725 # If the stream is finished, return what we have.
726 #
727 if self._eof:
728 return self._read_from_buffer()
730 self._fill_buffer(size)
731 return self._read_from_buffer(size)
733 def read1(self, size=-1):
734 """This is the same as read()."""
735 return self.read(size=size)
737 def readinto(self, b):
738 """Read up to len(b) bytes into b, and return the number of bytes
739 read."""
740 data = self.read(len(b))
741 if not data:
742 return 0
743 b[:len(data)] = data
744 return len(data)
746 def readline(self, limit=-1):
747 """Read up to and including the next newline. Returns the bytes read."""
748 if limit != -1:
749 raise NotImplementedError('limits other than -1 not implemented yet')
751 #
752 # A single line may span multiple buffers.
753 #
754 line = io.BytesIO()
755 while not (self._eof and len(self._buffer) == 0):
756 line_part = self._buffer.readline(self._line_terminator)
757 line.write(line_part)
758 self._current_pos += len(line_part)
760 if line_part.endswith(self._line_terminator):
761 break
762 else:
763 self._fill_buffer()
765 return line.getvalue()
767 def seekable(self):
768 """If False, seek(), tell() and truncate() will raise IOError.
770 We offer only seek support, and no truncate support."""
771 return True
773 def seek(self, offset, whence=constants.WHENCE_START):
774 """Seek to the specified position.
776 :param int offset: The offset in bytes.
777 :param int whence: Where the offset is from.
779 Returns the position after seeking."""
780 # Convert relative offset to absolute, since self._raw_reader
781 # doesn't know our current position.
782 if whence == constants.WHENCE_CURRENT:
783 whence = constants.WHENCE_START
784 offset += self._current_pos
786 if not self._seek_initialized or not (
787 whence == constants.WHENCE_START and offset == self._current_pos
788 ):
789 self._current_pos = self._raw_reader.seek(offset, whence)
791 self._buffer.empty()
793 self._eof = self._current_pos == self._raw_reader._content_length
795 self._seek_initialized = True
796 return self._current_pos
798 def tell(self):
799 """Return the current position within the file."""
800 return self._current_pos
802 def truncate(self, size=None):
803 """Unsupported."""
804 raise io.UnsupportedOperation
806 def detach(self):
807 """Unsupported."""
808 raise io.UnsupportedOperation
810 def terminate(self):
811 """Do nothing."""
812 pass
814 def to_boto3(self, resource):
815 """Create an **independent** `boto3.s3.Object` instance that points to
816 the same S3 object as this instance.
817 Changes to the returned object will not affect the current instance.
818 """
819 assert resource, 'resource must be a boto3.resource instance'
820 obj = resource.Object(self._bucket, self._key)
821 if self._version_id is not None:
822 return obj.Version(self._version_id)
823 else:
824 return obj
826 #
827 # Internal methods.
828 #
829 def _read_from_buffer(self, size=-1):
830 """Remove at most size bytes from our buffer and return them."""
831 size = size if size >= 0 else len(self._buffer)
832 part = self._buffer.read(size)
833 self._current_pos += len(part)
834 return part
836 def _fill_buffer(self, size=-1):
837 size = max(size, self._buffer._chunk_size)
838 while len(self._buffer) < size and not self._eof:
839 bytes_read = self._buffer.fill(self._raw_reader)
840 if bytes_read == 0:
841 logger.debug('%s: reached EOF while filling buffer', self)
842 self._eof = True
844 def __str__(self):
845 return "smart_open.s3.Reader(%r, %r)" % (self._bucket, self._key)
847 def __repr__(self):
848 return (
849 "smart_open.s3.Reader("
850 "bucket=%r, "
851 "key=%r, "
852 "version_id=%r, "
853 "buffer_size=%r, "
854 "line_terminator=%r)"
855 ) % (
856 self._bucket,
857 self._key,
858 self._version_id,
859 self._buffer_size,
860 self._line_terminator,
861 )
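#
# Hedged example (names are placeholders): Reader can be constructed directly,
# e.g. with defer_seek=True so that no GetObject call happens until the first
# seek() or read().
#
#   reader = Reader('my-bucket', 'big-file.bin', defer_seek=True)
#   reader.seek(1024)          # the first API call happens here
#   chunk = reader.read(4096)  # up to 4KB starting at offset 1024
#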
864class MultipartWriter(io.BufferedIOBase):
865 """Writes bytes to S3 using the multi part API.
867 Implements the io.BufferedIOBase interface of the standard library."""
868 _upload_id = None # so `closed` property works in case __init__ fails and __del__ is called
870 def __init__(
871 self,
872 bucket,
873 key,
874 part_size=DEFAULT_PART_SIZE,
875 client=None,
876 client_kwargs=None,
877 writebuffer: io.BytesIO | None = None,
878 ):
879 adjusted_ps = smart_open.utils.clamp(part_size, MIN_PART_SIZE, MAX_PART_SIZE)
880 if part_size != adjusted_ps:
881 logger.warning(f"adjusting part_size from {part_size} to {adjusted_ps}")
882 part_size = adjusted_ps
883 self._part_size = part_size
885 _initialize_boto3(self, client, client_kwargs, bucket, key)
886 self._client: S3Client
887 self._bucket: str
888 self._key: str
890 try:
891 partial = functools.partial(
892 self._client.create_multipart_upload,
893 Bucket=bucket,
894 Key=key,
895 )
896 self._upload_id = RETRY._do(partial)['UploadId']
897 except botocore.client.ClientError as error:
898 raise ValueError(
899 'the bucket %r does not exist, or is forbidden for access (%r)' % (
900 bucket, error
901 )
902 ) from error
904 if writebuffer is None:
905 self._buf = io.BytesIO()
906 else:
907 self._buf = writebuffer
909 self._total_bytes = 0
910 self._total_parts = 0
911 self._parts: list[dict[str, object]] = []
913 #
914 # This member is part of the io.BufferedIOBase interface.
915 #
916 self.raw = None # type: ignore[assignment]
918 def flush(self):
919 pass
921 #
922 # Override some methods from io.IOBase.
923 #
924 def close(self):
925 logger.debug("close: called")
926 if self.closed:
927 return
929 if self._buf.tell():
930 self._upload_next_part()
932 logger.debug('%s: completing multipart upload', self)
933 if self._total_bytes and self._upload_id:
934 partial = functools.partial(
935 self._client.complete_multipart_upload,
936 Bucket=self._bucket,
937 Key=self._key,
938 UploadId=self._upload_id,
939 MultipartUpload={'Parts': self._parts},
940 )
941 RETRY._do(partial)
942 logger.debug('%s: completed multipart upload', self)
943 elif self._upload_id:
944 #
945 # AWS complains with "The XML you provided was not well-formed or
946 # did not validate against our published schema" when the input is
947 # completely empty => abort the upload, no file created.
948 #
949 # We work around this by creating an empty file explicitly.
950 #
951 self._client.abort_multipart_upload(
952 Bucket=self._bucket,
953 Key=self._key,
954 UploadId=self._upload_id,
955 )
956 self._client.put_object(
957 Bucket=self._bucket,
958 Key=self._key,
959 Body=b'',
960 )
961 logger.debug('%s: wrote 0 bytes to imitate multipart upload', self)
962 self._upload_id = None
964 @property
965 def closed(self):
966 return self._upload_id is None
968 def writable(self):
969 """Return True if the stream supports writing."""
970 return True
972 def seekable(self):
973 """If False, seek(), tell() and truncate() will raise IOError.
975 We offer only tell support, and no seek or truncate support."""
976 return True
978 def seek(self, offset, whence=constants.WHENCE_START):
979 """Unsupported."""
980 raise io.UnsupportedOperation
982 def truncate(self, size=None):
983 """Unsupported."""
984 raise io.UnsupportedOperation
986 def tell(self):
987 """Return the current stream position."""
988 return self._total_bytes
990 #
991 # io.BufferedIOBase methods.
992 #
993 def detach(self):
994 raise io.UnsupportedOperation("detach() not supported")
996 def write(self, b: Buffer) -> int:
997 """Write the given buffer (bytes, bytearray, memoryview or any buffer
998 interface implementation) to the S3 file.
1000 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html
1002 There's buffering happening under the covers, so this may not actually
1003 do any HTTP transfer right away."""
1004 offset = 0
1005 mv = memoryview(b)
1006 self._total_bytes += len(mv)
1008 #
1009 # botocore does not accept memoryview; if it did, we could have uploaded
1010 # directly from it and only needed to copy into the buffer when b is
1011 # smaller than part_size.
1012 #
1013 while offset < len(mv):
1014 start = offset
1015 end = offset + self._part_size - self._buf.tell()
1016 self._buf.write(mv[start:end])
1017 if self._buf.tell() < self._part_size:
1018 #
1019 # Not enough data to write a new part just yet. The assert
1020 # ensures that we've consumed all of the input buffer.
1021 #
1022 assert end >= len(mv)
1023 return len(mv)
1025 self._upload_next_part()
1026 offset = end
1027 return len(mv)
1029 def terminate(self):
1030 """Cancel the underlying multipart upload."""
1031 if self.closed:
1032 return
1033 logger.debug('%s: terminating multipart upload', self)
1034 self._client.abort_multipart_upload(
1035 Bucket=self._bucket,
1036 Key=self._key,
1037 UploadId=self._upload_id,
1038 )
1039 self._upload_id = None
1040 logger.debug('%s: terminated multipart upload', self)
1042 def to_boto3(self, resource):
1043 """Create an **independent** `boto3.s3.Object` instance that points to
1044 the same S3 object as this instance.
1045 Changes to the returned object will not affect the current instance.
1046 """
1047 assert resource, 'resource must be a boto3.resource instance'
1048 return resource.Object(self._bucket, self._key)
1050 #
1051 # Internal methods.
1052 #
1053 def _upload_next_part(self) -> None:
1054 part_num = self._total_parts + 1
1055 logger.info(
1056 "%s: uploading part_num: %i, %i bytes (total %.3fGB)",
1057 self,
1058 part_num,
1059 self._buf.tell(),
1060 self._total_bytes / 1024.0 ** 3,
1061 )
1062 self._buf.seek(0)
1064 #
1065 # Network problems in the middle of an upload are particularly
1066 # troublesome. We don't want to abort the entire upload just because
1067 # of a temporary connection problem, so this part needs to be
1068 # especially robust.
1069 #
1070 upload = RETRY._do(
1071 functools.partial(
1072 self._client.upload_part,
1073 Bucket=self._bucket,
1074 Key=self._key,
1075 UploadId=self._upload_id,
1076 PartNumber=part_num,
1077 Body=self._buf,
1078 )
1079 )
1081 self._parts.append({'ETag': upload['ETag'], 'PartNumber': part_num})
1082 logger.debug("%s: upload of part_num #%i finished", self, part_num)
1084 self._total_parts += 1
1086 self._buf.seek(0)
1087 self._buf.truncate(0)
1089 def __enter__(self):
1090 return self
1092 def __exit__(self, exc_type, exc_val, exc_tb):
1093 if exc_type is not None:
1094 self.terminate()
1095 else:
1096 self.close()
1098 def __str__(self):
1099 return "smart_open.s3.MultipartWriter(%r, %r)" % (self._bucket, self._key)
1101 def __repr__(self):
1102 return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, part_size=%r)" % (
1103 self._bucket,
1104 self._key,
1105 self._part_size,
1106 )
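#
# Hedged usage sketch (placeholders throughout): the context manager protocol
# defined above completes the multipart upload on a clean exit and aborts it
# via terminate() if an exception escapes the block.
#
#   with MultipartWriter('my-bucket', 'big-file.bin', part_size=50 * 1024 ** 2) as fout:
#       for chunk in produce_chunks():   # produce_chunks() is hypothetical
#           fout.write(chunk)
#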
1109class SinglepartWriter(io.BufferedIOBase):
1110 """Writes bytes to S3 using the single part API.
1112 Implements the io.BufferedIOBase interface of the standard library.
1114 This class buffers all of its input (in memory, by default) until its `close` method is
1115 called. Only then is the data written to S3 and the buffer released."""
1116 _buf = None # so `closed` property works in case __init__ fails and __del__ is called
1118 def __init__(
1119 self,
1120 bucket,
1121 key,
1122 client=None,
1123 client_kwargs=None,
1124 writebuffer=None,
1125 ):
1126 _initialize_boto3(self, client, client_kwargs, bucket, key)
1128 if writebuffer is None:
1129 self._buf = io.BytesIO()
1130 elif not writebuffer.seekable():
1131 raise ValueError('writebuffer needs to be seekable')
1132 else:
1133 self._buf = writebuffer
1135 def flush(self):
1136 pass
1138 #
1139 # Override some methods from io.IOBase.
1140 #
1141 def close(self):
1142 logger.debug("close: called")
1143 if self.closed:
1144 return
1146 self.seek(0)
1148 try:
1149 self._client.put_object(
1150 Bucket=self._bucket,
1151 Key=self._key,
1152 Body=self._buf,
1153 )
1154 except botocore.client.ClientError as e:
1155 raise ValueError(
1156 'the bucket %r does not exist, or is forbidden for access' % self._bucket) from e
1158 logger.debug("%s: direct upload finished", self)
1159 self._buf.close()
1161 @property
1162 def closed(self):
1163 return self._buf is None or self._buf.closed
1165 def readable(self):
1166 """Propagate."""
1167 return self._buf.readable()
1169 def writable(self):
1170 """Propagate."""
1171 return self._buf.writable()
1173 def seekable(self):
1174 """Propagate."""
1175 return self._buf.seekable()
1177 def seek(self, offset, whence=constants.WHENCE_START):
1178 """Propagate."""
1179 return self._buf.seek(offset, whence)
1181 def truncate(self, size=None):
1182 """Propagate."""
1183 return self._buf.truncate(size)
1185 def tell(self):
1186 """Propagate."""
1187 return self._buf.tell()
1189 def write(self, b):
1190 """Write the given buffer (bytes, bytearray, memoryview or any buffer
1191 interface implementation) into the buffer. Content of the buffer will be
1192 written to S3 on close as a single-part upload.
1194 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html"""
1195 return self._buf.write(b)
1197 def read(self, size=-1):
1198 """Propagate."""
1199 return self._buf.read(size)
1201 def read1(self, size=-1):
1202 """Propagate."""
1203 return self._buf.read1(size)
1205 def terminate(self):
1206 """Close buffer and skip upload."""
1207 self._buf.close()
1208 logger.debug('%s: terminated singlepart upload', self)
1210 #
1211 # Internal methods.
1212 #
1213 def __enter__(self):
1214 return self
1216 def __exit__(self, exc_type, exc_val, exc_tb):
1217 if exc_type is not None:
1218 self.terminate()
1219 else:
1220 self.close()
1222 def __str__(self):
1223 return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._bucket, self._key)
1225 def __repr__(self):
1226 return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key)
1229def _accept_all(key):
1230 return True
1233def iter_bucket(
1234 bucket_name,
1235 prefix='',
1236 accept_key=None,
1237 key_limit=None,
1238 workers=16,
1239 retries=3,
1240 **session_kwargs):
1241 """
1242 Iterate and download all S3 objects under `s3://bucket_name/prefix`.
1244 Parameters
1245 ----------
1246 bucket_name: str
1247 The name of the bucket.
1248 prefix: str, optional
1249 Limits the iteration to keys starting with the prefix.
1250 accept_key: callable, optional
1251 This is a function that accepts a key name (unicode string) and
1252 returns True/False, signalling whether the given key should be downloaded.
1253 The default behavior is to accept all keys.
1254 key_limit: int, optional
1255 If specified, the iterator will stop after yielding this many results.
1256 workers: int, optional
1257 The number of subprocesses to use.
1258 retries: int, optional
1259 The number of times to retry a failed download.
1260 session_kwargs: dict, optional
1261 Keyword arguments to pass when creating a new session.
1262 For a list of available names and values, see:
1263 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session
1266 Yields
1267 ------
1268 str
1269 The full key name (does not include the bucket name).
1270 bytes
1271 The full contents of the key.
1273 Notes
1274 -----
1275 The keys are processed in parallel, using `workers` processes (default: 16),
1276 to speed up downloads greatly. If multiprocessing is not available
1277 (i.e. _MULTIPROCESSING is False), this parameter is ignored.
1279 Examples
1280 --------
1282 >>> # get all JSON files under "mybucket/foo/"
1283 >>> for key, content in iter_bucket(
1284 ... bucket_name, prefix='foo/',
1285 ... accept_key=lambda key: key.endswith('.json')):
1286 ... print(key, len(content))
1288 >>> # limit to 10k files, using 32 parallel workers (default is 16)
1289 >>> for key, content in iter_bucket(bucket_name, key_limit=10000, workers=32):
1290 ... print(key, len(content))
1291 """
1292 if accept_key is None:
1293 accept_key = _accept_all
1295 #
1296 # If people insist on giving us bucket instances, silently extract the name
1297 # before moving on. Works for boto3 as well as boto.
1298 #
1299 try:
1300 bucket_name = bucket_name.name
1301 except AttributeError:
1302 pass
1304 total_size, key_no = 0, -1
1305 key_iterator = _list_bucket(
1306 bucket_name,
1307 prefix=prefix,
1308 accept_key=accept_key,
1309 **session_kwargs)
1310 download_key = functools.partial(
1311 _download_key,
1312 bucket_name=bucket_name,
1313 retries=retries,
1314 **session_kwargs)
1316 with smart_open.concurrency.create_pool(processes=workers) as pool:
1317 result_iterator = pool.imap_unordered(download_key, key_iterator)
1318 key_no = 0
1319 while True:
1320 try:
1321 (key, content) = result_iterator.__next__()
1322 if key_no % 1000 == 0:
1323 logger.info(
1324 "yielding key #%i: %s, size %i (total %.1fMB)",
1325 key_no, key, len(content), total_size / 1024.0 ** 2
1326 )
1327 yield key, content
1328 total_size += len(content)
1329 if key_limit is not None and key_no + 1 >= key_limit:
1330 # we were asked to output only a limited number of keys => we're done
1331 break
1332 except botocore.exceptions.ClientError as err:
1333 #
1334 # ignore 404 not found errors: they mean the object was deleted
1335 # after we listed the contents of the bucket, but before we
1336 # downloaded the object.
1337 #
1338 if not ('Error' in err.response and err.response['Error'].get('Code') == '404'):
1339 raise err
1340 except StopIteration:
1341 break
1342 key_no += 1
1343 logger.info("processed %i keys, total size %i" % (key_no + 1, total_size))
1346def _list_bucket(
1347 bucket_name,
1348 prefix='',
1349 accept_key=lambda k: True,
1350 **session_kwargs):
1351 session = boto3.session.Session(**session_kwargs)
1352 client = session.client('s3')
1353 ctoken = None
1355 while True:
1356 # list_objects_v2 doesn't like a None value for ContinuationToken
1357 # so we don't set it if we don't have one.
1358 if ctoken:
1359 kwargs = dict(Bucket=bucket_name, Prefix=prefix, ContinuationToken=ctoken)
1360 else:
1361 kwargs = dict(Bucket=bucket_name, Prefix=prefix)
1362 response = client.list_objects_v2(**kwargs)
1363 try:
1364 content = response['Contents']
1365 except KeyError:
1366 pass
1367 else:
1368 for c in content:
1369 key = c['Key']
1370 if accept_key(key):
1371 yield key
1372 ctoken = response.get('NextContinuationToken', None)
1373 if not ctoken:
1374 break
1377def _download_key(key_name, bucket_name=None, retries=3, **session_kwargs):
1378 if bucket_name is None:
1379 raise ValueError('bucket_name may not be None')
1381 #
1382 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html#multithreading-or-multiprocessing-with-resources
1383 #
1384 session = boto3.session.Session(**session_kwargs)
1385 s3 = session.resource('s3')
1386 bucket = s3.Bucket(bucket_name)
1388 # Sometimes, https://github.com/boto/boto/issues/2409 can happen
1389 # because of network issues on either side.
1390 # Retry up to `retries` times to make sure it's not a transient issue.
1391 for x in range(retries + 1):
1392 try:
1393 content_bytes = _download_fileobj(bucket, key_name)
1394 except botocore.client.ClientError:
1395 # Actually fail on last pass through the loop
1396 if x == retries:
1397 raise
1398 # Otherwise, try again, as this might be a transient timeout
1399 pass
1400 else:
1401 return key_name, content_bytes
1404def _download_fileobj(bucket, key_name):
1405 #
1406 # This is a separate function only because it makes it easier to inject
1407 # exceptions during tests.
1408 #
1409 buf = io.BytesIO()
1410 bucket.download_fileobj(key_name, buf)
1411 return buf.getvalue()