1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2019 Radim Rehurek <me@radimrehurek.com>
4#
5# This code is distributed under the terms and conditions
6# from the MIT License (MIT).
7#
8"""Implements file-like objects for reading and writing from/to AWS S3."""
9from __future__ import annotations
11import http
12import io
13import functools
14import logging
15import time
16import warnings
17from math import inf
19from typing import (
20 Callable,
21 List,
22 TYPE_CHECKING,
23)
25try:
26 import boto3
27 import botocore.client
28 import botocore.exceptions
29 import urllib3.exceptions
30except ImportError:
31 MISSING_DEPS = True
33import smart_open.bytebuffer
34import smart_open.concurrency
35import smart_open.utils
37from smart_open import constants
40if TYPE_CHECKING:
41 from mypy_boto3_s3.client import S3Client
42 from typing_extensions import Buffer
44logger = logging.getLogger(__name__)
46#
47# AWS puts restrictions on the part size for multipart uploads.
48# Each part must be more than 5MB, and less than 5GB.
49#
50# On top of that, our MultipartWriter has a min_part_size option.
51# In retrospect, it's an unfortunate name, because it conflicts with the
52# minimum allowable part size (5MB), but it's too late to change it, because
53# people are using that parameter (unlike the MIN, DEFAULT, MAX constants).
54# It really just means "part size": as soon as you have this many bytes,
55# write a part to S3 (see the MultipartWriter.write method).
56#
58MIN_PART_SIZE = 5 * 1024 ** 2
59"""The absolute minimum permitted by Amazon."""
61DEFAULT_PART_SIZE = 50 * 1024 ** 2
62"""The default part size for S3 multipart uploads, chosen carefully by smart_open"""
64MAX_PART_SIZE = 5 * 1024 ** 3
65"""The absolute maximum permitted by Amazon."""
67SCHEMES = ("s3", "s3n", "s3u", "s3a")
68DEFAULT_PORT = 443
69DEFAULT_HOST = 's3.amazonaws.com'
71DEFAULT_BUFFER_SIZE = 128 * 1024
73URI_EXAMPLES = (
74 's3://my_bucket/my_key',
75 's3://my_key:my_secret@my_bucket/my_key',
76 's3://my_key:my_secret@my_server:my_port@my_bucket/my_key',
77)
79# Returned by AWS when we try to seek beyond EOF.
80_OUT_OF_RANGE = 'InvalidRange'
83class Retry:
84 def __init__(self):
85 self.attempts: int = 6
86 self.sleep_seconds: int = 10
87 self.exceptions: List[Exception] = [botocore.exceptions.EndpointConnectionError]
88 self.client_error_codes: List[str] = ['NoSuchUpload']
90 def _do(self, fn: Callable):
91 for attempt in range(self.attempts):
92 try:
93 return fn()
94 except tuple(self.exceptions) as err:
95 logger.critical(
96 'Caught non-fatal %s, retrying %d more times',
97 err,
98 self.attempts - attempt - 1,
99 )
100 logger.exception(err)
101 time.sleep(self.sleep_seconds)
102 except botocore.exceptions.ClientError as err:
103 error_code = err.response['Error'].get('Code')
104 if error_code not in self.client_error_codes:
105 raise
106 logger.critical(
107 'Caught non-fatal ClientError (%s), retrying %d more times',
108 error_code,
109 self.attempts - attempt - 1,
110 )
111 logger.exception(err)
112 time.sleep(self.sleep_seconds)
113 else:
114 logger.critical('encountered too many non-fatal errors, giving up')
115 raise IOError('%s failed after %d attempts' % (fn.func, self.attempts))
118#
119# The retry mechanism for this submodule. Client code may modify it, e.g. by
120# updating RETRY.sleep_seconds and friends.
121#
122if 'MISSING_DEPS' not in locals():
123 RETRY = Retry()
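#
# A minimal sketch of how client code might tune the retry behaviour at
# runtime, as the comment above suggests. The specific values and the
# 'SlowDown' error code below are illustrative assumptions, not recommendations.
#
def _example_tune_retry():
    # Changes to RETRY affect all subsequent retried calls in this process.
    RETRY.attempts = 3
    RETRY.sleep_seconds = 2
    RETRY.client_error_codes.append('SlowDown')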
126class _ClientWrapper:
127 """Wraps a client to inject the appropriate keyword args into each method call.
129 The keyword args are a dictionary keyed by the fully qualified method name.
130 For example, S3.Client.create_multipart_upload.
132 See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client
134 This wrapper behaves identically to the client otherwise.
135 """
136 def __init__(self, client, kwargs):
137 self.client = client
138 self.kwargs = kwargs
140 def __getattr__(self, method_name):
141 method = getattr(self.client, method_name)
142 kwargs = self.kwargs.get('S3.Client.%s' % method_name, {})
143 return functools.partial(method, **kwargs)
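#
# Hypothetical illustration of the keying convention _ClientWrapper expects:
# kwargs live under the fully qualified method name, so anything registered for
# 'S3.Client.get_object' is injected into every get_object call while other
# methods are left untouched. The bucket and key names are placeholders.
#
def _example_client_wrapper(client):
    wrapped = _ClientWrapper(client, {
        'S3.Client.get_object': {'RequestPayer': 'requester'},
    })
    # Equivalent to client.get_object(Bucket=..., Key=..., RequestPayer='requester')
    return wrapped.get_object(Bucket='example-bucket', Key='example-key')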
146def parse_uri(uri_as_string):
147 #
148 # Restrictions on bucket names and labels:
149 #
150 # - Bucket names must be at least 3 and no more than 63 characters long.
151 # - Bucket names must be a series of one or more labels.
152 # - Adjacent labels are separated by a single period (.).
153 # - Bucket names can contain lowercase letters, numbers, and hyphens.
154 # - Each label must start and end with a lowercase letter or a number.
155 #
156 # We use the above as a guide only, and do not perform any validation. We
157 # let boto3 take care of that for us.
158 #
159 split_uri = smart_open.utils.safe_urlsplit(uri_as_string)
160 assert split_uri.scheme in SCHEMES
162 port = DEFAULT_PORT
163 host = DEFAULT_HOST
164 ordinary_calling_format = False
165 #
166 # These defaults tell boto3 to look for credentials elsewhere
167 #
168 access_id, access_secret = None, None
170 #
171 # Common URI template [key:secret@][host[:port]@]bucket/object
172 #
173 # The urlparse function doesn't handle the above schema, so we have to do
174 # it ourselves.
175 #
176 uri = split_uri.netloc + split_uri.path
178 #
179 # Attempt to extract edge-case authentication details from the URL.
180 #
181 # See:
182 # 1. https://summitroute.com/blog/2018/06/20/aws_security_credential_formats/
183 # 2. test_s3_uri_with_credentials* in test_smart_open.py for example edge cases
184 #
185 if '@' in uri:
186 maybe_auth, rest = uri.split('@', 1)
187 if ':' in maybe_auth:
188 maybe_id, maybe_secret = maybe_auth.split(':', 1)
189 if '/' not in maybe_id:
190 access_id, access_secret = maybe_id, maybe_secret
191 uri = rest
193 head, key_id = uri.split('/', 1)
194 if '@' in head and ':' in head:
195 ordinary_calling_format = True
196 host_port, bucket_id = head.split('@')
197 host, port = host_port.split(':', 1)
198 port = int(port)
199 elif '@' in head:
200 ordinary_calling_format = True
201 host, bucket_id = head.split('@')
202 else:
203 bucket_id = head
205 return dict(
206 scheme=split_uri.scheme,
207 bucket_id=bucket_id,
208 key_id=key_id,
209 port=port,
210 host=host,
211 ordinary_calling_format=ordinary_calling_format,
212 access_id=access_id,
213 access_secret=access_secret,
214 )
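#
# A sketch of what parse_uri returns for the third form in URI_EXAMPLES above,
# with a concrete port. All values here are placeholders.
#
def _example_parse_uri():
    parsed = parse_uri('s3://my_key:my_secret@my_server:9000@my_bucket/my_object')
    # parsed == {
    #     'scheme': 's3', 'bucket_id': 'my_bucket', 'key_id': 'my_object',
    #     'host': 'my_server', 'port': 9000, 'ordinary_calling_format': True,
    #     'access_id': 'my_key', 'access_secret': 'my_secret',
    # }
    return parsed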
217def _consolidate_params(uri, transport_params):
218 """Consolidates the parsed Uri with the additional parameters.
220 This is necessary because the user can pass some of the parameters can in
221 two different ways:
223 1) Via the URI itself
224 2) Via the transport parameters
226 These are not mutually exclusive, but we have to pick one over the other
227 in a sensible way in order to proceed.
229 """
230 transport_params = dict(transport_params)
232 def inject(**kwargs):
233 try:
234 client_kwargs = transport_params['client_kwargs']
235 except KeyError:
236 client_kwargs = transport_params['client_kwargs'] = {}
238 try:
239 init_kwargs = client_kwargs['S3.Client']
240 except KeyError:
241 init_kwargs = client_kwargs['S3.Client'] = {}
243 init_kwargs.update(**kwargs)
245 client = transport_params.get('client')
246 if client is not None and (uri['access_id'] or uri['access_secret']):
247 logger.warning(
248 'ignoring credentials parsed from URL because they conflict with '
249 'transport_params["client"]. Set transport_params["client"] to None '
250 'to suppress this warning.'
251 )
252 uri.update(access_id=None, access_secret=None)
253 elif (uri['access_id'] and uri['access_secret']):
254 inject(
255 aws_access_key_id=uri['access_id'],
256 aws_secret_access_key=uri['access_secret'],
257 )
258 uri.update(access_id=None, access_secret=None)
260 if client is not None and uri['host'] != DEFAULT_HOST:
261 logger.warning(
262 'ignoring endpoint_url parsed from URL because it conflicts with '
263 'transport_params["client"]. Set transport_params["client"] to None '
264 'to suppress this warning.'
265 )
266 uri.update(host=None)
267 elif uri['host'] != DEFAULT_HOST:
268 if uri['scheme'] == 's3u':
269 scheme = 'http'
270 else:
271 scheme = 'https'
272 inject(endpoint_url=scheme + '://%(host)s:%(port)d' % uri)
273 uri.update(host=None)
275 return uri, transport_params
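#
# A sketch of the effect of _consolidate_params on a URI that carries
# credentials and a custom endpoint (all values are placeholders). Because no
# client was supplied, the URI-derived settings end up in
# client_kwargs['S3.Client'] and are later used to construct the boto3 client.
#
def _example_consolidate_params():
    parsed = parse_uri('s3u://my_key:my_secret@localhost:9000@my_bucket/my_object')
    parsed, transport_params = _consolidate_params(parsed, {})
    # transport_params['client_kwargs']['S3.Client'] now contains:
    #   aws_access_key_id='my_key', aws_secret_access_key='my_secret',
    #   endpoint_url='http://localhost:9000'
    return parsed, transport_params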
278def open_uri(uri, mode, transport_params):
279 deprecated = (
280 'multipart_upload_kwargs',
281 'object_kwargs',
282 'resource',
283 'resource_kwargs',
284 'session',
285 'singlepart_upload_kwargs',
286 )
287 detected = [k for k in deprecated if k in transport_params]
288 if detected:
289 doc_url = (
290 'https://github.com/piskvorky/smart_open/blob/develop/'
291 'MIGRATING_FROM_OLDER_VERSIONS.rst'
292 )
293 #
294 # We use warnings.warn with UserWarning instead of logger.warn here because
295 #
296 # 1) Not everyone has logging enabled; and
297 # 2) check_kwargs (below) already uses logger.warn with a similar message
298 #
299 # https://github.com/piskvorky/smart_open/issues/614
300 #
301 message = (
302 'ignoring the following deprecated transport parameters: %r. '
303 'See <%s> for details' % (detected, doc_url)
304 )
305 warnings.warn(message, UserWarning)
306 parsed_uri = parse_uri(uri)
307 parsed_uri, transport_params = _consolidate_params(parsed_uri, transport_params)
308 kwargs = smart_open.utils.check_kwargs(open, transport_params)
309 return open(parsed_uri['bucket_id'], parsed_uri['key_id'], mode, **kwargs)
312def open(
313 bucket_id,
314 key_id,
315 mode,
316 version_id=None,
317 buffer_size=DEFAULT_BUFFER_SIZE,
318 min_part_size=DEFAULT_PART_SIZE,
319 multipart_upload=True,
320 defer_seek=False,
321 client=None,
322 client_kwargs=None,
323 writebuffer=None,
324 range_chunk_size=None,
325):
326 """Open an S3 object for reading or writing.
328 Parameters
329 ----------
330 bucket_id: str
331 The name of the bucket this object resides in.
332 key_id: str
333 The name of the key within the bucket.
334 mode: str
335 The mode for opening the object. Must be either "rb" or "wb".
336 buffer_size: int, optional
337 Default: 128KB
338 The buffer size in bytes for reading. Controls memory usage. Data is streamed
339 from an S3 network stream in buffer_size chunks. Forward seeks within
340 the current buffer are satisfied without additional GET requests. Backward
341 seeks always open a new GET request. For forward seek-intensive workloads,
342 increase buffer_size to reduce GET requests at the cost of higher memory usage.
343 min_part_size: int, optional
344 The minimum part size for multipart uploads, in bytes.
345 When the writebuffer contains this many bytes, smart_open will upload
346 the bytes to S3 as a single part of a multi-part upload, freeing the
347 buffer either partially or entirely. When you close the writer, it
348 will assemble the parts together.
349 The value determines the upper limit for the writebuffer. If buffer
350 space is short (e.g. you are buffering to memory), then use a smaller
351 value for min_part_size, or consider buffering to disk instead (see
352 the writebuffer option).
353 The value must be between 5MB and 5GB. If you specify a value outside
354 of this range, smart_open will adjust it for you, because otherwise the
355 upload _will_ fail.
356 For writing only. Does not apply if you set multipart_upload=False.
357 multipart_upload: bool, optional
358 Default: `True`
359 If set to `True`, will use multipart upload for writing to S3. If set
360 to `False`, the upload will use a single PutObject call instead, which
361 is better suited for small files.
362 For writing only.
363 version_id: str, optional
364 Version of the object, used when reading object.
365 If None, will fetch the most recent version.
366 defer_seek: boolean, optional
367 Default: `False`
368 If set to `True` on a file opened for reading, GetObject will not be
369 called until the first seek() or read().
370 Avoids redundant API queries when seeking before reading.
371 range_chunk_size: int, optional
372 Default: `None`
373 Maximum byte range per S3 GET request when reading.
374 When None (default), a single GET request is made for the entire file,
375 and data is streamed from that single botocore.response.StreamingBody
376 in buffer_size chunks.
377 When set to a positive integer, multiple GET requests are made, each
378 limited to at most this many bytes via HTTP Range headers. Each GET
379 returns a new StreamingBody that is streamed in buffer_size chunks.
380 Useful for reading small portions of large files without forcing
381 S3-compatible systems like SeaweedFS/Ceph to load the entire file.
382 Larger values mean fewer billable GET requests but higher load on S3
383 servers. Smaller values mean more GET requests but less server load per request.
384 Values larger than the file size result in a single GET for the whole file.
385 Affects reading only. Does not affect memory usage (controlled by buffer_size).
386 client: object, optional
387 The S3 client to use when working with boto3.
388 If you don't specify this, then smart_open will create a new client for you.
389 client_kwargs: dict, optional
390 Additional parameters to pass to the relevant functions of the client.
391 The keys are fully qualified method names, e.g. `S3.Client.create_multipart_upload`.
392 The values are kwargs to pass to that method each time it is called.
393 writebuffer: IO[bytes], optional
394 By default, this module will buffer data in memory using io.BytesIO
395 when writing. Pass another binary IO instance here to use it instead.
396 For example, you may pass a file object to buffer to local disk instead
397 of in RAM. Use this to keep RAM usage low at the expense of additional
398 disk IO. If you pass in an open file, then you are responsible for
399 cleaning it up after writing completes.
400 """
401 logger.debug('%r', locals())
402 if mode not in constants.BINARY_MODES:
403 raise NotImplementedError('bad mode: %r expected one of %r' % (mode, constants.BINARY_MODES))
405 if (mode == constants.WRITE_BINARY) and (version_id is not None):
406 raise ValueError("version_id must be None when writing")
408 if mode == constants.READ_BINARY:
409 fileobj = Reader(
410 bucket_id,
411 key_id,
412 version_id=version_id,
413 buffer_size=buffer_size,
414 defer_seek=defer_seek,
415 client=client,
416 client_kwargs=client_kwargs,
417 range_chunk_size=range_chunk_size,
418 )
419 elif mode == constants.WRITE_BINARY:
420 if multipart_upload:
421 fileobj = MultipartWriter(
422 bucket_id,
423 key_id,
424 client=client,
425 client_kwargs=client_kwargs,
426 writebuffer=writebuffer,
427 part_size=min_part_size,
428 )
429 else:
430 fileobj = SinglepartWriter(
431 bucket_id,
432 key_id,
433 client=client,
434 client_kwargs=client_kwargs,
435 writebuffer=writebuffer,
436 )
437 else:
438 assert False, 'unexpected mode: %r' % mode
440 fileobj.name = key_id
441 return fileobj
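#
# A usage sketch for open() under assumed bucket and key names. Reading with
# range_chunk_size issues bounded Range GETs; writing with a temporary file as
# the writebuffer keeps RAM usage low during a multipart upload.
#
def _example_open_usage():
    import tempfile

    # Read the first 1KB, fetching at most 8MB per GET request.
    with open('example-bucket', 'example-key', 'rb', range_chunk_size=8 * 1024 ** 2) as fin:
        header = fin.read(1024)

    # Buffer outgoing parts on disk instead of in memory.
    with tempfile.TemporaryFile() as tmp:
        with open('example-bucket', 'example-key-copy', 'wb', writebuffer=tmp) as fout:
            fout.write(header)

    return header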
444def _get(client, bucket, key, version, range_string):
445 try:
446 params = dict(Bucket=bucket, Key=key)
447 if version:
448 params["VersionId"] = version
449 if range_string:
450 params["Range"] = range_string
452 return client.get_object(**params)
453 except botocore.client.ClientError as error:
454 wrapped_error = IOError(
455 'unable to access bucket: %r key: %r version: %r error: %s' % (
456 bucket, key, version, error
457 )
458 )
459 wrapped_error.backend_error = error
460 raise wrapped_error from error
463def _unwrap_ioerror(ioe):
464 """Given an IOError from _get, return the 'Error' dictionary from boto."""
465 try:
466 return ioe.backend_error.response['Error']
467 except (AttributeError, KeyError):
468 return None
471class _SeekableRawReader(object):
472 """Read an S3 object.
474 This class is internal to the S3 submodule.
475 """
477 def __init__(
478 self,
479 client,
480 bucket,
481 key,
482 version_id=None,
483 range_chunk_size=None,
484 ):
485 self._client = client
486 self._bucket = bucket
487 self._key = key
488 self._version_id = version_id
489 self._range_chunk_size = range_chunk_size
491 self._content_length = None
492 self._position = 0
493 self._body = None
495 @property
496 def closed(self):
497 return self._body is None
499 def close(self):
500 if not self.closed:
501 self._body.close()
502 self._body = None
504 def seek(self, offset, whence=constants.WHENCE_START):
505 """Seek to the specified position.
507 :param int offset: The offset in bytes.
508 :param int whence: Where the offset is from.
510 :returns: the position after seeking.
511 :rtype: int
512 """
513 if whence not in constants.WHENCE_CHOICES:
514 raise ValueError('invalid whence, expected one of %r' % constants.WHENCE_CHOICES)
516 #
517 # Close old body explicitly.
518 #
519 self.close()
521 start = None
522 stop = None
523 if whence == constants.WHENCE_START:
524 start = max(0, offset)
525 elif whence == constants.WHENCE_CURRENT:
526 start = max(0, offset + self._position)
527 else:
528 stop = max(0, -offset)
530 #
531 # If we can figure out that we've read past the EOF, then we can save
532 # an extra API call.
533 #
534 if self._content_length is None:
535 reached_eof = False
536 elif start is not None and start >= self._content_length:
537 reached_eof = True
538 elif stop == 0:
539 reached_eof = True
540 else:
541 reached_eof = False
543 if reached_eof:
544 self._body = io.BytesIO()
545 self._position = self._content_length
546 else:
547 self._open_body(start, stop)
549 return self._position
551 def _open_body(self, start=None, stop=None):
552 """Open a connection to download the specified range of bytes. Store
553 the open file handle in self._body.
555 If no range is specified, start defaults to self._position.
556 start and stop follow the semantics of the HTTP Range header,
557 so a stop without a start requests the last `stop` bytes (a suffix range).
559 If self._range_chunk_size is set, open-ended Range headers are avoided:
560 stop is capped so that at most self._range_chunk_size bytes are
561 returned by a single GET request.
563 As a side effect, set self._content_length. Set self._position
564 to self._content_length if start is past end of file.
565 """
566 if start is None and stop is None:
567 start = self._position
569 # Apply chunking: limit the stop position if range_chunk_size is set
570 if stop is None and self._range_chunk_size is not None:
571 stop = start + self._range_chunk_size - 1
572 # Don't request beyond known content length
573 if self._content_length is not None:
574 stop = min(stop, self._content_length - 1)
576 range_string = smart_open.utils.make_range_string(start, stop)
578 try:
579 # Optimistically try to fetch the requested content range.
580 response = _get(
581 self._client,
582 self._bucket,
583 self._key,
584 self._version_id,
585 range_string,
586 )
587 except IOError as ioe:
588 # Handle requested content range exceeding content size.
589 error_response = _unwrap_ioerror(ioe)
590 if error_response is None or error_response.get('Code') != _OUT_OF_RANGE:
591 raise
593 actual_object_size = int(error_response.get('ActualObjectSize', 0))
594 if (
595 # empty file (==) or start is past end of file (>)
596 (start is not None and start >= actual_object_size)
597 # negative seek requested more bytes than file has
598 or (start is None and stop is not None and stop >= actual_object_size)
599 ):
600 self._position = self._content_length = actual_object_size
601 self._body = io.BytesIO()
602 else: # stop is past end of file: request the correct remainder instead
603 self._open_body(start=start, stop=actual_object_size - 1)
604 return
606 #
607 # Keep track of how many times boto3's built-in retry mechanism
608 # activated.
609 #
610 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#checking-retry-attempts-in-an-aws-service-response
611 #
612 logger.debug(
613 '%s: RetryAttempts: %d',
614 self,
615 response['ResponseMetadata']['RetryAttempts'],
616 )
617 #
618 # range request may not always return partial content, see:
619 # https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests#partial_request_responses
620 #
621 status_code = response['ResponseMetadata']['HTTPStatusCode']
622 if status_code == http.HTTPStatus.PARTIAL_CONTENT:
623 # 206 guarantees that the response body only contains the requested byte range
624 _, resp_start, _, length = smart_open.utils.parse_content_range(response['ContentRange'])
625 self._position = resp_start
626 self._content_length = length
627 self._body = response['Body']
628 elif status_code == http.HTTPStatus.OK:
629 # 200 guarantees the response body contains the full file (server ignored range header)
630 self._position = 0
631 self._content_length = response["ContentLength"]
632 self._body = response['Body']
633 #
634 # If we got a full request when we were actually expecting a range, we need to
635 # read some data to ensure that the body starts in the place that the caller expects
636 #
637 if start is not None:
638 expected_position = min(self._content_length, start)
639 elif start is None and stop is not None:
640 expected_position = max(0, self._content_length - stop)
641 else:
642 expected_position = 0
643 if expected_position > 0:
644 logger.debug(
645 '%s: discarding %d bytes to reach expected position',
646 self,
647 expected_position,
648 )
649 self._position = len(self._body.read(expected_position))
650 else:
651 raise ValueError("Unexpected status code %r" % status_code)
653 def read(self, size=-1):
654 """Read from the continuous connection with the remote peer."""
655 if size < -1:
656 raise ValueError(f'size must be >= -1, got {size}')
658 if size == -1:
659 size = inf # makes for a simple while-condition below
661 binary_collected = io.BytesIO()
663 #
664 # Boto3 has built-in error handling and retry mechanisms:
665 #
666 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
667 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
668 #
669 # Unfortunately, it isn't always enough. There is still a non-zero
670 # possibility that an exception will slip past these mechanisms and
671 # terminate the read prematurely. Luckily, at this stage, it's very
672 # simple to recover from the problem: wait a little bit, reopen the
673 # HTTP connection and try again. Usually, a single retry attempt is
674 # enough to recover, but we try multiple times "just in case".
675 #
676 def retry_read(attempts=(1, 2, 4, 8, 16)) -> bytes:
677 for seconds in attempts:
678 if self.closed:
679 self._open_body()
680 try:
681 if size == inf:
682 return self._body.read()
683 return self._body.read(size - binary_collected.tell())
684 except (
685 ConnectionResetError,
686 botocore.exceptions.BotoCoreError,
687 urllib3.exceptions.HTTPError,
688 ) as err:
689 logger.warning(
690 '%s: caught %r while reading %d bytes, sleeping %ds before retry',
691 self,
692 err,
693 -1 if size == inf else size,
694 seconds,
695 )
696 self.close()
697 time.sleep(seconds)
698 raise IOError(
699 '%s: failed to read %d bytes after %d attempts' %
700 (self, -1 if size == inf else size, len(attempts)),
701 )
703 while (
704 self._content_length is None # very first read call
705 or (
706 self._position < self._content_length # not yet end of file
707 and binary_collected.tell() < size # not yet read enough
708 )
709 ):
710 binary = retry_read()
711 self._position += len(binary)
712 binary_collected.write(binary)
713 if not binary: # end of stream
714 self.close()
716 return binary_collected.getvalue()
718 def __str__(self):
719 return 'smart_open.s3._SeekableRawReader(%r, %r)' % (self._bucket, self._key)
722def _initialize_boto3(rw, client, client_kwargs, bucket, key):
723 """Created the required objects for accessing S3. Ideally, they have
724 been already created for us and we can just reuse them."""
725 if client_kwargs is None:
726 client_kwargs = {}
728 if client is None:
729 init_kwargs = client_kwargs.get('S3.Client', {})
730 if 'config' not in init_kwargs:
731 init_kwargs['config'] = botocore.client.Config(
732 max_pool_connections=64,
733 tcp_keepalive=True,
734 retries={"max_attempts": 6, "mode": "adaptive"}
735 )
736 # boto3.client re-uses the default session which is not thread-safe when this is called
737 # from within a thread. when using smart_open with multithreading, create a thread-safe
738 # client with the config above and share it between threads using transport_params
739 # https://github.com/boto/boto3/blob/1.38.41/docs/source/guide/clients.rst?plain=1#L111
740 client = boto3.client('s3', **init_kwargs)
741 assert client
743 rw._client = _ClientWrapper(client, client_kwargs)
744 rw._bucket = bucket
745 rw._key = key
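#
# A sketch of the pattern recommended in the comment above: build one
# thread-safe client up front and share it across threads via the `client`
# transport parameter. The bucket name and worker wiring are placeholders.
#
def _example_shared_client():
    config = botocore.client.Config(
        max_pool_connections=64,
        tcp_keepalive=True,
        retries={"max_attempts": 6, "mode": "adaptive"},
    )
    shared_client = boto3.Session().client('s3', config=config)

    def read_key(key):
        # Each thread can call this concurrently; the client is shared.
        with open('example-bucket', key, 'rb', client=shared_client) as fin:
            return fin.read()

    return read_key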
748class Reader(io.BufferedIOBase):
749 """Reads bytes from S3.
751 Implements the io.BufferedIOBase interface of the standard library."""
753 def __init__(
754 self,
755 bucket,
756 key,
757 version_id=None,
758 buffer_size=DEFAULT_BUFFER_SIZE,
759 line_terminator=constants.BINARY_NEWLINE,
760 defer_seek=False,
761 client=None,
762 client_kwargs=None,
763 range_chunk_size=None,
764 ):
765 self._version_id = version_id
766 self._buffer_size = buffer_size
768 _initialize_boto3(self, client, client_kwargs, bucket, key)
770 self._raw_reader = _SeekableRawReader(
771 self._client,
772 bucket,
773 key,
774 self._version_id,
775 range_chunk_size=range_chunk_size,
776 )
777 self._current_pos = 0
778 self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size)
779 self._eof = False
780 self._line_terminator = line_terminator
781 self._seek_initialized = False
783 #
784 # This member is part of the io.BufferedIOBase interface.
785 #
786 self.raw = None
788 if not defer_seek:
789 self.seek(0)
791 #
792 # io.BufferedIOBase methods.
793 #
795 def close(self):
796 """Flush and close this stream."""
797 logger.debug("close: called")
798 pass
800 def readable(self):
801 """Return True if the stream can be read from."""
802 return True
804 def read(self, size=-1):
805 """Read up to size bytes from the object and return them."""
806 if size == 0:
807 return b''
808 elif size < 0:
809 # call read() before setting _current_pos to make sure _content_length is set
810 out = self._read_from_buffer() + self._raw_reader.read()
811 self._current_pos = self._raw_reader._content_length
812 return out
814 #
815 # Return unused data first
816 #
817 if len(self._buffer) >= size:
818 return self._read_from_buffer(size)
820 #
821 # If the stream is finished, return what we have.
822 #
823 if self._eof:
824 return self._read_from_buffer()
826 self._fill_buffer(size)
827 return self._read_from_buffer(size)
829 def read1(self, size=-1):
830 """This is the same as read()."""
831 return self.read(size=size)
833 def readinto(self, b):
834 """Read up to len(b) bytes into b, and return the number of bytes
835 read."""
836 data = self.read(len(b))
837 if not data:
838 return 0
839 b[:len(data)] = data
840 return len(data)
842 def readline(self, limit=-1):
843 """Read up to and including the next newline. Returns the bytes read."""
844 if limit != -1:
845 raise NotImplementedError('limits other than -1 not implemented yet')
847 #
848 # A single line may span multiple buffers.
849 #
850 line = io.BytesIO()
851 while not (self._eof and len(self._buffer) == 0):
852 line_part = self._buffer.readline(self._line_terminator)
853 line.write(line_part)
854 self._current_pos += len(line_part)
856 if line_part.endswith(self._line_terminator):
857 break
858 else:
859 self._fill_buffer()
861 return line.getvalue()
863 def seekable(self):
864 """If False, seek(), tell() and truncate() will raise IOError.
866 We offer only seek support, and no truncate support."""
867 return True
869 def seek(self, offset, whence=constants.WHENCE_START):
870 """Seek to the specified position.
872 :param int offset: The offset in bytes.
873 :param int whence: Where the offset is from.
875 Returns the position after seeking."""
876 # Convert relative offset to absolute, since self._raw_reader
877 # doesn't know our current position.
878 if whence == constants.WHENCE_CURRENT:
879 whence = constants.WHENCE_START
880 offset += self._current_pos
882 # Check if we can satisfy seek from buffer
883 if whence == constants.WHENCE_START and offset > self._current_pos:
884 buffer_end = self._current_pos + len(self._buffer)
885 if offset <= buffer_end:
886 # Forward seek within buffered data - avoid S3 request
887 self._buffer.read(offset - self._current_pos)
888 self._current_pos = offset
889 return self._current_pos
891 if not self._seek_initialized or not (
892 whence == constants.WHENCE_START and offset == self._current_pos
893 ):
894 self._current_pos = self._raw_reader.seek(offset, whence)
895 self._buffer.empty()
897 self._eof = self._current_pos == self._raw_reader._content_length
899 self._seek_initialized = True
900 return self._current_pos
902 def tell(self):
903 """Return the current position within the file."""
904 return self._current_pos
906 def truncate(self, size=None):
907 """Unsupported."""
908 raise io.UnsupportedOperation
910 def detach(self):
911 """Unsupported."""
912 raise io.UnsupportedOperation
914 def terminate(self):
915 """Do nothing."""
916 pass
918 def to_boto3(self, resource):
919 """Create an **independent** `boto3.s3.Object` instance that points to
920 the same S3 object as this instance.
921 Changes to the returned object will not affect the current instance.
922 """
923 assert resource, 'resource must be a boto3.resource instance'
924 obj = resource.Object(self._bucket, self._key)
925 if self._version_id is not None:
926 return obj.Version(self._version_id)
927 else:
928 return obj
930 #
931 # Internal methods.
932 #
933 def _read_from_buffer(self, size=-1):
934 """Remove at most size bytes from our buffer and return them."""
935 size = size if size >= 0 else len(self._buffer)
936 part = self._buffer.read(size)
937 self._current_pos += len(part)
938 return part
940 def _fill_buffer(self, size=-1):
941 size = max(size, self._buffer._chunk_size)
942 while len(self._buffer) < size and not self._eof:
943 bytes_read = self._buffer.fill(self._raw_reader)
944 if bytes_read == 0:
945 logger.debug('%s: reached EOF while filling buffer', self)
946 self._eof = True
948 def __str__(self):
949 return "smart_open.s3.Reader(%r, %r)" % (self._bucket, self._key)
951 def __repr__(self):
952 return (
953 "smart_open.s3.Reader("
954 "bucket=%r, "
955 "key=%r, "
956 "version_id=%r, "
957 "buffer_size=%r, "
958 "line_terminator=%r)"
959 ) % (
960 self._bucket,
961 self._key,
962 self._version_id,
963 self._buffer_size,
964 self._line_terminator,
965 )
968class MultipartWriter(io.BufferedIOBase):
969 """Writes bytes to S3 using the multi part API.
971 Implements the io.BufferedIOBase interface of the standard library."""
972 _upload_id = None # so `closed` property works in case __init__ fails and __del__ is called
974 def __init__(
975 self,
976 bucket,
977 key,
978 part_size=DEFAULT_PART_SIZE,
979 client=None,
980 client_kwargs=None,
981 writebuffer: io.BytesIO | None = None,
982 ):
983 adjusted_ps = smart_open.utils.clamp(part_size, MIN_PART_SIZE, MAX_PART_SIZE)
984 if part_size != adjusted_ps:
985 logger.warning(f"adjusting part_size from {part_size} to {adjusted_ps}")
986 part_size = adjusted_ps
987 self._part_size = part_size
989 _initialize_boto3(self, client, client_kwargs, bucket, key)
990 self._client: S3Client
991 self._bucket: str
992 self._key: str
994 try:
995 partial = functools.partial(
996 self._client.create_multipart_upload,
997 Bucket=bucket,
998 Key=key,
999 )
1000 self._upload_id = RETRY._do(partial)['UploadId']
1001 except botocore.client.ClientError as error:
1002 raise ValueError(
1003 'the bucket %r does not exist, or is forbidden for access (%r)' % (
1004 bucket, error
1005 )
1006 ) from error
1008 if writebuffer is None:
1009 self._buf = io.BytesIO()
1010 else:
1011 self._buf = writebuffer
1013 self._total_bytes = 0
1014 self._total_parts = 0
1015 self._parts: list[dict[str, object]] = []
1017 #
1018 # This member is part of the io.BufferedIOBase interface.
1019 #
1020 self.raw = None # type: ignore[assignment]
1022 def flush(self):
1023 pass
1025 #
1026 # Override some methods from io.IOBase.
1027 #
1028 def close(self):
1029 logger.debug("close: called")
1030 if self.closed:
1031 return
1033 if self._buf.tell():
1034 self._upload_next_part()
1036 logger.debug('%s: completing multipart upload', self)
1037 if self._total_bytes and self._upload_id:
1038 partial = functools.partial(
1039 self._client.complete_multipart_upload,
1040 Bucket=self._bucket,
1041 Key=self._key,
1042 UploadId=self._upload_id,
1043 MultipartUpload={'Parts': self._parts},
1044 )
1045 RETRY._do(partial)
1046 logger.debug('%s: completed multipart upload', self)
1047 elif self._upload_id:
1048 #
1049 # AWS complains with "The XML you provided was not well-formed or
1050 # did not validate against our published schema" when the input is
1051 # completely empty => abort the upload, no file created.
1052 #
1053 # We work around this by creating an empty file explicitly.
1054 #
1055 self._client.abort_multipart_upload(
1056 Bucket=self._bucket,
1057 Key=self._key,
1058 UploadId=self._upload_id,
1059 )
1060 self._client.put_object(
1061 Bucket=self._bucket,
1062 Key=self._key,
1063 Body=b'',
1064 )
1065 logger.debug('%s: wrote 0 bytes to imitate multipart upload', self)
1066 self._upload_id = None
1068 @property
1069 def closed(self):
1070 return self._upload_id is None
1072 def writable(self):
1073 """Return True if the stream supports writing."""
1074 return True
1076 def seekable(self):
1077 """If False, seek(), tell() and truncate() will raise IOError.
1079 We offer only tell support, and no seek or truncate support."""
1080 return True
1082 def seek(self, offset, whence=constants.WHENCE_START):
1083 """Unsupported."""
1084 raise io.UnsupportedOperation
1086 def truncate(self, size=None):
1087 """Unsupported."""
1088 raise io.UnsupportedOperation
1090 def tell(self):
1091 """Return the current stream position."""
1092 return self._total_bytes
1094 #
1095 # io.BufferedIOBase methods.
1096 #
1097 def detach(self):
1098 raise io.UnsupportedOperation("detach() not supported")
1100 def write(self, b: Buffer) -> int:
1101 """Write the given buffer (bytes, bytearray, memoryview or any buffer
1102 interface implementation) to the S3 file.
1104 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html
1106 There's buffering happening under the covers, so this may not actually
1107 do any HTTP transfer right away."""
1108 offset = 0
1109 mv = memoryview(b)
1110 self._total_bytes += len(mv)
1112 #
1113 # botocore does not accept memoryview, otherwise we could've gotten
1114 # away with not needing to write a copy to the buffer aside from cases
1115 # where b is smaller than part_size
1116 #
1117 while offset < len(mv):
1118 start = offset
1119 end = offset + self._part_size - self._buf.tell()
1120 self._buf.write(mv[start:end])
1121 if self._buf.tell() < self._part_size:
1122 #
1123 # Not enough data to write a new part just yet. The assert
1124 # ensures that we've consumed all of the input buffer.
1125 #
1126 assert end >= len(mv)
1127 return len(mv)
1129 self._upload_next_part()
1130 offset = end
1131 return len(mv)
1133 def terminate(self):
1134 """Cancel the underlying multipart upload."""
1135 if self.closed:
1136 return
1137 logger.debug('%s: terminating multipart upload', self)
1138 self._client.abort_multipart_upload(
1139 Bucket=self._bucket,
1140 Key=self._key,
1141 UploadId=self._upload_id,
1142 )
1143 self._upload_id = None
1144 logger.debug('%s: terminated multipart upload', self)
1146 def to_boto3(self, resource):
1147 """Create an **independent** `boto3.s3.Object` instance that points to
1148 the same S3 object as this instance.
1149 Changes to the returned object will not affect the current instance.
1150 """
1151 assert resource, 'resource must be a boto3.resource instance'
1152 return resource.Object(self._bucket, self._key)
1154 #
1155 # Internal methods.
1156 #
1157 def _upload_next_part(self) -> None:
1158 part_num = self._total_parts + 1
1159 logger.info(
1160 "%s: uploading part_num: %i, %i bytes (total %.3fGB)",
1161 self,
1162 part_num,
1163 self._buf.tell(),
1164 self._total_bytes / 1024.0 ** 3,
1165 )
1166 self._buf.seek(0)
1168 #
1169 # Network problems in the middle of an upload are particularly
1170 # troublesome. We don't want to abort the entire upload just because
1171 # of a temporary connection problem, so this part needs to be
1172 # especially robust.
1173 #
1174 upload = RETRY._do(
1175 functools.partial(
1176 self._client.upload_part,
1177 Bucket=self._bucket,
1178 Key=self._key,
1179 UploadId=self._upload_id,
1180 PartNumber=part_num,
1181 Body=self._buf,
1182 )
1183 )
1185 self._parts.append({'ETag': upload['ETag'], 'PartNumber': part_num})
1186 logger.debug("%s: upload of part_num #%i finished", self, part_num)
1188 self._total_parts += 1
1190 self._buf.seek(0)
1191 self._buf.truncate(0)
1193 def __enter__(self):
1194 return self
1196 def __exit__(self, exc_type, exc_val, exc_tb):
1197 if exc_type is not None:
1198 self.terminate()
1199 else:
1200 self.close()
1202 def __str__(self):
1203 return "smart_open.s3.MultipartWriter(%r, %r)" % (self._bucket, self._key)
1205 def __repr__(self):
1206 return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, part_size=%r)" % (
1207 self._bucket,
1208 self._key,
1209 self._part_size,
1210 )
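#
# A context-manager sketch for MultipartWriter (bucket and key are
# placeholders). On a clean exit, close() assembles the uploaded parts via
# complete_multipart_upload; if the body raises, __exit__ calls terminate()
# and the multipart upload is aborted instead.
#
def _example_multipart_writer():
    with MultipartWriter('example-bucket', 'example-key', part_size=MIN_PART_SIZE) as fout:
        fout.write(b'hello from smart_open')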
1213class SinglepartWriter(io.BufferedIOBase):
1214 """Writes bytes to S3 using the single part API.
1216 Implements the io.BufferedIOBase interface of the standard library.
1218 This class buffers all of its input in memory until its `close` method is called. Only then will
1219 the data be written to S3 and the buffer is released."""
1220 _buf = None # so `closed` property works in case __init__ fails and __del__ is called
1222 def __init__(
1223 self,
1224 bucket,
1225 key,
1226 client=None,
1227 client_kwargs=None,
1228 writebuffer=None,
1229 ):
1230 _initialize_boto3(self, client, client_kwargs, bucket, key)
1232 if writebuffer is None:
1233 self._buf = io.BytesIO()
1234 elif not writebuffer.seekable():
1235 raise ValueError('writebuffer needs to be seekable')
1236 else:
1237 self._buf = writebuffer
1239 def flush(self):
1240 pass
1242 #
1243 # Override some methods from io.IOBase.
1244 #
1245 def close(self):
1246 logger.debug("close: called")
1247 if self.closed:
1248 return
1250 self.seek(0)
1252 try:
1253 self._client.put_object(
1254 Bucket=self._bucket,
1255 Key=self._key,
1256 Body=self._buf,
1257 )
1258 except botocore.client.ClientError as e:
1259 raise ValueError(
1260 'the bucket %r does not exist, or is forbidden for access' % self._bucket) from e
1261 else:
1262 logger.debug("%s: direct upload finished", self)
1263 finally:
1264 self._buf.close()
1266 @property
1267 def closed(self):
1268 return self._buf is None or self._buf.closed
1270 def readable(self):
1271 """Propagate."""
1272 return self._buf.readable()
1274 def writable(self):
1275 """Propagate."""
1276 return self._buf.writable()
1278 def seekable(self):
1279 """Propagate."""
1280 return self._buf.seekable()
1282 def seek(self, offset, whence=constants.WHENCE_START):
1283 """Propagate."""
1284 return self._buf.seek(offset, whence)
1286 def truncate(self, size=None):
1287 """Propagate."""
1288 return self._buf.truncate(size)
1290 def tell(self):
1291 """Propagate."""
1292 return self._buf.tell()
1294 def write(self, b):
1295 """Write the given buffer (bytes, bytearray, memoryview or any buffer
1296 interface implementation) into the buffer. Content of the buffer will be
1297 written to S3 on close as a single-part upload.
1299 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html"""
1300 return self._buf.write(b)
1302 def read(self, size=-1):
1303 """Propagate."""
1304 return self._buf.read(size)
1306 def read1(self, size=-1):
1307 """Propagate."""
1308 return self._buf.read1(size)
1310 def terminate(self):
1311 """Close buffer and skip upload."""
1312 self._buf.close()
1313 logger.debug('%s: terminated singlepart upload', self)
1315 #
1316 # Internal methods.
1317 #
1318 def __enter__(self):
1319 return self
1321 def __exit__(self, exc_type, exc_val, exc_tb):
1322 if exc_type is not None:
1323 self.terminate()
1324 else:
1325 self.close()
1327 def __str__(self):
1328 return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._bucket, self._key)
1330 def __repr__(self):
1331 return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key)
1334def _accept_all(key):
1335 return True
1338def iter_bucket(
1339 bucket_name,
1340 prefix='',
1341 accept_key=None,
1342 key_limit=None,
1343 workers=16,
1344 retries=3,
1345 max_threads_per_fileobj=4,
1346 **session_kwargs):
1347 """
1348 Iterate and download all S3 objects under `s3://bucket_name/prefix`.
1350 Parameters
1351 ----------
1352 bucket_name: str
1353 The name of the bucket.
1354 prefix: str, optional
1355 Limits the iteration to keys starting with the prefix.
1356 accept_key: callable, optional
1357 This is a function that accepts a key name (unicode string) and
1358 returns True/False, signalling whether the given key should be downloaded.
1359 The default behavior is to accept all keys.
1360 key_limit: int, optional
1361 If specified, the iterator will stop after yielding this many results.
1362 workers: int, optional
1363 The number of objects to download concurrently. The entire operation uses
1364 a single ThreadPoolExecutor and shared thread-safe boto3 S3.Client. Default: 16
1365 retries: int, optional
1366 The number of times to retry a failed download. Default: 3
1367 max_threads_per_fileobj: int, optional
1368 The maximum number of download threads per worker. The maximum size of the
1369 connection pool will be `workers * max_threads_per_fileobj + 1`. Default: 4
1370 session_kwargs: dict, optional
1371 Keyword arguments to pass when creating a new session.
1372 For a list of available names and values, see:
1373 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session
1376 Yields
1377 ------
1378 str
1379 The full key name (does not include the bucket name).
1380 bytes
1381 The full contents of the key.
1383 Notes
1384 -----
1385 The keys are processed in parallel, using `workers` parallel workers (default: 16),
1386 to speed up downloads greatly. If multiprocessing is not available
1387 (i.e. _MULTIPROCESSING is False), this parameter is ignored.
1389 Examples
1390 --------
1392 >>> # get all JSON files under "mybucket/foo/"
1393 >>> for key, content in iter_bucket(
1394 ... bucket_name, prefix='foo/',
1395 ... accept_key=lambda key: key.endswith('.json')):
1396 ... print(key, len(content))
1398 >>> # limit to 10k files, using 32 parallel workers (default is 16)
1399 >>> for key, content in iter_bucket(bucket_name, key_limit=10000, workers=32):
1400 ... print(key, len(content))
1401 """
1402 if accept_key is None:
1403 accept_key = _accept_all
1405 #
1406 # If people insist on giving us bucket instances, silently extract the name
1407 # before moving on. Works for boto3 as well as boto.
1408 #
1409 try:
1410 bucket_name = bucket_name.name
1411 except AttributeError:
1412 pass
1414 if bucket_name is None:
1415 raise ValueError('bucket_name may not be None')
1417 total_size, key_no = 0, -1
1419 # thread-safe client to share across _list_bucket and _download_key calls
1420 # https://github.com/boto/boto3/blob/1.38.41/docs/source/guide/clients.rst?plain=1#L111
1421 session = boto3.session.Session(**session_kwargs)
1422 config = botocore.client.Config(
1423 max_pool_connections=workers * max_threads_per_fileobj + 1, # 1 thread for _list_bucket
1424 tcp_keepalive=True,
1425 retries={"max_attempts": retries * 2, "mode": "adaptive"},
1426 )
1427 client = session.client('s3', config=config)
1429 transfer_config = boto3.s3.transfer.TransferConfig(max_concurrency=max_threads_per_fileobj)
1431 key_iterator = _list_bucket(
1432 bucket_name=bucket_name,
1433 prefix=prefix,
1434 accept_key=accept_key,
1435 client=client,
1436 )
1437 download_key = functools.partial(
1438 _download_key,
1439 bucket_name=bucket_name,
1440 retries=retries,
1441 client=client,
1442 transfer_config=transfer_config,
1443 )
1445 with smart_open.concurrency.create_pool(workers) as pool:
1446 result_iterator = pool.imap_unordered(download_key, key_iterator)
1447 key_no = 0
1448 while True:
1449 try:
1450 (key, content) = result_iterator.__next__()
1451 except StopIteration:
1452 break
1453 # Skip deleted objects (404 responses)
1454 if key is None:
1455 continue
1456 if key_no % 1000 == 0:
1457 logger.info(
1458 "yielding key #%i: %s, size %i (total %.1fMB)",
1459 key_no, key, len(content), total_size / 1024.0 ** 2
1460 )
1461 yield key, content
1462 total_size += len(content)
1463 if key_limit is not None and key_no + 1 >= key_limit:
1464 # we were asked to output only a limited number of keys => we're done
1465 break
1466 key_no += 1
1467 logger.info("processed %i keys, total size %i" % (key_no + 1, total_size))
1470def _list_bucket(
1471 *,
1472 bucket_name,
1473 client,
1474 prefix='',
1475 accept_key=lambda k: True,
1476):
1477 ctoken = None
1479 while True:
1480 # list_objects_v2 doesn't like a None value for ContinuationToken
1481 # so we don't set it if we don't have one.
1482 if ctoken:
1483 kwargs = dict(Bucket=bucket_name, Prefix=prefix, ContinuationToken=ctoken)
1484 else:
1485 kwargs = dict(Bucket=bucket_name, Prefix=prefix)
1486 response = client.list_objects_v2(**kwargs)
1487 try:
1488 content = response['Contents']
1489 except KeyError:
1490 pass
1491 else:
1492 for c in content:
1493 key = c['Key']
1494 if accept_key(key):
1495 yield key
1496 ctoken = response.get('NextContinuationToken', None)
1497 if not ctoken:
1498 break
1501def _download_key(key_name, *, client, bucket_name, retries, transfer_config):
1502 # Sometimes, https://github.com/boto/boto/issues/2409 can happen
1503 # because of network issues on either side.
1504 # Retry up to `retries` times to make sure it's not a transient issue.
1505 for x in range(retries + 1):
1506 try:
1507 content_bytes = _download_fileobj(
1508 client=client,
1509 bucket_name=bucket_name,
1510 key_name=key_name,
1511 transfer_config=transfer_config,
1512 )
1513 except botocore.exceptions.ClientError as err:
1514 #
1515 # ignore 404 not found errors: they mean the object was deleted
1516 # after we listed the contents of the bucket, but before we
1517 # downloaded the object.
1518 #
1519 if 'Error' in err.response and err.response['Error'].get('Code') == '404':
1520 return None, None
1521 # Actually fail on last pass through the loop
1522 if x == retries:
1523 raise
1524 # Otherwise, try again, as this might be a transient timeout
1525 continue
1526 return key_name, content_bytes
1529def _download_fileobj(*, client, bucket_name, key_name, transfer_config):
1530 #
1531 # This is a separate function only because it makes it easier to inject
1532 # exceptions during tests.
1533 #
1534 buf = io.BytesIO()
1535 client.download_fileobj(
1536 Bucket=bucket_name,
1537 Key=key_name,
1538 Fileobj=buf,
1539 Config=transfer_config,
1540 )
1541 return buf.getvalue()