Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/smart_open/s3.py: 20%
1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2019 Radim Rehurek <me@radimrehurek.com>
4#
5# This code is distributed under the terms and conditions
6# from the MIT License (MIT).
7#
8"""Implements file-like objects for reading and writing from/to AWS S3."""
9from __future__ import annotations
11import http
12import io
13import functools
14import itertools
15import logging
16import time
17import warnings
18from math import inf
20from typing import (
21 Callable,
22 List,
23 TYPE_CHECKING,
24)
26try:
27 import boto3
28 import botocore.client
29 import botocore.exceptions
30 import urllib3.exceptions
31except ImportError:
32 MISSING_DEPS = True
34import smart_open.bytebuffer
35import smart_open.concurrency
36import smart_open.utils
38from smart_open import constants
41if TYPE_CHECKING:
42 from mypy_boto3_s3.client import S3Client
43 from typing_extensions import Buffer
45logger = logging.getLogger(__name__)
47#
48# AWS puts restrictions on the part size for multipart uploads.
49# Each part must be more than 5MB, and less than 5GB.
50#
51# On top of that, our MultipartWriter has a min_part_size option.
52# In retrospect, it's an unfortunate name, because it conflicts with the
53# minimum allowable part size (5MB), but it's too late to change it, because
54# people are using that parameter (unlike the MIN, DEFAULT, MAX constants).
55# It really just means "part size": as soon as you have this many bytes,
56# write a part to S3 (see the MultipartWriter.write method).
57#
59MIN_PART_SIZE = 5 * 1024 ** 2
60"""The absolute minimum permitted by Amazon."""
62DEFAULT_PART_SIZE = 50 * 1024**2
63"""The default part size for S3 multipart uploads, chosen carefully by smart_open"""
65MAX_PART_SIZE = 5 * 1024 ** 3
66"""The absolute maximum permitted by Amazon."""
68SCHEMES = ("s3", "s3n", 's3u', "s3a")
69DEFAULT_PORT = 443
70DEFAULT_HOST = 's3.amazonaws.com'
72DEFAULT_BUFFER_SIZE = 128 * 1024
74URI_EXAMPLES = (
75 's3://my_bucket/my_key',
76 's3://my_key:my_secret@my_bucket/my_key',
77 's3://my_key:my_secret@my_server:my_port@my_bucket/my_key',
78)
80# Returned by AWS when we try to seek beyond EOF.
81_OUT_OF_RANGE = 'InvalidRange'
84class Retry:
85 def __init__(self):
86 self.attempts: int = 6
87 self.sleep_seconds: int = 10
88 self.exceptions: List[Exception] = [botocore.exceptions.EndpointConnectionError]
89 self.client_error_codes: List[str] = ['NoSuchUpload']
91 def _do(self, fn: Callable):
92 for attempt in range(self.attempts):
93 try:
94 return fn()
95 except tuple(self.exceptions) as err:
96 logger.critical(
97 'Caught non-fatal %s, retrying %d more times',
98 err,
99 self.attempts - attempt - 1,
100 )
101 logger.exception(err)
102 time.sleep(self.sleep_seconds)
103 except botocore.exceptions.ClientError as err:
104 error_code = err.response['Error'].get('Code')
105 if error_code not in self.client_error_codes:
106 raise
107 logger.critical(
108 'Caught non-fatal ClientError (%s), retrying %d more times',
109 error_code,
110 self.attempts - attempt - 1,
111 )
112 logger.exception(err)
113 time.sleep(self.sleep_seconds)
114 else:
115 logger.critical('encountered too many non-fatal errors, giving up')
116 raise IOError('%s failed after %d attempts' % (fn.func, self.attempts))
119#
120# The retry mechanism for this submodule. Client code may modify it, e.g. by
121# updating RETRY.sleep_seconds and friends.
122#
123if 'MISSING_DEPS' not in locals():
124 RETRY = Retry()
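#
# Illustrative sketch, not part of smart_open: client code tuning the retry
# mechanism before opening anything. The attribute names come from the Retry
# class above; the values (and the extra 'SlowDown' code) are arbitrary choices.
#
def _example_tune_retry():
    RETRY.attempts = 3                            # give up sooner
    RETRY.sleep_seconds = 2                       # back off for less time between attempts
    RETRY.client_error_codes.append('SlowDown')   # also retry S3 throttling responses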
127class _ClientWrapper:
128 """Wraps a client to inject the appropriate keyword args into each method call.
130 The keyword args are a dictionary keyed by the fully qualified method name.
131 For example, S3.Client.create_multipart_upload.
133 See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client
135 This wrapper behaves identically to the client otherwise.
136 """
137 def __init__(self, client, kwargs):
138 self.client = client
139 self.kwargs = kwargs
141 def __getattr__(self, method_name):
142 method = getattr(self.client, method_name)
143 kwargs = self.kwargs.get('S3.Client.%s' % method_name, {})
144 return functools.partial(method, **kwargs)
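#
# Illustrative sketch, not part of smart_open: what a client_kwargs dictionary
# looks like. The 'S3.Client' entry feeds the client constructor (see
# _initialize_boto3 below); fully qualified method names feed the corresponding
# wrapped calls. The endpoint URL and the ServerSideEncryption setting are
# examples only; substitute your own values.
#
def _example_client_kwargs():
    return {
        'S3.Client': {'endpoint_url': 'https://s3.example.com'},
        'S3.Client.create_multipart_upload': {'ServerSideEncryption': 'aws:kms'},
    }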
147def parse_uri(uri_as_string):
148 #
149 # Restrictions on bucket names and labels:
150 #
151 # - Bucket names must be at least 3 and no more than 63 characters long.
152 # - Bucket names must be a series of one or more labels.
153 # - Adjacent labels are separated by a single period (.).
154 # - Bucket names can contain lowercase letters, numbers, and hyphens.
155 # - Each label must start and end with a lowercase letter or a number.
156 #
157 # We use the above as a guide only, and do not perform any validation. We
158 # let boto3 take care of that for us.
159 #
160 split_uri = smart_open.utils.safe_urlsplit(uri_as_string)
161 assert split_uri.scheme in SCHEMES
163 port = DEFAULT_PORT
164 host = DEFAULT_HOST
165 ordinary_calling_format = False
166 #
167 # These defaults tell boto3 to look for credentials elsewhere
168 #
169 access_id, access_secret = None, None
171 #
172 # Common URI template [key:secret@][host[:port]@]bucket/object
173 #
174 # The urlparse function doesn't handle the above format, so we have to do
175 # it ourselves.
176 #
177 uri = split_uri.netloc + split_uri.path
179 #
180 # Attempt to extract edge-case authentication details from the URL.
181 #
182 # See:
183 # 1. https://summitroute.com/blog/2018/06/20/aws_security_credential_formats/
184 # 2. test_s3_uri_with_credentials* in test_smart_open.py for example edge cases
185 #
186 if '@' in uri:
187 maybe_auth, rest = uri.split('@', 1)
188 if ':' in maybe_auth:
189 maybe_id, maybe_secret = maybe_auth.split(':', 1)
190 if '/' not in maybe_id:
191 access_id, access_secret = maybe_id, maybe_secret
192 uri = rest
194 head, key_id = uri.split('/', 1)
195 if '@' in head and ':' in head:
196 ordinary_calling_format = True
197 host_port, bucket_id = head.split('@')
198 host, port = host_port.split(':', 1)
199 port = int(port)
200 elif '@' in head:
201 ordinary_calling_format = True
202 host, bucket_id = head.split('@')
203 else:
204 bucket_id = head
206 return dict(
207 scheme=split_uri.scheme,
208 bucket_id=bucket_id,
209 key_id=key_id,
210 port=port,
211 host=host,
212 ordinary_calling_format=ordinary_calling_format,
213 access_id=access_id,
214 access_secret=access_secret,
215 )
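#
# Illustrative sketch, not part of smart_open: what parse_uri returns for the
# second URI_EXAMPLES entry above. All names and credentials are fake.
#
def _example_parse_uri():
    parsed = parse_uri('s3://my_key:my_secret@my_bucket/my_key')
    assert parsed['bucket_id'] == 'my_bucket'
    assert parsed['key_id'] == 'my_key'
    assert parsed['access_id'] == 'my_key'
    assert parsed['access_secret'] == 'my_secret'
    assert parsed['host'] == DEFAULT_HOST  # no host in the URI, so the default applies
    return parsed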
218def _consolidate_params(uri, transport_params):
219 """Consolidates the parsed Uri with the additional parameters.
221 This is necessary because the user can pass some of the parameters in
222 two different ways:
224 1) Via the URI itself
225 2) Via the transport parameters
227 These are not mutually exclusive, but we have to pick one over the other
228 in a sensible way in order to proceed.
230 """
231 transport_params = dict(transport_params)
233 def inject(**kwargs):
234 try:
235 client_kwargs = transport_params['client_kwargs']
236 except KeyError:
237 client_kwargs = transport_params['client_kwargs'] = {}
239 try:
240 init_kwargs = client_kwargs['S3.Client']
241 except KeyError:
242 init_kwargs = client_kwargs['S3.Client'] = {}
244 init_kwargs.update(**kwargs)
246 client = transport_params.get('client')
247 if client is not None and (uri['access_id'] or uri['access_secret']):
248 logger.warning(
249 'ignoring credentials parsed from URL because they conflict with '
250 'transport_params["client"]. Set transport_params["client"] to None '
251 'to suppress this warning.'
252 )
253 uri.update(access_id=None, access_secret=None)
254 elif (uri['access_id'] and uri['access_secret']):
255 inject(
256 aws_access_key_id=uri['access_id'],
257 aws_secret_access_key=uri['access_secret'],
258 )
259 uri.update(access_id=None, access_secret=None)
261 if client is not None and uri['host'] != DEFAULT_HOST:
262 logger.warning(
263 'ignoring endpoint_url parsed from URL because it conflicts with '
264 'transport_params["client"]. Set transport_params["client"] to None '
265 'to suppress this warning.'
266 )
267 uri.update(host=None)
268 elif uri['host'] != DEFAULT_HOST:
269 if uri['scheme'] == 's3u':
270 scheme = 'http'
271 else:
272 scheme = 'https'
273 inject(endpoint_url=scheme + '://%(host)s:%(port)d' % uri)
274 uri.update(host=None)
276 return uri, transport_params
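#
# Illustrative sketch, not part of smart_open: how _consolidate_params moves
# credentials embedded in the URI into the client constructor kwargs when no
# explicit client was supplied. The credentials are fake.
#
def _example_consolidate_params():
    parsed = parse_uri('s3://my_key:my_secret@my_bucket/my_key')
    parsed, transport_params = _consolidate_params(parsed, {})
    assert parsed['access_id'] is None and parsed['access_secret'] is None
    assert transport_params['client_kwargs']['S3.Client']['aws_access_key_id'] == 'my_key'
    return transport_params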
279def open_uri(uri, mode, transport_params):
280 deprecated = (
281 'multipart_upload_kwargs',
282 'object_kwargs',
283 'resource',
284 'resource_kwargs',
285 'session',
286 'singlepart_upload_kwargs',
287 )
288 detected = [k for k in deprecated if k in transport_params]
289 if detected:
290 doc_url = (
291 'https://github.com/piskvorky/smart_open/blob/develop/'
292 'MIGRATING_FROM_OLDER_VERSIONS.rst'
293 )
294 #
295 # We use warnings.warn with UserWarning instead of logger.warn here because
296 #
297 # 1) Not everyone has logging enabled; and
298 # 2) check_kwargs (below) already uses logger.warn with a similar message
299 #
300 # https://github.com/piskvorky/smart_open/issues/614
301 #
302 message = (
303 'ignoring the following deprecated transport parameters: %r. '
304 'See <%s> for details' % (detected, doc_url)
305 )
306 warnings.warn(message, UserWarning)
307 parsed_uri = parse_uri(uri)
308 parsed_uri, transport_params = _consolidate_params(parsed_uri, transport_params)
309 kwargs = smart_open.utils.check_kwargs(open, transport_params)
310 return open(parsed_uri['bucket_id'], parsed_uri['key_id'], mode, **kwargs)
313def open(
314 bucket_id,
315 key_id,
316 mode,
317 version_id=None,
318 buffer_size=DEFAULT_BUFFER_SIZE,
319 min_part_size=DEFAULT_PART_SIZE,
320 multipart_upload=True,
321 defer_seek=False,
322 client=None,
323 client_kwargs=None,
324 writebuffer=None,
325 range_chunk_size=None,
326):
327 """Open an S3 object for reading or writing.
329 Parameters
330 ----------
331 bucket_id: str
332 The name of the bucket this object resides in.
333 key_id: str
334 The name of the key within the bucket.
335 mode: str
336 The mode for opening the object. Must be either "rb" or "wb".
337 buffer_size: int, optional
338 Default: 128KB
339 The buffer size in bytes for reading. Controls memory usage. Data is streamed
340 from an S3 network stream in buffer_size chunks. Forward seeks within
341 the current buffer are satisfied without additional GET requests. Backward
342 seeks always open a new GET request. For forward seek-intensive workloads,
343 increase buffer_size to reduce GET requests at the cost of higher memory usage.
344 min_part_size: int, optional
345 The minimum part size for multipart uploads, in bytes.
346 When the writebuffer contains this many bytes, smart_open will upload
347 the bytes to S3 as a single part of a multi-part upload, freeing the
348 buffer either partially or entirely. When you close the writer, it
349 will assemble the parts together.
350 The value determines the upper limit for the writebuffer. If buffer
351 space is short (e.g. you are buffering to memory), then use a smaller
352 value for min_part_size, or consider buffering to disk instead (see
353 the writebuffer option).
354 The value must be between 5MB and 5GB. If you specify a value outside
355 of this range, smart_open will adjust it for you, because otherwise the
356 upload _will_ fail.
357 For writing only. Does not apply if you set multipart_upload=False.
358 multipart_upload: bool, optional
359 Default: `True`
360 If set to `True`, will use multipart upload for writing to S3. If set
361 to `False`, the upload will use the S3 Single-Part Upload API, which
362 is better suited to small files.
363 For writing only.
364 version_id: str, optional
365 Version of the object, used when reading object.
366 If None, will fetch the most recent version.
367 defer_seek: boolean, optional
368 Default: `False`
369 If set to `True` on a file opened for reading, GetObject will not be
370 called until the first seek() or read().
371 Avoids redundant API queries when seeking before reading.
372 range_chunk_size: int, optional
373 Default: `None`
374 Maximum byte range per S3 GET request when reading.
375 When None (default), a single GET request is made for the entire file,
376 and data is streamed from that single botocore.response.StreamingBody
377 in buffer_size chunks.
378 When set to a positive integer, multiple GET requests are made, each
379 limited to at most this many bytes via HTTP Range headers. Each GET
380 returns a new StreamingBody that is streamed in buffer_size chunks.
381 Useful for reading small portions of large files without forcing
382 S3-compatible systems like SeaweedFS/Ceph to load the entire file.
383 Larger values mean fewer billable GET requests but higher load on S3
384 servers. Smaller values mean more GET requests but less server load per request.
385 Values larger than the file size result in a single GET for the whole file.
386 Affects reading only. Does not affect memory usage (controlled by buffer_size).
387 client: object, optional
388 The S3 client to use when working with boto3.
389 If you don't specify this, then smart_open will create a new client for you.
390 client_kwargs: dict, optional
391 Additional parameters to pass to the relevant functions of the client.
392 The keys are fully qualified method names, e.g. `S3.Client.create_multipart_upload`.
393 The values are kwargs to pass to that method each time it is called.
394 writebuffer: IO[bytes], optional
395 By default, this module will buffer data in memory using io.BytesIO
396 when writing. Pass another binary IO instance here to use it instead.
397 For example, you may pass a file object to buffer to local disk instead
398 of in RAM. Use this to keep RAM usage low at the expense of additional
399 disk IO. If you pass in an open file, then you are responsible for
400 cleaning it up after writing completes.
401 """
402 logger.debug('%r', locals())
403 if mode not in constants.BINARY_MODES:
404 raise NotImplementedError('bad mode: %r expected one of %r' % (mode, constants.BINARY_MODES))
406 if (mode == constants.WRITE_BINARY) and (version_id is not None):
407 raise ValueError("version_id must be None when writing")
409 if mode == constants.READ_BINARY:
410 fileobj = Reader(
411 bucket_id,
412 key_id,
413 version_id=version_id,
414 buffer_size=buffer_size,
415 defer_seek=defer_seek,
416 client=client,
417 client_kwargs=client_kwargs,
418 range_chunk_size=range_chunk_size,
419 )
420 elif mode == constants.WRITE_BINARY:
421 if multipart_upload:
422 fileobj = MultipartWriter(
423 bucket_id,
424 key_id,
425 client=client,
426 client_kwargs=client_kwargs,
427 writebuffer=writebuffer,
428 part_size=min_part_size,
429 )
430 else:
431 fileobj = SinglepartWriter(
432 bucket_id,
433 key_id,
434 client=client,
435 client_kwargs=client_kwargs,
436 writebuffer=writebuffer,
437 )
438 else:
439 assert False, 'unexpected mode: %r' % mode
441 fileobj.name = key_id
442 return fileobj
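#
# Illustrative usage sketch, not part of smart_open. The bucket and key names
# are hypothetical. Reading caps each GET request at 8MB via range_chunk_size;
# writing buffers parts on local disk instead of in RAM via writebuffer.
#
def _example_open_usage():
    import tempfile

    # Read the first KB of a large object without an open-ended GET.
    with open('my-bucket', 'big/object.bin', 'rb', range_chunk_size=8 * 1024 ** 2) as fin:
        header = fin.read(1024)

    # Write a copy, spilling the multipart buffer to disk to keep RAM usage low.
    with tempfile.TemporaryFile() as tmp:
        with open('my-bucket', 'big/copy.bin', 'wb', writebuffer=tmp) as fout:
            fout.write(header)

    return header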
445def _get(client, bucket, key, version, range_string):
446 try:
447 params = dict(Bucket=bucket, Key=key)
448 if version:
449 params["VersionId"] = version
450 if range_string:
451 params["Range"] = range_string
453 return client.get_object(**params)
454 except botocore.client.ClientError as error:
455 wrapped_error = IOError(
456 'unable to access bucket: %r key: %r version: %r error: %s' % (
457 bucket, key, version, error
458 )
459 )
460 wrapped_error.backend_error = error
461 raise wrapped_error from error
464def _unwrap_ioerror(ioe):
465 """Given an IOError from _get, return the 'Error' dictionary from boto."""
466 try:
467 return ioe.backend_error.response['Error']
468 except (AttributeError, KeyError):
469 return None
472class _SeekableRawReader(object):
473 """Read an S3 object.
475 This class is internal to the S3 submodule.
476 """
478 def __init__(
479 self,
480 client,
481 bucket,
482 key,
483 version_id=None,
484 range_chunk_size=None,
485 ):
486 self._client = client
487 self._bucket = bucket
488 self._key = key
489 self._version_id = version_id
490 self._range_chunk_size = range_chunk_size
492 self._content_length = None
493 self._position = 0
494 self._body = None
496 @property
497 def closed(self):
498 return self._body is None
500 def close(self):
501 if not self.closed:
502 self._body.close()
503 self._body = None
505 def seek(self, offset, whence=constants.WHENCE_START):
506 """Seek to the specified position.
508 :param int offset: The offset in bytes.
509 :param int whence: Where the offset is from.
511 :returns: the position after seeking.
512 :rtype: int
513 """
514 if whence not in constants.WHENCE_CHOICES:
515 raise ValueError('invalid whence, expected one of %r' % constants.WHENCE_CHOICES)
517 #
518 # Close old body explicitly.
519 #
520 self.close()
522 start = None
523 stop = None
524 if whence == constants.WHENCE_START:
525 start = max(0, offset)
526 elif whence == constants.WHENCE_CURRENT:
527 start = max(0, offset + self._position)
528 elif whence == constants.WHENCE_END:
529 stop = max(0, -offset)
531 #
532 # If we can figure out that we've read past the EOF, then we can save
533 # an extra API call.
534 #
535 if self._content_length is None: # _open_body has not been called yet
536 if start is None and stop == 0:
537 # seek(0, WHENCE_END) seeks straight to EOF:
538 # make a minimal request to populate _content_length
539 self._open_body(start=0, stop=0)
540 self.close()
541 reached_eof = True
542 else:
543 reached_eof = False
544 elif start is not None and start >= self._content_length:
545 reached_eof = True
546 elif stop == 0:
547 reached_eof = True
548 else:
549 reached_eof = False
551 if reached_eof:
552 self._body = io.BytesIO()
553 self._position = self._content_length
554 else:
555 self._open_body(start, stop)
557 return self._position
559 def _open_body(self, start=None, stop=None):
560 """Open a connection to download the specified range of bytes. Store
561 the open file handle in self._body.
563 If no range is specified, start defaults to self._position.
564 start and stop follow the semantics of the http range header,
565 so a stop without a start requests the last `stop` bytes of the object.
567 If self._range_chunk_size is set, open-ended range headers are avoided:
568 stop will be set such that at most self._range_chunk_size bytes are
569 returned by a single GET request.
571 As a side effect, set self._content_length. Set self._position
572 to self._content_length if start is past end of file.
573 """
574 if start is None and stop is None:
575 start = self._position
577 # Apply chunking: limit the stop position if range_chunk_size is set
578 if stop is None and self._range_chunk_size is not None:
579 stop = start + self._range_chunk_size - 1
580 # Don't request beyond known content length
581 if self._content_length is not None:
582 stop = min(stop, self._content_length - 1)
584 range_string = smart_open.utils.make_range_string(start, stop)
586 try:
587 # Optimistically try to fetch the requested content range.
588 response = _get(
589 self._client,
590 self._bucket,
591 self._key,
592 self._version_id,
593 range_string,
594 )
595 except IOError as ioe:
596 # Handle requested content range exceeding content size.
597 error_response = _unwrap_ioerror(ioe)
598 if error_response is None or error_response.get('Code') != _OUT_OF_RANGE:
599 raise
601 actual_object_size = int(error_response.get('ActualObjectSize', 0))
602 if (
603 # empty file (==) or start is past end of file (>)
604 (start is not None and start >= actual_object_size)
605 # negative seek requested more bytes than file has
606 or (start is None and stop is not None and stop >= actual_object_size)
607 ):
608 self._position = self._content_length = actual_object_size
609 self._body = io.BytesIO()
610 else: # stop is past end of file: request the correct remainder instead
611 self._open_body(start=start, stop=actual_object_size - 1)
612 return
614 #
615 # Keep track of how many times boto3's built-in retry mechanism
616 # activated.
617 #
618 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#checking-retry-attempts-in-an-aws-service-response
619 #
620 logger.debug(
621 '%s: RetryAttempts: %d',
622 self,
623 response['ResponseMetadata']['RetryAttempts'],
624 )
625 #
626 # range request may not always return partial content, see:
627 # https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests#partial_request_responses
628 #
629 status_code = response['ResponseMetadata']['HTTPStatusCode']
630 if status_code == http.HTTPStatus.PARTIAL_CONTENT:
631 # 206 guarantees that the response body only contains the requested byte range
632 _, resp_start, _, length = smart_open.utils.parse_content_range(response['ContentRange'])
633 self._position = resp_start
634 self._content_length = length
635 self._body = response['Body']
636 elif status_code == http.HTTPStatus.OK:
637 # 200 guarantees the response body contains the full file (server ignored range header)
638 self._position = 0
639 self._content_length = response["ContentLength"]
640 self._body = response['Body']
641 #
642 # If we got a full request when we were actually expecting a range, we need to
643 # read some data to ensure that the body starts in the place that the caller expects
644 #
645 if start is not None:
646 expected_position = min(self._content_length, start)
647 elif start is None and stop is not None:
648 expected_position = max(0, self._content_length - stop)
649 else:
650 expected_position = 0
651 if expected_position > 0:
652 logger.debug(
653 '%s: discarding %d bytes to reach expected position',
654 self,
655 expected_position,
656 )
657 self._position = len(self._body.read(expected_position))
658 else:
659 raise ValueError("Unexpected status code %r" % status_code)
661 def read(self, size=-1):
662 """Read from the continuous connection with the remote peer."""
663 if size < -1:
664 raise ValueError(f'size must be >= -1, got {size}')
666 if size == -1:
667 size = inf # makes for a simple while-condition below
669 binary_collected = io.BytesIO()
671 #
672 # Boto3 has built-in error handling and retry mechanisms:
673 #
674 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
675 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
676 #
677 # Unfortunately, it isn't always enough. There is still a non-zero
678 # possibility that an exception will slip past these mechanisms and
679 # terminate the read prematurely. Luckily, at this stage, it's very
680 # simple to recover from the problem: wait a little bit, reopen the
681 # HTTP connection and try again. Usually, a single retry attempt is
682 # enough to recover, but we try multiple times "just in case".
683 #
684 def retry_read(attempts=(1, 2, 4, 8, 16)) -> bytes:
685 for seconds in attempts:
686 if self.closed:
687 self._open_body()
688 try:
689 if size == inf:
690 return self._body.read()
691 return self._body.read(size - binary_collected.tell())
692 except (
693 ConnectionResetError,
694 botocore.exceptions.BotoCoreError,
695 urllib3.exceptions.HTTPError,
696 ) as err:
697 logger.warning(
698 '%s: caught %r while reading %d bytes, sleeping %ds before retry',
699 self,
700 err,
701 -1 if size == inf else size,
702 seconds,
703 )
704 self.close()
705 time.sleep(seconds)
706 raise IOError(
707 '%s: failed to read %d bytes after %d attempts' %
708 (self, -1 if size == inf else size, len(attempts)),
709 )
711 while (
712 self._content_length is None # very first read call
713 or (
714 self._position < self._content_length # not yet end of file
715 and binary_collected.tell() < size # not yet read enough
716 )
717 ):
718 binary = retry_read()
719 self._position += len(binary)
720 binary_collected.write(binary)
721 if not binary: # end of stream
722 self.close()
724 return binary_collected.getvalue()
726 def __str__(self):
727 return 'smart_open.s3._SeekableRawReader(%r, %r)' % (self._bucket, self._key)
730def _initialize_boto3(rw, client, client_kwargs, bucket, key):
731 """Create the required objects for accessing S3. Ideally, they have
732 already been created for us and we can just reuse them."""
733 if client_kwargs is None:
734 client_kwargs = {}
736 if client is None:
737 init_kwargs = client_kwargs.get('S3.Client', {})
738 if 'config' not in init_kwargs:
739 init_kwargs['config'] = botocore.client.Config(
740 max_pool_connections=64,
741 tcp_keepalive=True,
742 retries={"max_attempts": 6, "mode": "adaptive"}
743 )
744 # boto3.client re-uses the default session, which is not thread-safe when this is called
745 # from within a thread. When using smart_open with multithreading, create a thread-safe
746 # client with the config above and share it between threads using transport_params
747 # https://github.com/boto/boto3/blob/1.38.41/docs/source/guide/clients.rst?plain=1#L111
748 client = boto3.client('s3', **init_kwargs)
749 assert client
751 rw._client = _ClientWrapper(client, client_kwargs)
752 rw._bucket = bucket
753 rw._key = key
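#
# Illustrative sketch, not part of smart_open: creating one thread-safe client
# up front and sharing it across threads via transport_params, as the comment
# above recommends. The bucket/key names are hypothetical; smart_open.open is
# the top-level entry point that forwards transport_params to this submodule.
#
def _example_shared_client():
    session = boto3.session.Session()
    client = session.client(
        's3',
        config=botocore.client.Config(max_pool_connections=64, tcp_keepalive=True),
    )
    with smart_open.open('s3://my-bucket/my-key.txt', 'rb', transport_params={'client': client}) as fin:
        return fin.read()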
756class Reader(io.BufferedIOBase):
757 """Reads bytes from S3.
759 Implements the io.BufferedIOBase interface of the standard library."""
761 def __init__(
762 self,
763 bucket,
764 key,
765 version_id=None,
766 buffer_size=DEFAULT_BUFFER_SIZE,
767 line_terminator=constants.BINARY_NEWLINE,
768 defer_seek=False,
769 client=None,
770 client_kwargs=None,
771 range_chunk_size=None,
772 ):
773 self._version_id = version_id
774 self._buffer_size = buffer_size
776 _initialize_boto3(self, client, client_kwargs, bucket, key)
778 self._raw_reader = _SeekableRawReader(
779 self._client,
780 bucket,
781 key,
782 self._version_id,
783 range_chunk_size=range_chunk_size,
784 )
785 self._current_pos = 0
786 self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size)
787 self._eof = False
788 self._line_terminator = line_terminator
789 self._seek_initialized = False
791 if not defer_seek:
792 self.seek(0)
794 #
795 # io.BufferedIOBase methods.
796 #
798 def close(self):
799 """Flush and close this stream."""
800 logger.debug("close: called")
801 pass
803 def readable(self):
804 """Return True if the stream can be read from."""
805 return True
807 def read(self, size=-1):
808 """Read up to size bytes from the object and return them."""
809 if size == 0:
810 return b''
811 elif size < 0:
812 # call read() before setting _current_pos to make sure _content_length is set
813 out = self._read_from_buffer() + self._raw_reader.read()
814 self._current_pos = self._raw_reader._content_length
815 return out
817 #
818 # Return unused data first
819 #
820 if len(self._buffer) >= size:
821 return self._read_from_buffer(size)
823 #
824 # If the stream is finished, return what we have.
825 #
826 if self._eof:
827 return self._read_from_buffer()
829 self._fill_buffer(size)
830 return self._read_from_buffer(size)
832 def read1(self, size=-1):
833 """This is the same as read()."""
834 return self.read(size=size)
836 def readinto(self, b):
837 """Read up to len(b) bytes into b, and return the number of bytes
838 read."""
839 data = self.read(len(b))
840 if not data:
841 return 0
842 b[:len(data)] = data
843 return len(data)
845 def readline(self, limit=-1):
846 """Read up to and including the next newline. Returns the bytes read."""
847 if limit != -1:
848 raise NotImplementedError('limits other than -1 not implemented yet')
850 #
851 # A single line may span multiple buffers.
852 #
853 line = io.BytesIO()
854 while not (self._eof and len(self._buffer) == 0):
855 line_part = self._buffer.readline(self._line_terminator)
856 line.write(line_part)
857 self._current_pos += len(line_part)
859 if line_part.endswith(self._line_terminator):
860 break
861 else:
862 self._fill_buffer()
864 return line.getvalue()
866 def seekable(self):
867 """If False, seek(), tell() and truncate() will raise IOError.
869 We offer only seek support, and no truncate support."""
870 return True
872 def seek(self, offset, whence=constants.WHENCE_START):
873 """Seek to the specified position.
875 :param int offset: The offset in bytes.
876 :param int whence: Where the offset is from.
878 Returns the position after seeking."""
879 # Convert relative offset to absolute, since self._raw_reader
880 # doesn't know our current position.
881 if whence == constants.WHENCE_CURRENT:
882 whence = constants.WHENCE_START
883 offset += self._current_pos
885 # Check if we can satisfy seek from buffer
886 if whence == constants.WHENCE_START and offset > self._current_pos:
887 buffer_end = self._current_pos + len(self._buffer)
888 if offset <= buffer_end:
889 # Forward seek within buffered data - avoid S3 request
890 self._buffer.read(offset - self._current_pos)
891 self._current_pos = offset
892 return self._current_pos
894 if not self._seek_initialized or not (
895 whence == constants.WHENCE_START and offset == self._current_pos
896 ):
897 self._current_pos = self._raw_reader.seek(offset, whence)
898 self._buffer.empty()
900 self._eof = self._current_pos == self._raw_reader._content_length
902 self._seek_initialized = True
903 return self._current_pos
905 def tell(self):
906 """Return the current position within the file."""
907 return self._current_pos
909 def truncate(self, size=None):
910 """Unsupported."""
911 raise io.UnsupportedOperation
913 def detach(self):
914 """Unsupported."""
915 raise io.UnsupportedOperation
917 def terminate(self):
918 """Do nothing."""
919 pass
921 def to_boto3(self, resource):
922 """Create an **independent** `boto3.s3.Object` instance that points to
923 the same S3 object as this instance.
924 Changes to the returned object will not affect the current instance.
925 """
926 assert resource, 'resource must be a boto3.resource instance'
927 obj = resource.Object(self._bucket, self._key)
928 if self._version_id is not None:
929 return obj.Version(self._version_id)
930 else:
931 return obj
933 #
934 # Internal methods.
935 #
936 def _read_from_buffer(self, size=-1):
937 """Remove at most size bytes from our buffer and return them."""
938 size = size if size >= 0 else len(self._buffer)
939 part = self._buffer.read(size)
940 self._current_pos += len(part)
941 return part
943 def _fill_buffer(self, size=-1):
944 size = max(size, self._buffer._chunk_size)
945 while len(self._buffer) < size and not self._eof:
946 bytes_read = self._buffer.fill(self._raw_reader)
947 if bytes_read == 0:
948 logger.debug('%s: reached EOF while filling buffer', self)
949 self._eof = True
951 def __str__(self):
952 return "smart_open.s3.Reader(%r, %r)" % (self._bucket, self._key)
954 def __repr__(self):
955 return (
956 "smart_open.s3.Reader("
957 "bucket=%r, "
958 "key=%r, "
959 "version_id=%r, "
960 "buffer_size=%r, "
961 "line_terminator=%r)"
962 ) % (
963 self._bucket,
964 self._key,
965 self._version_id,
966 self._buffer_size,
967 self._line_terminator,
968 )
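#
# Illustrative sketch, not part of smart_open: using Reader directly with
# defer_seek so that no GET request is issued until the first seek() or read().
# The bucket and key names are hypothetical.
#
def _example_reader_defer_seek():
    fin = Reader('my-bucket', 'logs/2024-01-01.bin', defer_seek=True)
    fin.seek(1024)         # the first S3 request happens here
    chunk = fin.read(512)  # fills the 128KB buffer once; later short reads come from it
    fin.close()
    return chunk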
971class MultipartWriter(io.BufferedIOBase):
972 """Writes bytes to S3 using the multi part API.
974 Implements the io.BufferedIOBase interface of the standard library."""
975 _upload_id = None # so `closed` property works in case __init__ fails and __del__ is called
977 def __init__(
978 self,
979 bucket,
980 key,
981 part_size=DEFAULT_PART_SIZE,
982 client=None,
983 client_kwargs=None,
984 writebuffer: io.BytesIO | None = None,
985 ):
986 adjusted_ps = smart_open.utils.clamp(part_size, MIN_PART_SIZE, MAX_PART_SIZE)
987 if part_size != adjusted_ps:
988 logger.warning(f"adjusting part_size from {part_size} to {adjusted_ps}")
989 part_size = adjusted_ps
990 self._part_size = part_size
992 _initialize_boto3(self, client, client_kwargs, bucket, key)
993 self._client: S3Client
994 self._bucket: str
995 self._key: str
997 try:
998 partial = functools.partial(
999 self._client.create_multipart_upload,
1000 Bucket=bucket,
1001 Key=key,
1002 )
1003 self._upload_id = RETRY._do(partial)['UploadId']
1004 except botocore.client.ClientError as error:
1005 raise ValueError(
1006 'the bucket %r does not exist, or is forbidden for access (%r)' % (
1007 bucket, error
1008 )
1009 ) from error
1011 if writebuffer is None:
1012 self._buf = io.BytesIO()
1013 else:
1014 self._buf = writebuffer
1016 self._total_bytes = 0
1017 self._total_parts = 0
1018 self._parts: list[dict[str, object]] = []
1020 def flush(self):
1021 pass
1023 #
1024 # Override some methods from io.IOBase.
1025 #
1026 def close(self):
1027 logger.debug("close: called")
1028 if self.closed:
1029 return
1031 if self._buf.tell():
1032 self._upload_next_part()
1034 logger.debug('%s: completing multipart upload', self)
1035 if self._total_bytes and self._upload_id:
1036 partial = functools.partial(
1037 self._client.complete_multipart_upload,
1038 Bucket=self._bucket,
1039 Key=self._key,
1040 UploadId=self._upload_id,
1041 MultipartUpload={'Parts': self._parts},
1042 )
1043 RETRY._do(partial)
1044 logger.debug('%s: completed multipart upload', self)
1045 elif self._upload_id:
1046 #
1047 # AWS complains with "The XML you provided was not well-formed or
1048 # did not validate against our published schema" when the input is
1049 # completely empty => abort the upload, no file created.
1050 #
1051 # We work around this by creating an empty file explicitly.
1052 #
1053 self._client.abort_multipart_upload(
1054 Bucket=self._bucket,
1055 Key=self._key,
1056 UploadId=self._upload_id,
1057 )
1058 self._client.put_object(
1059 Bucket=self._bucket,
1060 Key=self._key,
1061 Body=b'',
1062 )
1063 logger.debug('%s: wrote 0 bytes to imitate multipart upload', self)
1064 self._upload_id = None
1066 @property
1067 def closed(self):
1068 return self._upload_id is None
1070 def writable(self):
1071 """Return True if the stream supports writing."""
1072 return True
1074 def seekable(self):
1075 """If False, seek(), tell() and truncate() will raise IOError.
1077 We offer only tell support, and no seek or truncate support."""
1078 return True
1080 def seek(self, offset, whence=constants.WHENCE_START):
1081 """Unsupported."""
1082 raise io.UnsupportedOperation
1084 def truncate(self, size=None):
1085 """Unsupported."""
1086 raise io.UnsupportedOperation
1088 def tell(self):
1089 """Return the current stream position."""
1090 return self._total_bytes
1092 #
1093 # io.BufferedIOBase methods.
1094 #
1095 def detach(self):
1096 raise io.UnsupportedOperation("detach() not supported")
1098 def write(self, b: Buffer) -> int:
1099 """Write the given buffer (bytes, bytearray, memoryview or any buffer
1100 interface implementation) to the S3 file.
1102 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html
1104 There's buffering happening under the covers, so this may not actually
1105 do any HTTP transfer right away."""
1106 offset = 0
1107 mv = memoryview(b)
1108 self._total_bytes += len(mv)
1110 #
1111 # botocore does not accept memoryview, otherwise we could've gotten
1112 # away with not needing to write a copy to the buffer aside from cases
1113 # where b is smaller than part_size
1114 #
1115 while offset < len(mv):
1116 start = offset
1117 end = offset + self._part_size - self._buf.tell()
1118 self._buf.write(mv[start:end])
1119 if self._buf.tell() < self._part_size:
1120 #
1121 # Not enough data to write a new part just yet. The assert
1122 # ensures that we've consumed all of the input buffer.
1123 #
1124 assert end >= len(mv)
1125 return len(mv)
1127 self._upload_next_part()
1128 offset = end
1129 return len(mv)
1131 def terminate(self):
1132 """Cancel the underlying multipart upload."""
1133 if self.closed:
1134 return
1135 logger.debug('%s: terminating multipart upload', self)
1136 self._client.abort_multipart_upload(
1137 Bucket=self._bucket,
1138 Key=self._key,
1139 UploadId=self._upload_id,
1140 )
1141 self._upload_id = None
1142 logger.debug('%s: terminated multipart upload', self)
1144 def to_boto3(self, resource):
1145 """Create an **independent** `boto3.s3.Object` instance that points to
1146 the same S3 object as this instance.
1147 Changes to the returned object will not affect the current instance.
1148 """
1149 assert resource, 'resource must be a boto3.resource instance'
1150 return resource.Object(self._bucket, self._key)
1152 #
1153 # Internal methods.
1154 #
1155 def _upload_next_part(self) -> None:
1156 part_num = self._total_parts + 1
1157 logger.info(
1158 "%s: uploading part_num: %i, %i bytes (total %.3fGB)",
1159 self,
1160 part_num,
1161 self._buf.tell(),
1162 self._total_bytes / 1024.0 ** 3,
1163 )
1164 self._buf.seek(0)
1166 #
1167 # Network problems in the middle of an upload are particularly
1168 # troublesome. We don't want to abort the entire upload just because
1169 # of a temporary connection problem, so this part needs to be
1170 # especially robust.
1171 #
1172 upload = RETRY._do(
1173 functools.partial(
1174 self._client.upload_part,
1175 Bucket=self._bucket,
1176 Key=self._key,
1177 UploadId=self._upload_id,
1178 PartNumber=part_num,
1179 Body=self._buf,
1180 )
1181 )
1183 self._parts.append({'ETag': upload['ETag'], 'PartNumber': part_num})
1184 logger.debug("%s: upload of part_num #%i finished", self, part_num)
1186 self._total_parts += 1
1188 self._buf.seek(0)
1189 self._buf.truncate(0)
1191 def __enter__(self):
1192 return self
1194 def __exit__(self, exc_type, exc_val, exc_tb):
1195 if exc_type is not None:
1196 self.terminate()
1197 else:
1198 self.close()
1200 def __str__(self):
1201 return "smart_open.s3.MultipartWriter(%r, %r)" % (self._bucket, self._key)
1203 def __repr__(self):
1204 return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, part_size=%r)" % (
1205 self._bucket,
1206 self._key,
1207 self._part_size,
1208 )
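#
# Illustrative sketch, not part of smart_open: MultipartWriter as a context
# manager. On a clean exit the upload is completed; if the body raises, __exit__
# calls terminate() and the multipart upload is aborted, leaving no partial
# object behind. The bucket/key names are hypothetical.
#
def _example_multipart_writer():
    with MultipartWriter('my-bucket', 'exports/data.bin', part_size=MIN_PART_SIZE) as fout:
        for _ in range(10):
            fout.write(b'x' * 1024)  # parts are uploaded once part_size bytes accumulate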
1211class SinglepartWriter(io.BufferedIOBase):
1212 """Writes bytes to S3 using the single part API.
1214 Implements the io.BufferedIOBase interface of the standard library.
1216 This class buffers all of its input in memory until its `close` method is called. Only then will
1217 the data be written to S3 and the buffer released."""
1218 _buf = None # so `closed` property works in case __init__ fails and __del__ is called
1220 def __init__(
1221 self,
1222 bucket,
1223 key,
1224 client=None,
1225 client_kwargs=None,
1226 writebuffer=None,
1227 ):
1228 _initialize_boto3(self, client, client_kwargs, bucket, key)
1230 if writebuffer is None:
1231 self._buf = io.BytesIO()
1232 elif not writebuffer.seekable():
1233 raise ValueError('writebuffer needs to be seekable')
1234 else:
1235 self._buf = writebuffer
1237 def flush(self):
1238 pass
1240 #
1241 # Override some methods from io.IOBase.
1242 #
1243 def close(self):
1244 logger.debug("close: called")
1245 if self.closed:
1246 return
1248 self.seek(0)
1250 try:
1251 self._client.put_object(
1252 Bucket=self._bucket,
1253 Key=self._key,
1254 Body=self._buf,
1255 )
1256 except botocore.client.ClientError as e:
1257 raise ValueError(
1258 'the bucket %r does not exist, or is forbidden for access' % self._bucket) from e
1259 else:
1260 logger.debug("%s: direct upload finished", self)
1261 finally:
1262 self._buf.close()
1264 @property
1265 def closed(self):
1266 return self._buf is None or self._buf.closed
1268 def readable(self):
1269 """Propagate."""
1270 return self._buf.readable()
1272 def writable(self):
1273 """Propagate."""
1274 return self._buf.writable()
1276 def seekable(self):
1277 """Propagate."""
1278 return self._buf.seekable()
1280 def seek(self, offset, whence=constants.WHENCE_START):
1281 """Propagate."""
1282 return self._buf.seek(offset, whence)
1284 def truncate(self, size=None):
1285 """Propagate."""
1286 return self._buf.truncate(size)
1288 def tell(self):
1289 """Propagate."""
1290 return self._buf.tell()
1292 def write(self, b):
1293 """Write the given buffer (bytes, bytearray, memoryview or any buffer
1294 interface implementation) into the buffer. Content of the buffer will be
1295 written to S3 on close as a single-part upload.
1297 For more information about buffers, see https://docs.python.org/3/c-api/buffer.html"""
1298 return self._buf.write(b)
1300 def read(self, size=-1):
1301 """Propagate."""
1302 return self._buf.read(size)
1304 def read1(self, size=-1):
1305 """Propagate."""
1306 return self._buf.read1(size)
1308 def terminate(self):
1309 """Close buffer and skip upload."""
1310 self._buf.close()
1311 logger.debug('%s: terminated singlepart upload', self)
1313 #
1314 # Internal methods.
1315 #
1316 def __enter__(self):
1317 return self
1319 def __exit__(self, exc_type, exc_val, exc_tb):
1320 if exc_type is not None:
1321 self.terminate()
1322 else:
1323 self.close()
1325 def __str__(self):
1326 return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._bucket, self._key)
1328 def __repr__(self):
1329 return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key)
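#
# Illustrative sketch, not part of smart_open: a single-part upload buffered to
# local disk instead of RAM. The writebuffer must be seekable (see __init__
# above); the bucket/key names are hypothetical.
#
def _example_singlepart_writer():
    import tempfile

    with tempfile.TemporaryFile() as tmp:
        with SinglepartWriter('my-bucket', 'small/object.json', writebuffer=tmp) as fout:
            fout.write(b'{"hello": "world"}')  # uploaded in one put_object call on close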
1332def _accept_all(key):
1333 return True
1336def iter_bucket(
1337 bucket_name,
1338 prefix='',
1339 accept_key=None,
1340 key_limit=None,
1341 workers=16,
1342 retries=3,
1343 max_threads_per_fileobj=4,
1344 client_kwargs=None,
1345 **session_kwargs, # double star notation for backwards compatibility
1346):
1347 """
1348 Iterate and download all S3 objects under `s3://bucket_name/prefix`.
1350 Parameters
1351 ----------
1352 bucket_name: str
1353 The name of the bucket.
1354 prefix: str, optional
1355 Limits the iteration to keys starting with the prefix.
1356 accept_key: callable, optional
1357 This is a function that accepts a key name (unicode string) and
1358 returns True/False, signalling whether the given key should be downloaded.
1359 The default behavior is to accept all keys.
1360 key_limit: int, optional
1361 If specified, the iterator will stop after yielding this many results.
1362 workers: int, optional
1363 The number of objects to download concurrently. The entire operation uses
1364 a single ThreadPoolExecutor and shared thread-safe boto3 S3.Client. Default: 16
1365 retries: int, optional
1366 The number of times to retry a failed download. Default: 3
1367 max_threads_per_fileobj: int, optional
1368 The maximum number of download threads per worker. The maximum size of the
1369 connection pool will be `workers * max_threads_per_fileobj + 1`. Default: 4
1370 client_kwargs: dict, optional
1371 Keyword arguments to pass when creating the S3 client.
1372 For a list of available names and values, see:
1373 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.client
1374 session_kwargs: dict, optional
1375 Keyword arguments to pass when creating a new session.
1376 For a list of available names and values, see:
1377 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session
1380 Yields
1381 ------
1382 str
1383 The full key name (does not include the bucket name).
1384 bytes
1385 The full contents of the key.
1387 Notes
1388 -----
1389 The keys are processed in parallel, using `workers` threads (default: 16),
1390 to speed up downloads greatly. If concurrent downloads are not available,
1391 this parameter will be ignored.
1393 Examples
1394 --------
1396 >>> # get all JSON files under "mybucket/foo/"
1397 >>> for key, content in iter_bucket(
1398 ... bucket_name, prefix='foo/',
1399 ... accept_key=lambda key: key.endswith('.json')):
1400 ... print(key, len(content))
1402 >>> # limit to 10k files, using 32 parallel workers (default is 16)
1403 >>> for key, content in iter_bucket(bucket_name, key_limit=10000, workers=32):
1404 ... print(key, len(content))
1405 """
1406 if accept_key is None:
1407 accept_key = _accept_all
1409 #
1410 # If people insist on giving us bucket instances, silently extract the name
1411 # before moving on. Works for boto3 as well as boto.
1412 #
1413 try:
1414 bucket_name = bucket_name.name
1415 except AttributeError:
1416 pass
1418 if bucket_name is None:
1419 raise ValueError('bucket_name may not be None')
1421 total_size, key_no = 0, 0
1423 # thread-safe client to share across _list_bucket and _download_key calls
1424 # https://github.com/boto/boto3/blob/1.38.41/docs/source/guide/clients.rst?plain=1#L111
1425 session = boto3.session.Session(**session_kwargs)
1426 if client_kwargs is None:
1427 client_kwargs = {}
1428 if 'config' not in client_kwargs:
1429 client_kwargs['config'] = botocore.client.Config(
1430 max_pool_connections=workers * max_threads_per_fileobj + 1, # 1 thread for _list_bucket
1431 tcp_keepalive=True,
1432 retries={'max_attempts': retries * 2, 'mode': 'adaptive'},
1433 )
1434 client = session.client('s3', **client_kwargs)
1436 transfer_config = boto3.s3.transfer.TransferConfig(max_concurrency=max_threads_per_fileobj)
1438 key_iterator = _list_bucket(
1439 bucket_name=bucket_name,
1440 prefix=prefix,
1441 accept_key=accept_key,
1442 client=client,
1443 )
1444 download_key = functools.partial(
1445 _download_key,
1446 bucket_name=bucket_name,
1447 retries=retries,
1448 client=client,
1449 transfer_config=transfer_config,
1450 )
1452 # Limit the iterator ('infinite' iterators are supported, key_limit=None is supported)
1453 key_iterator = itertools.islice(key_iterator, key_limit)
1455 with smart_open.concurrency.ThreadPoolExecutor(workers) as executor:
1456 result_iterator = executor.imap(download_key, key_iterator)
1457 for key_no, (key, content) in enumerate(result_iterator, start=1):
1458 # Skip deleted objects (404 responses)
1459 if key is None:
1460 continue
1462 if key_no % 1000 == 0:
1463 logger.info(
1464 "yielding key #%i: %s, size %i (total %.1f MB)",
1465 key_no, key, len(content), total_size / 1024.0 ** 2
1466 )
1468 yield key, content
1469 total_size += len(content)
1470 logger.info(
1471 "processed %i keys, total size %.1f MB",
1472 key_no,
1473 total_size / 1024.0 ** 2,
1474 )
1477def _list_bucket(
1478 *,
1479 bucket_name,
1480 client,
1481 prefix='',
1482 accept_key=lambda k: True,
1483):
1484 ctoken = None
1486 while True:
1487 # list_objects_v2 doesn't like a None value for ContinuationToken
1488 # so we don't set it if we don't have one.
1489 if ctoken:
1490 kwargs = dict(Bucket=bucket_name, Prefix=prefix, ContinuationToken=ctoken)
1491 else:
1492 kwargs = dict(Bucket=bucket_name, Prefix=prefix)
1493 response = client.list_objects_v2(**kwargs)
1494 try:
1495 content = response['Contents']
1496 except KeyError:
1497 pass
1498 else:
1499 for c in content:
1500 key = c['Key']
1501 if accept_key(key):
1502 yield key
1503 ctoken = response.get('NextContinuationToken', None)
1504 if not ctoken:
1505 break
1508def _download_key(key_name, *, client, bucket_name, retries, transfer_config):
1509 # Sometimes, https://github.com/boto/boto/issues/2409 can happen
1510 # because of network issues on either side.
1511 # Retry up to `retries` times to make sure it's not a transient issue.
1512 for x in range(retries + 1):
1513 try:
1514 content_bytes = _download_fileobj(
1515 client=client,
1516 bucket_name=bucket_name,
1517 key_name=key_name,
1518 transfer_config=transfer_config,
1519 )
1520 except botocore.exceptions.ClientError as err:
1521 #
1522 # ignore 404 not found errors: they mean the object was deleted
1523 # after we listed the contents of the bucket, but before we
1524 # downloaded the object.
1525 #
1526 if 'Error' in err.response and err.response['Error'].get('Code') == '404':
1527 return None, None
1528 # Actually fail on last pass through the loop
1529 if x == retries:
1530 raise
1531 # Otherwise, try again, as this might be a transient timeout
1532 continue
1533 return key_name, content_bytes
1536def _download_fileobj(*, client, bucket_name, key_name, transfer_config):
1537 #
1538 # This is a separate function only because it makes it easier to inject
1539 # exceptions during tests.
1540 #
1541 buf = io.BytesIO()
1542 client.download_fileobj(
1543 Bucket=bucket_name,
1544 Key=key_name,
1545 Fileobj=buf,
1546 Config=transfer_config,
1547 )
1548 return buf.getvalue()