Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/smart_open/s3.py: 20%
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Radim Rehurek <me@radimrehurek.com>
#
# This code is distributed under the terms and conditions
# from the MIT License (MIT).
#
"""Implements file-like objects for reading and writing from/to AWS S3."""

from __future__ import annotations

import http
import io
import functools
import logging
import time
import warnings

from typing import (
    Callable,
    List,
    TYPE_CHECKING,
)

try:
    import boto3
    import botocore.client
    import botocore.exceptions
    import urllib3.exceptions
except ImportError:
    MISSING_DEPS = True

import smart_open.bytebuffer
import smart_open.concurrency
import smart_open.utils

from smart_open import constants

if TYPE_CHECKING:
    from mypy_boto3_s3.client import S3Client
    from typing_extensions import Buffer

logger = logging.getLogger(__name__)

#
# AWS puts restrictions on the part size for multipart uploads.
# Each part must be at least 5MB, and no more than 5GB.
#
# On top of that, our MultipartWriter has a min_part_size option.
# In retrospect, it's an unfortunate name, because it conflicts with the
# minimum allowable part size (5MB), but it's too late to change it, because
# people are using that parameter (unlike the MIN, DEFAULT, MAX constants).
# It really just means "part size": as soon as you have this many bytes,
# write a part to S3 (see the MultipartWriter.write method).
#

MIN_PART_SIZE = 5 * 1024 ** 2
"""The absolute minimum permitted by Amazon."""

DEFAULT_PART_SIZE = 50 * 1024 ** 2
"""The default part size for S3 multipart uploads, chosen carefully by smart_open"""

MAX_PART_SIZE = 5 * 1024 ** 3
"""The absolute maximum permitted by Amazon."""

SCHEMES = ("s3", "s3n", 's3u', "s3a")
DEFAULT_PORT = 443
DEFAULT_HOST = 's3.amazonaws.com'

DEFAULT_BUFFER_SIZE = 128 * 1024

URI_EXAMPLES = (
    's3://my_bucket/my_key',
    's3://my_key:my_secret@my_bucket/my_key',
    's3://my_key:my_secret@my_server:my_port@my_bucket/my_key',
)

# Returned by AWS when we try to seek beyond EOF.
_OUT_OF_RANGE = 'InvalidRange'

class Retry:
    def __init__(self):
        self.attempts: int = 6
        self.sleep_seconds: int = 10
        self.exceptions: List[Exception] = [botocore.exceptions.EndpointConnectionError]
        self.client_error_codes: List[str] = ['NoSuchUpload']

    def _do(self, fn: Callable):
        for attempt in range(self.attempts):
            try:
                return fn()
            except tuple(self.exceptions) as err:
                logger.critical(
                    'Caught non-fatal %s, retrying %d more times',
                    err,
                    self.attempts - attempt - 1,
                )
                logger.exception(err)
                time.sleep(self.sleep_seconds)
            except botocore.exceptions.ClientError as err:
                error_code = err.response['Error'].get('Code')
                if error_code not in self.client_error_codes:
                    raise
                logger.critical(
                    'Caught non-fatal ClientError (%s), retrying %d more times',
                    error_code,
                    self.attempts - attempt - 1,
                )
                logger.exception(err)
                time.sleep(self.sleep_seconds)
        else:
            logger.critical('encountered too many non-fatal errors, giving up')
            raise IOError('%s failed after %d attempts' % (fn.func, self.attempts))

#
# The retry mechanism for this submodule. Client code may modify it, e.g. by
# updating RETRY.sleep_seconds and friends.
#
if 'MISSING_DEPS' not in locals():
    RETRY = Retry()
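
#
# For illustration only (a sketch, not executed by this module): client code
# can tune the retry behaviour by mutating the module-level RETRY instance
# before opening any S3 objects.
#
#   from smart_open import s3
#   s3.RETRY.attempts = 3           # give up sooner
#   s3.RETRY.sleep_seconds = 1      # shorter pause between attempts
#   s3.RETRY.client_error_codes.append('SlowDown')  # hypothetical extra retryable code
#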

class _ClientWrapper:
    """Wraps a client to inject the appropriate keyword args into each method call.

    The keyword args are a dictionary keyed by the fully qualified method name.
    For example, S3.Client.create_multipart_upload.

    See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client

    This wrapper behaves identically to the client otherwise.
    """
    def __init__(self, client, kwargs):
        self.client = client
        self.kwargs = kwargs

    def __getattr__(self, method_name):
        method = getattr(self.client, method_name)
        kwargs = self.kwargs.get('S3.Client.%s' % method_name, {})
        return functools.partial(method, **kwargs)
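
#
# A hedged example of the kwargs dictionary shape that _ClientWrapper (and the
# client_kwargs transport parameter) expects. The ServerSideEncryption value is
# illustrative only; any keyword accepted by the named boto3 method can be used.
#
#   client_kwargs = {
#       'S3.Client': {'region_name': 'us-east-1'},  # used when creating the client itself
#       'S3.Client.create_multipart_upload': {'ServerSideEncryption': 'aws:kms'},
#   }
#   wrapped = _ClientWrapper(boto3.client('s3'), client_kwargs)
#   # wrapped.create_multipart_upload(Bucket=..., Key=...) now also sends ServerSideEncryption
#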

def parse_uri(uri_as_string):
    #
    # Restrictions on bucket names and labels:
    #
    # - Bucket names must be at least 3 and no more than 63 characters long.
    # - Bucket names must be a series of one or more labels.
    # - Adjacent labels are separated by a single period (.).
    # - Bucket names can contain lowercase letters, numbers, and hyphens.
    # - Each label must start and end with a lowercase letter or a number.
    #
    # We use the above as a guide only, and do not perform any validation. We
    # let boto3 take care of that for us.
    #
    split_uri = smart_open.utils.safe_urlsplit(uri_as_string)
    assert split_uri.scheme in SCHEMES

    port = DEFAULT_PORT
    host = DEFAULT_HOST
    ordinary_calling_format = False
    #
    # These defaults tell boto3 to look for credentials elsewhere
    #
    access_id, access_secret = None, None

    #
    # Common URI template [key:secret@][host[:port]@]bucket/object
    #
    # The urlparse function doesn't handle the above schema, so we have to do
    # it ourselves.
    #
    uri = split_uri.netloc + split_uri.path

    #
    # Attempt to extract edge-case authentication details from the URL.
    #
    # See:
    # 1. https://summitroute.com/blog/2018/06/20/aws_security_credential_formats/
    # 2. test_s3_uri_with_credentials* in test_smart_open.py for example edge cases
    #
    if '@' in uri:
        maybe_auth, rest = uri.split('@', 1)
        if ':' in maybe_auth:
            maybe_id, maybe_secret = maybe_auth.split(':', 1)
            if '/' not in maybe_id:
                access_id, access_secret = maybe_id, maybe_secret
                uri = rest

    head, key_id = uri.split('/', 1)
    if '@' in head and ':' in head:
        ordinary_calling_format = True
        host_port, bucket_id = head.split('@')
        host, port = host_port.split(':', 1)
        port = int(port)
    elif '@' in head:
        ordinary_calling_format = True
        host, bucket_id = head.split('@')
    else:
        bucket_id = head

    return dict(
        scheme=split_uri.scheme,
        bucket_id=bucket_id,
        key_id=key_id,
        port=port,
        host=host,
        ordinary_calling_format=ordinary_calling_format,
        access_id=access_id,
        access_secret=access_secret,
    )
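
#
# A short illustration of the URI forms accepted above (see URI_EXAMPLES);
# the credentials and host names are placeholders:
#
#   parse_uri('s3://my_bucket/my_key')['bucket_id']
#   # -> 'my_bucket'
#   parsed = parse_uri('s3://my_key:my_secret@my_server:9000@my_bucket/my_key')
#   # parsed['access_id'], parsed['host'], parsed['port'], parsed['bucket_id']
#   # -> ('my_key', 'my_server', 9000, 'my_bucket')
#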

def _consolidate_params(uri, transport_params):
    """Consolidates the parsed URI with the additional parameters.

    This is necessary because the user can pass some of the parameters in
    two different ways:

    1) Via the URI itself
    2) Via the transport parameters

    These are not mutually exclusive, but we have to pick one over the other
    in a sensible way in order to proceed.

    """
    transport_params = dict(transport_params)

    def inject(**kwargs):
        try:
            client_kwargs = transport_params['client_kwargs']
        except KeyError:
            client_kwargs = transport_params['client_kwargs'] = {}

        try:
            init_kwargs = client_kwargs['S3.Client']
        except KeyError:
            init_kwargs = client_kwargs['S3.Client'] = {}

        init_kwargs.update(**kwargs)

    client = transport_params.get('client')
    if client is not None and (uri['access_id'] or uri['access_secret']):
        logger.warning(
            'ignoring credentials parsed from URL because they conflict with '
            'transport_params["client"]. Set transport_params["client"] to None '
            'to suppress this warning.'
        )
        uri.update(access_id=None, access_secret=None)
    elif (uri['access_id'] and uri['access_secret']):
        inject(
            aws_access_key_id=uri['access_id'],
            aws_secret_access_key=uri['access_secret'],
        )
        uri.update(access_id=None, access_secret=None)

    if client is not None and uri['host'] != DEFAULT_HOST:
        logger.warning(
            'ignoring endpoint_url parsed from URL because it conflicts with '
            'transport_params["client"]. Set transport_params["client"] to None '
            'to suppress this warning.'
        )
        uri.update(host=None)
    elif uri['host'] != DEFAULT_HOST:
        if uri['scheme'] == 's3u':
            scheme = 'http'
        else:
            scheme = 'https'
        inject(endpoint_url=scheme + '://%(host)s:%(port)d' % uri)
        uri.update(host=None)

    return uri, transport_params
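
#
# A hedged sketch of the net effect (names are placeholders, e.g. a local MinIO
# endpoint): credentials parsed from the URI become client init kwargs, and a
# non-default host becomes an endpoint_url, http for s3u and https otherwise.
#
#   uri = parse_uri('s3u://key:secret@localhost:9000@bucket/path')
#   uri, tp = _consolidate_params(uri, {})
#   # tp['client_kwargs']['S3.Client'] == {
#   #     'aws_access_key_id': 'key',
#   #     'aws_secret_access_key': 'secret',
#   #     'endpoint_url': 'http://localhost:9000',
#   # }
#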

def open_uri(uri, mode, transport_params):
    deprecated = (
        'multipart_upload_kwargs',
        'object_kwargs',
        'resource',
        'resource_kwargs',
        'session',
        'singlepart_upload_kwargs',
    )
    detected = [k for k in deprecated if k in transport_params]
    if detected:
        doc_url = (
            'https://github.com/piskvorky/smart_open/blob/develop/'
            'MIGRATING_FROM_OLDER_VERSIONS.rst'
        )
        #
        # We use warnings.warn with UserWarning instead of logger.warn here because
        #
        # 1) Not everyone has logging enabled; and
        # 2) check_kwargs (below) already uses logger.warn with a similar message
        #
        # https://github.com/piskvorky/smart_open/issues/614
        #
        message = (
            'ignoring the following deprecated transport parameters: %r. '
            'See <%s> for details' % (detected, doc_url)
        )
        warnings.warn(message, UserWarning)

    parsed_uri = parse_uri(uri)
    parsed_uri, transport_params = _consolidate_params(parsed_uri, transport_params)
    kwargs = smart_open.utils.check_kwargs(open, transport_params)
    return open(parsed_uri['bucket_id'], parsed_uri['key_id'], mode, **kwargs)

def open(
    bucket_id,
    key_id,
    mode,
    version_id=None,
    buffer_size=DEFAULT_BUFFER_SIZE,
    min_part_size=DEFAULT_PART_SIZE,
    multipart_upload=True,
    defer_seek=False,
    client=None,
    client_kwargs=None,
    writebuffer=None,
):
    """Open an S3 object for reading or writing.

    Parameters
    ----------
    bucket_id: str
        The name of the bucket this object resides in.
    key_id: str
        The name of the key within the bucket.
    mode: str
        The mode for opening the object. Must be either "rb" or "wb".
    buffer_size: int, optional
        Default: 128KB
        The buffer size in bytes for reading. Controls memory usage. Data is
        streamed from an S3 network stream in buffer_size chunks. Forward seeks
        within the current buffer are satisfied without additional GET requests.
        Backward seeks always open a new GET request. For forward seek-intensive
        workloads, increase buffer_size to reduce GET requests at the cost of
        higher memory usage.
    min_part_size: int, optional
        The minimum part size for multipart uploads, in bytes.
        When the writebuffer contains this many bytes, smart_open will upload
        the bytes to S3 as a single part of a multi-part upload, freeing the
        buffer either partially or entirely. When you close the writer, it
        will assemble the parts together.
        The value determines the upper limit for the writebuffer. If buffer
        space is short (e.g. you are buffering to memory), then use a smaller
        value for min_part_size, or consider buffering to disk instead (see
        the writebuffer option).
        The value must be between 5MB and 5GB. If you specify a value outside
        of this range, smart_open will adjust it for you, because otherwise the
        upload _will_ fail.
        For writing only. Does not apply if you set multipart_upload=False.
    multipart_upload: bool, optional
        Default: `True`
        If set to `True`, will use multipart upload for writing to S3. If set
        to `False`, S3 upload will use the S3 Single-Part Upload API, which
        is better suited to small files.
        For writing only.
    version_id: str, optional
        Version of the object, used when reading the object.
        If None, will fetch the most recent version.
    defer_seek: boolean, optional
        Default: `False`
        If set to `True` on a file opened for reading, GetObject will not be
        called until the first seek() or read().
        Avoids redundant API queries when seeking before reading.
    client: object, optional
        The S3 client to use when working with boto3.
        If you don't specify this, then smart_open will create a new client for you.
    client_kwargs: dict, optional
        Additional parameters to pass to the relevant functions of the client.
        The keys are fully qualified method names, e.g. `S3.Client.create_multipart_upload`.
        The values are kwargs to pass to that method each time it is called.
    writebuffer: IO[bytes], optional
        By default, this module will buffer data in memory using io.BytesIO
        when writing. Pass another binary IO instance here to use it instead.
        For example, you may pass a file object to buffer to local disk instead
        of in RAM. Use this to keep RAM usage low at the expense of additional
        disk IO. If you pass in an open file, then you are responsible for
        cleaning it up after writing completes.
    """
    logger.debug('%r', locals())

    if mode not in constants.BINARY_MODES:
        raise NotImplementedError('bad mode: %r expected one of %r' % (mode, constants.BINARY_MODES))

    if (mode == constants.WRITE_BINARY) and (version_id is not None):
        raise ValueError("version_id must be None when writing")

    if mode == constants.READ_BINARY:
        fileobj = Reader(
            bucket_id,
            key_id,
            version_id=version_id,
            buffer_size=buffer_size,
            defer_seek=defer_seek,
            client=client,
            client_kwargs=client_kwargs,
        )
    elif mode == constants.WRITE_BINARY:
        if multipart_upload:
            fileobj = MultipartWriter(
                bucket_id,
                key_id,
                client=client,
                client_kwargs=client_kwargs,
                writebuffer=writebuffer,
                part_size=min_part_size,
            )
        else:
            fileobj = SinglepartWriter(
                bucket_id,
                key_id,
                client=client,
                client_kwargs=client_kwargs,
                writebuffer=writebuffer,
            )
    else:
        assert False, 'unexpected mode: %r' % mode

    fileobj.name = key_id
    return fileobj
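
#
# A hedged usage sketch (bucket and key names are placeholders, and boto3 is
# assumed to find credentials on its own):
#
#   with open('my_bucket', 'my_key.txt', 'wb') as fout:
#       fout.write(b'hello world')
#
#   # buffer multipart uploads to disk instead of RAM
#   import tempfile
#   with tempfile.NamedTemporaryFile() as tmp:
#       with open('my_bucket', 'big_key.bin', 'wb', writebuffer=tmp) as fout:
#           fout.write(b'...')
#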

def _get(client, bucket, key, version, range_string):
    try:
        params = dict(Bucket=bucket, Key=key)
        if version:
            params["VersionId"] = version
        if range_string:
            params["Range"] = range_string

        return client.get_object(**params)
    except botocore.client.ClientError as error:
        wrapped_error = IOError(
            'unable to access bucket: %r key: %r version: %r error: %s' % (
                bucket, key, version, error
            )
        )
        wrapped_error.backend_error = error
        raise wrapped_error from error


def _unwrap_ioerror(ioe):
    """Given an IOError from _get, return the 'Error' dictionary from boto."""
    try:
        return ioe.backend_error.response['Error']
    except (AttributeError, KeyError):
        return None

class _SeekableRawReader(object):
    """Read an S3 object.

    This class is internal to the S3 submodule.
    """

    def __init__(
        self,
        client,
        bucket,
        key,
        version_id=None,
    ):
        self._client = client
        self._bucket = bucket
        self._key = key
        self._version_id = version_id

        self._content_length = None
        self._position = 0
        self._body = None

    @property
    def closed(self):
        return self._body is None

    def close(self):
        if not self.closed:
            self._body.close()
            self._body = None

    def seek(self, offset, whence=constants.WHENCE_START):
        """Seek to the specified position.

        :param int offset: The offset in bytes.
        :param int whence: Where the offset is from.

        :returns: the position after seeking.
        :rtype: int
        """
        if whence not in constants.WHENCE_CHOICES:
            raise ValueError('invalid whence, expected one of %r' % constants.WHENCE_CHOICES)

        #
        # Close old body explicitly.
        #
        self.close()

        start = None
        stop = None
        if whence == constants.WHENCE_START:
            start = max(0, offset)
        elif whence == constants.WHENCE_CURRENT:
            start = max(0, offset + self._position)
        else:
            stop = max(0, -offset)

        #
        # If we can figure out that we've read past the EOF, then we can save
        # an extra API call.
        #
        if self._content_length is None:
            reached_eof = False
        elif start is not None and start >= self._content_length:
            reached_eof = True
        elif stop == 0:
            reached_eof = True
        else:
            reached_eof = False

        if reached_eof:
            self._body = io.BytesIO()
            self._position = self._content_length
        else:
            self._open_body(start, stop)

        return self._position

    def _open_body(self, start=None, stop=None):
        """Open a connection to download the specified range of bytes. Store
        the open file handle in self._body.

        If no range is specified, start defaults to self._position.
        start and stop follow the semantics of the HTTP range header:
        a stop without a start is a suffix range, i.e. it requests the
        last `stop` bytes of the object.

        As a side effect, set self._content_length. Set self._position
        to self._content_length if start is past end of file.
        """
        if start is None and stop is None:
            start = self._position
        range_string = smart_open.utils.make_range_string(start, stop)

        try:
            # Optimistically try to fetch the requested content range.
            response = _get(
                self._client,
                self._bucket,
                self._key,
                self._version_id,
                range_string,
            )
        except IOError as ioe:
            # Handle requested content range exceeding content size.
            error_response = _unwrap_ioerror(ioe)
            if error_response is None or error_response.get('Code') != _OUT_OF_RANGE:
                raise
            actual_object_size = int(error_response.get('ActualObjectSize', 0))
            if (
                # empty file (==) or start is past end of file (>)
                (start is not None and start >= actual_object_size)
                # negative seek requested more bytes than file has
                or (start is None and stop is not None and stop >= actual_object_size)
            ):
                self._position = self._content_length = actual_object_size
                self._body = io.BytesIO()
            else:  # stop is past end of file: request the correct remainder instead
                self._open_body(start=start, stop=actual_object_size - 1)
            return

        #
        # Keep track of how many times boto3's built-in retry mechanism
        # activated.
        #
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#checking-retry-attempts-in-an-aws-service-response
        #
        logger.debug(
            '%s: RetryAttempts: %d',
            self,
            response['ResponseMetadata']['RetryAttempts'],
        )
        #
        # A range request may not always return partial content, see:
        # https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests#partial_request_responses
        #
        status_code = response['ResponseMetadata']['HTTPStatusCode']
        if status_code == http.HTTPStatus.PARTIAL_CONTENT:
            # 206 guarantees that the response body only contains the requested byte range
            _, resp_start, _, length = smart_open.utils.parse_content_range(response['ContentRange'])
            self._position = resp_start
            self._content_length = length
            self._body = response['Body']
        elif status_code == http.HTTPStatus.OK:
            # 200 guarantees the response body contains the full file (server ignored range header)
            self._position = 0
            self._content_length = response["ContentLength"]
            self._body = response['Body']
            #
            # If we got a full response when we were actually expecting a range,
            # we need to discard some data to ensure that the body starts in the
            # place that the caller expects.
            #
            if start is not None:
                expected_position = min(self._content_length, start)
            elif start is None and stop is not None:
                expected_position = max(0, self._content_length - stop)
            else:
                expected_position = 0
            if expected_position > 0:
                logger.debug(
                    '%s: discarding %d bytes to reach expected position',
                    self,
                    expected_position,
                )
                self._position = len(self._body.read(expected_position))
        else:
            raise ValueError("Unexpected status code %r" % status_code)

    def read(self, size=-1):
        """Read from the continuous connection with the remote peer."""
        if self.closed:
            # This is necessary for the very first read() after __init__().
            self._open_body()
        if self._position >= self._content_length:
            return b''

        #
        # Boto3 has built-in error handling and retry mechanisms:
        #
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
        #
        # Unfortunately, it isn't always enough. There is still a non-zero
        # possibility that an exception will slip past these mechanisms and
        # terminate the read prematurely. Luckily, at this stage, it's very
        # simple to recover from the problem: wait a little bit, reopen the
        # HTTP connection and try again. Usually, a single retry attempt is
        # enough to recover, but we try multiple times "just in case".
        #
        for attempt, seconds in enumerate([1, 2, 4, 8, 16], 1):
            try:
                if size == -1:
                    binary = self._body.read()
                else:
                    binary = self._body.read(size)
            except (
                ConnectionResetError,
                botocore.exceptions.BotoCoreError,
                urllib3.exceptions.HTTPError,
            ) as err:
                logger.warning(
                    '%s: caught %r while reading %d bytes, sleeping %ds before retry',
                    self,
                    err,
                    size,
                    seconds,
                )
                time.sleep(seconds)
                self._open_body()
            else:
                self._position += len(binary)
                return binary

        raise IOError('%s: failed to read %d bytes after %d attempts' % (self, size, attempt))

    def __str__(self):
        return 'smart_open.s3._SeekableRawReader(%r, %r)' % (self._bucket, self._key)
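
#
# A hedged illustration of the range semantics used above (the header strings
# are what smart_open.utils.make_range_string is expected to produce):
#
#   start=100, stop=None  -> 'bytes=100-'     read from offset 100 to EOF
#   start=100, stop=199   -> 'bytes=100-199'  read a bounded range
#   start=None, stop=50   -> 'bytes=-50'      suffix range: the last 50 bytes
#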

def _initialize_boto3(rw, client, client_kwargs, bucket, key):
    """Create the required objects for accessing S3. Ideally, they have
    already been created for us and we can just reuse them."""
    if client_kwargs is None:
        client_kwargs = {}

    if client is None:
        init_kwargs = client_kwargs.get('S3.Client', {})
        if 'config' not in init_kwargs:
            init_kwargs['config'] = botocore.client.Config(
                max_pool_connections=64,
                tcp_keepalive=True,
                retries={"max_attempts": 6, "mode": "adaptive"},
            )
        # boto3.client re-uses the default session, which is not thread-safe when
        # called from within a thread. When using smart_open with multithreading,
        # create a thread-safe client with the config above and share it between
        # threads using transport_params.
        # https://github.com/boto/boto3/blob/1.38.41/docs/source/guide/clients.rst?plain=1#L111
        client = boto3.client('s3', **init_kwargs)
    assert client

    rw._client = _ClientWrapper(client, client_kwargs)
    rw._bucket = bucket
    rw._key = key
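
#
# A hedged sketch of the thread-safe pattern the comment above recommends
# (bucket/key names are placeholders): create one client up front and share it
# across threads via the `client` parameter.
#
#   import boto3, botocore.client
#   shared_client = boto3.client(
#       's3',
#       config=botocore.client.Config(max_pool_connections=64),
#   )
#   # each worker thread can then call:
#   #   open('my_bucket', 'my_key', 'rb', client=shared_client)
#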

class Reader(io.BufferedIOBase):
    """Reads bytes from S3.

    Implements the io.BufferedIOBase interface of the standard library."""

    def __init__(
        self,
        bucket,
        key,
        version_id=None,
        buffer_size=DEFAULT_BUFFER_SIZE,
        line_terminator=constants.BINARY_NEWLINE,
        defer_seek=False,
        client=None,
        client_kwargs=None,
    ):
        self._version_id = version_id
        self._buffer_size = buffer_size

        _initialize_boto3(self, client, client_kwargs, bucket, key)

        self._raw_reader = _SeekableRawReader(
            self._client,
            bucket,
            key,
            self._version_id,
        )
        self._current_pos = 0
        self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size)
        self._eof = False
        self._line_terminator = line_terminator
        self._seek_initialized = False

        #
        # This member is part of the io.BufferedIOBase interface.
        #
        self.raw = None

        if not defer_seek:
            self.seek(0)

    #
    # io.BufferedIOBase methods.
    #
    def close(self):
        """Flush and close this stream."""
        logger.debug("close: called")
        pass

    def readable(self):
        """Return True if the stream can be read from."""
        return True

    def read(self, size=-1):
        """Read up to size bytes from the object and return them."""
        if size == 0:
            return b''
        elif size < 0:
            # call read() before setting _current_pos to make sure _content_length is set
            out = self._read_from_buffer() + self._raw_reader.read()
            self._current_pos = self._raw_reader._content_length
            return out

        #
        # Return unused data first
        #
        if len(self._buffer) >= size:
            return self._read_from_buffer(size)

        #
        # If the stream is finished, return what we have.
        #
        if self._eof:
            return self._read_from_buffer()

        self._fill_buffer(size)
        return self._read_from_buffer(size)

    def read1(self, size=-1):
        """This is the same as read()."""
        return self.read(size=size)

    def readinto(self, b):
        """Read up to len(b) bytes into b, and return the number of bytes
        read."""
        data = self.read(len(b))
        if not data:
            return 0
        b[:len(data)] = data
        return len(data)

    def readline(self, limit=-1):
        """Read up to and including the next newline. Returns the bytes read."""
        if limit != -1:
            raise NotImplementedError('limits other than -1 not implemented yet')

        #
        # A single line may span multiple buffers.
        #
        line = io.BytesIO()
        while not (self._eof and len(self._buffer) == 0):
            line_part = self._buffer.readline(self._line_terminator)
            line.write(line_part)
            self._current_pos += len(line_part)

            if line_part.endswith(self._line_terminator):
                break
            else:
                self._fill_buffer()

        return line.getvalue()

    def seekable(self):
        """If False, seek(), tell() and truncate() will raise IOError.

        We offer only seek support, and no truncate support."""
        return True

    def seek(self, offset, whence=constants.WHENCE_START):
        """Seek to the specified position.

        :param int offset: The offset in bytes.
        :param int whence: Where the offset is from.

        Returns the position after seeking."""
        # Convert relative offset to absolute, since self._raw_reader
        # doesn't know our current position.
        if whence == constants.WHENCE_CURRENT:
            whence = constants.WHENCE_START
            offset += self._current_pos

        # Check if we can satisfy seek from buffer
        if whence == constants.WHENCE_START and offset > self._current_pos:
            buffer_end = self._current_pos + len(self._buffer)
            if offset <= buffer_end:
                # Forward seek within buffered data - avoid S3 request
                self._buffer.read(offset - self._current_pos)
                self._current_pos = offset
                return self._current_pos

        if not self._seek_initialized or not (
            whence == constants.WHENCE_START and offset == self._current_pos
        ):
            self._current_pos = self._raw_reader.seek(offset, whence)
            self._buffer.empty()

        self._eof = self._current_pos == self._raw_reader._content_length

        self._seek_initialized = True
        return self._current_pos

    def tell(self):
        """Return the current position within the file."""
        return self._current_pos

    def truncate(self, size=None):
        """Unsupported."""
        raise io.UnsupportedOperation

    def detach(self):
        """Unsupported."""
        raise io.UnsupportedOperation

    def terminate(self):
        """Do nothing."""
        pass

    def to_boto3(self, resource):
        """Create an **independent** `boto3.s3.Object` instance that points to
        the same S3 object as this instance.
        Changes to the returned object will not affect the current instance.
        """
        assert resource, 'resource must be a boto3.resource instance'
        obj = resource.Object(self._bucket, self._key)
        if self._version_id is not None:
            return obj.Version(self._version_id)
        else:
            return obj

    #
    # Internal methods.
    #
    def _read_from_buffer(self, size=-1):
        """Remove at most size bytes from our buffer and return them."""
        size = size if size >= 0 else len(self._buffer)
        part = self._buffer.read(size)
        self._current_pos += len(part)
        return part

    def _fill_buffer(self, size=-1):
        size = max(size, self._buffer._chunk_size)
        while len(self._buffer) < size and not self._eof:
            bytes_read = self._buffer.fill(self._raw_reader)
            if bytes_read == 0:
                logger.debug('%s: reached EOF while filling buffer', self)
                self._eof = True

    def __str__(self):
        return "smart_open.s3.Reader(%r, %r)" % (self._bucket, self._key)

    def __repr__(self):
        return (
            "smart_open.s3.Reader("
            "bucket=%r, "
            "key=%r, "
            "version_id=%r, "
            "buffer_size=%r, "
            "line_terminator=%r)"
        ) % (
            self._bucket,
            self._key,
            self._version_id,
            self._buffer_size,
            self._line_terminator,
        )
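
#
# A hedged reading sketch (bucket/key are placeholders): defer_seek postpones
# the initial GetObject call until data is actually needed, and to_boto3 hands
# back an independent boto3 object for metadata queries.
#
#   import boto3
#   with open('my_bucket', 'my_key', 'rb', defer_seek=True) as fin:
#       head = fin.read(64)                       # first GET happens here
#       obj = fin.to_boto3(boto3.resource('s3'))  # independent boto3.s3.Object
#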

class MultipartWriter(io.BufferedIOBase):
    """Writes bytes to S3 using the multi part API.

    Implements the io.BufferedIOBase interface of the standard library."""

    _upload_id = None  # so `closed` property works in case __init__ fails and __del__ is called

    def __init__(
        self,
        bucket,
        key,
        part_size=DEFAULT_PART_SIZE,
        client=None,
        client_kwargs=None,
        writebuffer: io.BytesIO | None = None,
    ):
        adjusted_ps = smart_open.utils.clamp(part_size, MIN_PART_SIZE, MAX_PART_SIZE)
        if part_size != adjusted_ps:
            logger.warning(f"adjusting part_size from {part_size} to {adjusted_ps}")
            part_size = adjusted_ps
        self._part_size = part_size

        _initialize_boto3(self, client, client_kwargs, bucket, key)
        self._client: S3Client
        self._bucket: str
        self._key: str

        try:
            partial = functools.partial(
                self._client.create_multipart_upload,
                Bucket=bucket,
                Key=key,
            )
            self._upload_id = RETRY._do(partial)['UploadId']
        except botocore.client.ClientError as error:
            raise ValueError(
                'the bucket %r does not exist, or is forbidden for access (%r)' % (
                    bucket, error
                )
            ) from error

        if writebuffer is None:
            self._buf = io.BytesIO()
        else:
            self._buf = writebuffer

        self._total_bytes = 0
        self._total_parts = 0
        self._parts: list[dict[str, object]] = []

        #
        # This member is part of the io.BufferedIOBase interface.
        #
        self.raw = None  # type: ignore[assignment]

    def flush(self):
        pass

    #
    # Override some methods from io.IOBase.
    #
    def close(self):
        logger.debug("close: called")
        if self.closed:
            return

        if self._buf.tell():
            self._upload_next_part()

        logger.debug('%s: completing multipart upload', self)
        if self._total_bytes and self._upload_id:
            partial = functools.partial(
                self._client.complete_multipart_upload,
                Bucket=self._bucket,
                Key=self._key,
                UploadId=self._upload_id,
                MultipartUpload={'Parts': self._parts},
            )
            RETRY._do(partial)
            logger.debug('%s: completed multipart upload', self)
        elif self._upload_id:
            #
            # AWS complains with "The XML you provided was not well-formed or
            # did not validate against our published schema" when the input is
            # completely empty => abort the upload, no file created.
            #
            # We work around this by creating an empty file explicitly.
            #
            self._client.abort_multipart_upload(
                Bucket=self._bucket,
                Key=self._key,
                UploadId=self._upload_id,
            )
            self._client.put_object(
                Bucket=self._bucket,
                Key=self._key,
                Body=b'',
            )
            logger.debug('%s: wrote 0 bytes to imitate multipart upload', self)
        self._upload_id = None

    @property
    def closed(self):
        return self._upload_id is None

    def writable(self):
        """Return True if the stream supports writing."""
        return True

    def seekable(self):
        """If False, seek(), tell() and truncate() will raise IOError.

        We offer only tell support, and no seek or truncate support."""
        return True

    def seek(self, offset, whence=constants.WHENCE_START):
        """Unsupported."""
        raise io.UnsupportedOperation

    def truncate(self, size=None):
        """Unsupported."""
        raise io.UnsupportedOperation

    def tell(self):
        """Return the current stream position."""
        return self._total_bytes

    #
    # io.BufferedIOBase methods.
    #
    def detach(self):
        raise io.UnsupportedOperation("detach() not supported")

    def write(self, b: Buffer) -> int:
        """Write the given buffer (bytes, bytearray, memoryview or any buffer
        interface implementation) to the S3 file.

        For more information about buffers, see https://docs.python.org/3/c-api/buffer.html

        There's buffering happening under the covers, so this may not actually
        do any HTTP transfer right away."""
        offset = 0
        mv = memoryview(b)
        self._total_bytes += len(mv)

        #
        # botocore does not accept memoryview, otherwise we could've gotten
        # away with not needing to write a copy to the buffer aside from cases
        # where b is smaller than part_size
        #
        while offset < len(mv):
            start = offset
            end = offset + self._part_size - self._buf.tell()
            self._buf.write(mv[start:end])
            if self._buf.tell() < self._part_size:
                #
                # Not enough data to write a new part just yet. The assert
                # ensures that we've consumed all of the input buffer.
                #
                assert end >= len(mv)
                return len(mv)

            self._upload_next_part()
            offset = end
        return len(mv)

    def terminate(self):
        """Cancel the underlying multipart upload."""
        if self.closed:
            return
        logger.debug('%s: terminating multipart upload', self)
        self._client.abort_multipart_upload(
            Bucket=self._bucket,
            Key=self._key,
            UploadId=self._upload_id,
        )
        self._upload_id = None
        logger.debug('%s: terminated multipart upload', self)

    def to_boto3(self, resource):
        """Create an **independent** `boto3.s3.Object` instance that points to
        the same S3 object as this instance.
        Changes to the returned object will not affect the current instance.
        """
        assert resource, 'resource must be a boto3.resource instance'
        return resource.Object(self._bucket, self._key)

    #
    # Internal methods.
    #
    def _upload_next_part(self) -> None:
        part_num = self._total_parts + 1
        logger.info(
            "%s: uploading part_num: %i, %i bytes (total %.3fGB)",
            self,
            part_num,
            self._buf.tell(),
            self._total_bytes / 1024.0 ** 3,
        )
        self._buf.seek(0)

        #
        # Network problems in the middle of an upload are particularly
        # troublesome. We don't want to abort the entire upload just because
        # of a temporary connection problem, so this part needs to be
        # especially robust.
        #
        upload = RETRY._do(
            functools.partial(
                self._client.upload_part,
                Bucket=self._bucket,
                Key=self._key,
                UploadId=self._upload_id,
                PartNumber=part_num,
                Body=self._buf,
            )
        )

        self._parts.append({'ETag': upload['ETag'], 'PartNumber': part_num})
        logger.debug("%s: upload of part_num #%i finished", self, part_num)

        self._total_parts += 1

        self._buf.seek(0)
        self._buf.truncate(0)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.terminate()
        else:
            self.close()

    def __str__(self):
        return "smart_open.s3.MultipartWriter(%r, %r)" % (self._bucket, self._key)

    def __repr__(self):
        return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, part_size=%r)" % (
            self._bucket,
            self._key,
            self._part_size,
        )
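
#
# A hedged writing sketch (bucket/key are placeholders): parts are buffered
# until part_size bytes accumulate, and the context manager completes the
# upload on a clean exit or aborts it if an exception escapes the block.
#
#   with MultipartWriter('my_bucket', 'my_key', part_size=MIN_PART_SIZE) as fout:
#       for chunk in produce_chunks():   # hypothetical generator of bytes
#           fout.write(chunk)            # uploads a part whenever the buffer fills
#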

class SinglepartWriter(io.BufferedIOBase):
    """Writes bytes to S3 using the single part API.

    Implements the io.BufferedIOBase interface of the standard library.

    This class buffers all of its input (in memory, by default) until its
    `close` method is called. Only then will the data be written to S3 and
    the buffer released."""

    _buf = None  # so `closed` property works in case __init__ fails and __del__ is called

    def __init__(
        self,
        bucket,
        key,
        client=None,
        client_kwargs=None,
        writebuffer=None,
    ):
        _initialize_boto3(self, client, client_kwargs, bucket, key)

        if writebuffer is None:
            self._buf = io.BytesIO()
        elif not writebuffer.seekable():
            raise ValueError('writebuffer needs to be seekable')
        else:
            self._buf = writebuffer

    def flush(self):
        pass

    #
    # Override some methods from io.IOBase.
    #
    def close(self):
        logger.debug("close: called")
        if self.closed:
            return

        self.seek(0)

        try:
            self._client.put_object(
                Bucket=self._bucket,
                Key=self._key,
                Body=self._buf,
            )
        except botocore.client.ClientError as e:
            raise ValueError(
                'the bucket %r does not exist, or is forbidden for access' % self._bucket) from e

        logger.debug("%s: direct upload finished", self)
        self._buf.close()

    @property
    def closed(self):
        return self._buf is None or self._buf.closed

    def readable(self):
        """Propagate."""
        return self._buf.readable()

    def writable(self):
        """Propagate."""
        return self._buf.writable()

    def seekable(self):
        """Propagate."""
        return self._buf.seekable()

    def seek(self, offset, whence=constants.WHENCE_START):
        """Propagate."""
        return self._buf.seek(offset, whence)

    def truncate(self, size=None):
        """Propagate."""
        return self._buf.truncate(size)

    def tell(self):
        """Propagate."""
        return self._buf.tell()

    def write(self, b):
        """Write the given buffer (bytes, bytearray, memoryview or any buffer
        interface implementation) into the buffer. Content of the buffer will be
        written to S3 on close as a single-part upload.

        For more information about buffers, see https://docs.python.org/3/c-api/buffer.html"""
        return self._buf.write(b)

    def read(self, size=-1):
        """Propagate."""
        return self._buf.read(size)

    def read1(self, size=-1):
        """Propagate."""
        return self._buf.read1(size)

    def terminate(self):
        """Close the buffer and skip the upload."""
        self._buf.close()
        logger.debug('%s: terminated singlepart upload', self)

    #
    # Internal methods.
    #
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.terminate()
        else:
            self.close()

    def __str__(self):
        return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._bucket, self._key)

    def __repr__(self):
        return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key)

def _accept_all(key):
    return True


def iter_bucket(
        bucket_name,
        prefix='',
        accept_key=None,
        key_limit=None,
        workers=16,
        retries=3,
        **session_kwargs):
    """
    Iterate and download all S3 objects under `s3://bucket_name/prefix`.

    Parameters
    ----------
    bucket_name: str
        The name of the bucket.
    prefix: str, optional
        Limits the iteration to keys starting with the prefix.
    accept_key: callable, optional
        This is a function that accepts a key name (unicode string) and
        returns True/False, signalling whether the given key should be downloaded.
        The default behavior is to accept all keys.
    key_limit: int, optional
        If specified, the iterator will stop after yielding this many results.
    workers: int, optional
        The number of subprocesses to use.
    retries: int, optional
        The number of times to retry a failed download.
    session_kwargs: dict, optional
        Keyword arguments to pass when creating a new session.
        For a list of available names and values, see:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session

    Yields
    ------
    str
        The full key name (does not include the bucket name).
    bytes
        The full contents of the key.

    Notes
    -----
    The keys are processed in parallel, using `workers` processes (default: 16),
    to speed up downloads greatly. If multiprocessing is not available
    (i.e. _MULTIPROCESSING is False), this parameter is ignored.

    Examples
    --------

    >>> # get all JSON files under "mybucket/foo/"
    >>> for key, content in iter_bucket(
    ...         bucket_name, prefix='foo/',
    ...         accept_key=lambda key: key.endswith('.json')):
    ...     print(key, len(content))

    >>> # limit to 10k files, using 32 parallel workers (default is 16)
    >>> for key, content in iter_bucket(bucket_name, key_limit=10000, workers=32):
    ...     print(key, len(content))
    """
    if accept_key is None:
        accept_key = _accept_all

    #
    # If people insist on giving us bucket instances, silently extract the name
    # before moving on. Works for boto3 as well as boto.
    #
    try:
        bucket_name = bucket_name.name
    except AttributeError:
        pass

    total_size, key_no = 0, -1
    key_iterator = _list_bucket(
        bucket_name,
        prefix=prefix,
        accept_key=accept_key,
        **session_kwargs)
    download_key = functools.partial(
        _download_key,
        bucket_name=bucket_name,
        retries=retries,
        **session_kwargs)

    with smart_open.concurrency.create_pool(processes=workers) as pool:
        result_iterator = pool.imap_unordered(download_key, key_iterator)
        key_no = 0
        while True:
            try:
                (key, content) = result_iterator.__next__()
                if key_no % 1000 == 0:
                    logger.info(
                        "yielding key #%i: %s, size %i (total %.1fMB)",
                        key_no, key, len(content), total_size / 1024.0 ** 2
                    )
                yield key, content
                total_size += len(content)
                if key_limit is not None and key_no + 1 >= key_limit:
                    # we were asked to output only a limited number of keys => we're done
                    break
            except botocore.exceptions.ClientError as err:
                #
                # ignore 404 not found errors: they mean the object was deleted
                # after we listed the contents of the bucket, but before we
                # downloaded the object.
                #
                if not ('Error' in err.response and err.response['Error'].get('Code') == '404'):
                    raise err
            except StopIteration:
                break
            key_no += 1
    logger.info("processed %i keys, total size %i" % (key_no + 1, total_size))

def _list_bucket(
        bucket_name,
        prefix='',
        accept_key=lambda k: True,
        **session_kwargs):
    session = boto3.session.Session(**session_kwargs)
    client = session.client('s3')
    ctoken = None

    while True:
        # list_objects_v2 doesn't like a None value for ContinuationToken
        # so we don't set it if we don't have one.
        if ctoken:
            kwargs = dict(Bucket=bucket_name, Prefix=prefix, ContinuationToken=ctoken)
        else:
            kwargs = dict(Bucket=bucket_name, Prefix=prefix)
        response = client.list_objects_v2(**kwargs)
        try:
            content = response['Contents']
        except KeyError:
            pass
        else:
            for c in content:
                key = c['Key']
                if accept_key(key):
                    yield key
        ctoken = response.get('NextContinuationToken', None)
        if not ctoken:
            break

def _download_key(key_name, bucket_name=None, retries=3, **session_kwargs):
    if bucket_name is None:
        raise ValueError('bucket_name may not be None')

    #
    # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html#multithreading-or-multiprocessing-with-resources
    #
    session = boto3.session.Session(**session_kwargs)
    s3 = session.resource('s3')
    bucket = s3.Bucket(bucket_name)

    # Sometimes, https://github.com/boto/boto/issues/2409 can happen
    # because of network issues on either side.
    # Retry up to 3 times to ensure it's not a transient issue.
    for x in range(retries + 1):
        try:
            content_bytes = _download_fileobj(bucket, key_name)
        except botocore.client.ClientError:
            # Actually fail on last pass through the loop
            if x == retries:
                raise
            # Otherwise, try again, as this might be a transient timeout
            pass
        else:
            return key_name, content_bytes


def _download_fileobj(bucket, key_name):
    #
    # This is a separate function only because it makes it easier to inject
    # exceptions during tests.
    #
    buf = io.BytesIO()
    bucket.download_fileobj(key_name, buf)
    return buf.getvalue()