1# Copyright 2017 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
"""Virtual base classes for uploading media via Google APIs.
16
17Supported here are:
18
19* simple (media) uploads
20* multipart uploads that contain both metadata and a small file as payload
21* resumable uploads (with metadata as well)
22"""
23
24import http.client
25import json
26import os
27import random
28import re
29import sys
30import urllib.parse
31
32from google import resumable_media
33from google.resumable_media import _helpers
34from google.resumable_media import common
35
36from xml.etree import ElementTree
37
38
# Header key written by the ``_prepare_*`` methods to set the request's
# content type.
_CONTENT_TYPE_HEADER = "content-type"
# Content-Range templates: "bytes start-end/total" when the total size is
# known, "bytes start-end/*" when it is not, and "bytes */total" for a range
# with no bytes. NOTE(review): presumably consumed by ``get_next_chunk``,
# which is defined outside this view.
_CONTENT_RANGE_TEMPLATE = "bytes {:d}-{:d}/{:d}"
_RANGE_UNKNOWN_TEMPLATE = "bytes {:d}-{:d}/*"
_EMPTY_RANGE_TEMPLATE = "bytes */{:d}"
# Width (in decimal digits) of ``sys.maxsize - 1``; the boundary format below
# zero-pads an integer to this width between fixed "=" runs. Presumably used
# to build multipart boundaries (the builder is outside this view).
_BOUNDARY_WIDTH = len(str(sys.maxsize - 1))
_BOUNDARY_FORMAT = "==============={{:0{:d}d}}==".format(_BOUNDARY_WIDTH)
# Raw byte fragments for assembling a multipart/related request body.
_MULTIPART_SEP = b"--"
_CRLF = b"\r\n"
_MULTIPART_BEGIN = b"\r\ncontent-type: application/json; charset=UTF-8\r\n\r\n"
# Prefix of the multipart content-type header; the boundary and a closing
# quote are appended in ``MultipartUpload._prepare_request``.
_RELATED_HEADER = b'multipart/related; boundary="'
# Parses a server "range" header of the form "bytes=0-<end>" (case-insensitive)
# and captures the last byte the server has persisted as ``end_byte``.
_BYTES_RANGE_RE = re.compile(r"bytes=0-(?P<end_byte>\d+)", flags=re.IGNORECASE)
# Raised when the stream position disagrees with ``bytes_uploaded``.
_STREAM_ERROR_TEMPLATE = (
    "Bytes stream is in unexpected state. "
    "The local stream has had {:d} bytes read from it while "
    "{:d} bytes have already been updated (they should match)."
)
# NOTE(review): not used in the visible part of the file; presumably raised
# when more bytes are read than the declared total.
_STREAM_READ_PAST_TEMPLATE = (
    "{:d} bytes have been read from the stream, which exceeds the expected total {:d}."
)
# HTTP verbs used by the request-preparation methods.
_DELETE = "DELETE"
_POST = "POST"
_PUT = "PUT"
# Error messages for checksum validation of completed uploads.
_UPLOAD_CHECKSUM_MISMATCH_MESSAGE = (
    "The computed ``{}`` checksum, ``{}``, and the checksum reported by the "
    "remote host, ``{}``, did not match."
)
_UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE = (
    "Response metadata had no ``{}`` value; checksum could not be validated."
)
_UPLOAD_HEADER_NO_APPROPRIATE_CHECKSUM_MESSAGE = (
    "Response headers had no ``{}`` value; checksum could not be validated."
)
# Query strings and XML names for the XML multipart-upload (MPU) API.
# NOTE(review): usage lives in the part of the file beyond this view.
_MPU_INITIATE_QUERY = "?uploads"
_MPU_PART_QUERY_TEMPLATE = "?partNumber={part}&uploadId={upload_id}"
_S3_COMPAT_XML_NAMESPACE = "{http://s3.amazonaws.com/doc/2006-03-01/}"
_UPLOAD_ID_NODE = "UploadId"
_MPU_FINAL_QUERY_TEMPLATE = "?uploadId={upload_id}"
76
77
class UploadBase(object):
    """Shared foundation for every upload helper in this module.

    Concrete upload types build their request preparation and response
    handling on top of the state kept here: the target URL, any extra
    request headers, a completion flag and a retry strategy.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def __init__(self, upload_url, headers=None):
        self.upload_url = upload_url
        # The caller's mapping is kept as-is (not copied); a fresh dict is
        # only substituted when nothing was supplied.
        self._headers = {} if headers is None else headers
        self._finished = False
        self._retry_strategy = common.RetryStrategy()

    @property
    def finished(self):
        """bool: Whether this upload has already completed."""
        return self._finished

    def _process_response(self, response):
        """Handle the HTTP response to the upload request.

        Only work that needs no network (or other) I/O is done here, in line
        with the `sans-I/O`_ approach.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # Mark the upload as used *before* checking the status, so that a
        # failed attempt is retired just like a successful one.
        self._finished = True
        accepted = (http.client.OK,)
        _helpers.require_status_code(response, accepted, self._get_status_code)

    @staticmethod
    def _get_status_code(response):
        """Return the status code of ``response``; subclasses must override.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_headers(response):
        """Return the headers of ``response``; subclasses must override.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_body(response):
        """Return the body of ``response``; subclasses must override.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
161
162
class SimpleUpload(UploadBase):
    """Upload a resource to a Google API.

    A **simple** media upload sends no metadata and completes the upload
    in a single request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def _prepare_request(self, data, content_type):
        """Prepare the contents of an HTTP request.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        .. note::

            This method will be used only once, so ``headers`` will be
            mutated by having a new key added to it.

        Args:
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type for the request.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already finished.
            TypeError: If ``data`` isn't bytes.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("An upload can only be used once.")

        if not isinstance(data, bytes):
            # Build one formatted message: passing the type as a second
            # positional argument made ``str(exc)`` render as a tuple.
            raise TypeError("`data` must be bytes, received {}".format(type(data)))
        # Deliberately mutates the instance headers (see the note above); the
        # same mapping object is returned to the caller.
        self._headers[_CONTENT_TYPE_HEADER] = content_type
        return _POST, self.upload_url, data, self._headers

    def transmit(self, transport, data, content_type, timeout=None):
        """Transmit the resource to be uploaded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
237
238
class MultipartUpload(UploadBase):
    """Upload a resource with metadata to a Google API.

    A **multipart** upload sends both metadata and the resource in a single
    (multipart) request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. The request metadata will be amended
            to include the computed value. Using this option will override a
            manually-set checksum value. Supported values are "md5", "crc32c"
            and None. The default is None.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def __init__(self, upload_url, headers=None, checksum=None):
        super(MultipartUpload, self).__init__(upload_url, headers=headers)
        self._checksum_type = checksum

    def _prepare_request(self, data, metadata, content_type):
        """Prepare the contents of an HTTP request.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        .. note::

            This method will be used only once, so ``headers`` will be
            mutated by having a new key added to it.

        The caller's ``metadata`` mapping is never modified; when a checksum
        is requested, it is added to a copy used only for this request.

        Args:
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already finished.
            TypeError: If ``data`` isn't bytes.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("An upload can only be used once.")

        if not isinstance(data, bytes):
            # One formatted message instead of a (message, type) args tuple.
            raise TypeError("`data` must be bytes, received {}".format(type(data)))

        checksum_object = _helpers._get_checksum_object(self._checksum_type)
        if checksum_object is not None:
            checksum_object.update(data)
            actual_checksum = _helpers.prepare_checksum_digest(checksum_object.digest())
            metadata_key = _helpers._get_metadata_key(self._checksum_type)
            # Build a new mapping instead of writing into the caller's
            # ``metadata``: the caller may reuse it (e.g. on retry) or pass a
            # read-only Mapping, and the checksum belongs to this request only.
            metadata = {**metadata, metadata_key: actual_checksum}

        content, multipart_boundary = construct_multipart_request(
            data, metadata, content_type
        )
        multipart_content_type = _RELATED_HEADER + multipart_boundary + b'"'
        self._headers[_CONTENT_TYPE_HEADER] = multipart_content_type

        return _POST, self.upload_url, content, self._headers

    def transmit(self, transport, data, metadata, content_type, timeout=None):
        """Transmit the resource to be uploaded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
340
341
class ResumableUpload(UploadBase):
    """Initiate and fulfill a resumable upload to a Google API.

    A **resumable** upload sends an initial request with the resource metadata
    and then gets assigned an upload ID / upload URL to send bytes to.
    Using the upload URL, the upload is then done in chunks (determined by
    the user) until all bytes have been uploaded.

    Args:
        upload_url (str): The URL where the resumable upload will be initiated.
        chunk_size (int): The size of each chunk used to upload the resource.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with every request.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. After the upload is complete, the
            server-computed checksum of the resulting object will be read
            and google.resumable_media.common.DataCorruption will be raised on
            a mismatch. The corrupted file will not be deleted from the remote
            host automatically. Supported values are "md5", "crc32c" and None.
            The default is None.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.

    Raises:
        ValueError: If ``chunk_size`` is not a multiple of
            :data:`.UPLOAD_CHUNK_SIZE`.
    """

    def __init__(self, upload_url, chunk_size, checksum=None, headers=None):
        super(ResumableUpload, self).__init__(upload_url, headers=headers)
        if chunk_size % resumable_media.UPLOAD_CHUNK_SIZE != 0:
            raise ValueError(
                "{} KB must divide chunk size".format(
                    resumable_media.UPLOAD_CHUNK_SIZE / 1024
                )
            )
        self._chunk_size = chunk_size
        self._stream = None
        self._content_type = None
        self._bytes_uploaded = 0
        # Number of leading stream bytes already fed into the checksum, so
        # bytes re-sent after a recovery are not hashed twice.
        self._bytes_checksummed = 0
        self._checksum_type = checksum
        # Created lazily by ``_update_checksum`` on the first chunk.
        self._checksum_object = None
        self._total_bytes = None
        self._resumable_url = None
        self._invalid = False

    @property
    def invalid(self):
        """bool: Indicates if the upload is in an invalid state.

        This will occur if a call to :meth:`transmit_next_chunk` fails.
        To recover from such a failure, call :meth:`recover`.
        """
        return self._invalid

    @property
    def chunk_size(self):
        """int: The size of each chunk used to upload the resource."""
        return self._chunk_size

    @property
    def resumable_url(self):
        """Optional[str]: The URL of the in-progress resumable upload."""
        return self._resumable_url

    @property
    def bytes_uploaded(self):
        """int: Number of bytes that have been uploaded."""
        return self._bytes_uploaded

    @property
    def total_bytes(self):
        """Optional[int]: The total number of bytes to be uploaded.

        If this upload is initiated (via :meth:`initiate`) with
        ``stream_final=True``, this value will be populated based on the size
        of the ``stream`` being uploaded. (By default ``stream_final=True``.)

        If this upload is initiated with ``stream_final=False``,
        :attr:`total_bytes` will be :data:`None` since it cannot be
        determined from the stream.
        """
        return self._total_bytes

    def _prepare_initiate_request(
        self, stream, metadata, content_type, total_bytes=None, stream_final=True
    ):
        """Prepare the contents of HTTP request to initiate upload.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already been initiated.
            ValueError: If ``stream`` is not at the beginning.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.resumable_url is not None:
            raise ValueError("This upload has already been initiated.")
        if stream.tell() != 0:
            raise ValueError("Stream must be at beginning.")

        self._stream = stream
        self._content_type = content_type

        # A signed URL requires the content type to be set directly, not
        # through ``x-upload-content-type``. Query parameter names are
        # compared case-insensitively so any capitalization of the signature
        # parameter is recognized (previously only two spellings were).
        parse_result = urllib.parse.urlparse(self.upload_url)
        parsed_query = urllib.parse.parse_qs(parse_result.query)
        is_signed_url = any(
            name.lower() == "x-goog-signature" for name in parsed_query
        )
        if is_signed_url:
            # Unpack ``self._headers`` first so the content type set here
            # takes priority.
            headers = {**self._headers, _CONTENT_TYPE_HEADER: content_type}
        else:
            # Unpack ``self._headers`` first so the content type set here
            # takes priority.
            headers = {
                **self._headers,
                _CONTENT_TYPE_HEADER: "application/json; charset=UTF-8",
                "x-upload-content-type": content_type,
            }
        # Set the total bytes if possible.
        if total_bytes is not None:
            self._total_bytes = total_bytes
        elif stream_final:
            self._total_bytes = get_total_bytes(stream)
        # Add the total bytes to the headers if set.
        if self._total_bytes is not None:
            content_length = "{:d}".format(self._total_bytes)
            headers["x-upload-content-length"] = content_length

        payload = json.dumps(metadata).encode("utf-8")
        return _POST, self.upload_url, payload, headers

    def _process_initiate_response(self, response):
        """Process the response from an HTTP request that initiated upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        This method takes the URL from the ``Location`` header and stores it
        for future use. Within that URL, we assume the ``upload_id`` query
        parameter has been included, but we do not check.

        Args:
            response (object): The HTTP response object (need headers).

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response,
            (http.client.OK, http.client.CREATED),
            self._get_status_code,
            callback=self._make_invalid,
        )
        self._resumable_url = _helpers.header_required(
            response, "location", self._get_headers
        )

    def initiate(
        self,
        transport,
        stream,
        metadata,
        content_type,
        total_bytes=None,
        stream_final=True,
        timeout=None,
    ):
        """Initiate a resumable upload.

        By default, this method assumes your ``stream`` is in a "final"
        state ready to transmit. However, ``stream_final=False`` can be used
        to indicate that the size of the resource is not known. This can happen
        if bytes are being dynamically fed into ``stream``, e.g. if the stream
        is attached to application logs.

        If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be
        read from the stream every time :meth:`transmit_next_chunk` is called.
        If one of those reads produces strictly fewer bytes than the chunk
        size, the upload will be concluded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_request(self):
        """Prepare the contents of HTTP request to upload a chunk.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        For the time being, this **does require** some form of I/O to read
        a chunk from ``stream`` (via :func:`get_next_chunk`). However, this
        will (almost) certainly not be network I/O.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request
              * headers for the request

            The headers incorporate the ``_headers`` on the current instance.

        Raises:
            ValueError: If the current upload has finished.
            ValueError: If the current upload is in an invalid state.
            ValueError: If the current upload has not been initiated.
            ValueError: If the location in the stream (i.e. ``stream.tell()``)
                does not agree with ``bytes_uploaded``.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("Upload has finished.")
        if self.invalid:
            raise ValueError(
                "Upload is in an invalid state. To recover call `recover()`."
            )
        if self.resumable_url is None:
            raise ValueError(
                "This upload has not been initiated. Please call "
                "initiate() before beginning to transmit chunks."
            )

        start_byte, payload, content_range = get_next_chunk(
            self._stream, self._chunk_size, self._total_bytes
        )
        if start_byte != self.bytes_uploaded:
            msg = _STREAM_ERROR_TEMPLATE.format(start_byte, self.bytes_uploaded)
            raise ValueError(msg)

        self._update_checksum(start_byte, payload)

        headers = {
            **self._headers,
            _CONTENT_TYPE_HEADER: self._content_type,
            _helpers.CONTENT_RANGE_HEADER: content_range,
        }
        return _PUT, self.resumable_url, payload, headers

    def _update_checksum(self, start_byte, payload):
        """Update the checksum with the payload if not already updated.

        Because error recovery can result in bytes being transmitted more than
        once, the checksum tracks the number of bytes checked in
        self._bytes_checksummed and skips bytes that have already been summed.

        Args:
            start_byte (int): Stream offset of the first byte of ``payload``.
            payload (bytes): The chunk about to be sent.
        """
        if not self._checksum_type:
            return

        # Compare against None explicitly: the truthiness of a checksum object
        # is not something to rely on.
        if self._checksum_object is None:
            self._checksum_object = _helpers._get_checksum_object(self._checksum_type)

        if start_byte < self._bytes_checksummed:
            # Part of this chunk was already hashed on an earlier attempt;
            # only feed the bytes past that point.
            offset = self._bytes_checksummed - start_byte
            data = payload[offset:]
        else:
            data = payload

        self._checksum_object.update(data)
        self._bytes_checksummed += len(data)

    def _make_invalid(self):
        """Simple setter for ``invalid``.

        This is intended to be passed along as a callback to helpers that
        raise an exception so they can mark this instance as invalid before
        raising.
        """
        self._invalid = True

    def _process_resumable_response(self, response, bytes_sent):
        """Process the response from an HTTP request.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.
            bytes_sent (int): The number of bytes sent in the request that
                ``response`` was returned for.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is 308 and the ``range`` header is not of the form
                ``bytes=0-{end}``.
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200 or 308.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        status_code = _helpers.require_status_code(
            response,
            (http.client.OK, http.client.PERMANENT_REDIRECT),
            self._get_status_code,
            callback=self._make_invalid,
        )
        if status_code == http.client.OK:
            # NOTE: We use the "local" information of ``bytes_sent`` to update
            #       ``bytes_uploaded``, but do not verify this against other
            #       state. However, there may be some other information:
            #
            #       * a ``size`` key in JSON response body
            #       * the ``total_bytes`` attribute (if set)
            #       * ``stream.tell()`` (relying on fact that ``initiate()``
            #         requires stream to be at the beginning)
            self._bytes_uploaded = self._bytes_uploaded + bytes_sent
            # Tombstone the current upload so it cannot be used again.
            self._finished = True
            # Validate the checksum. This can raise an exception on failure.
            self._validate_checksum(response)
        else:
            bytes_range = _helpers.header_required(
                response,
                _helpers.RANGE_HEADER,
                self._get_headers,
                callback=self._make_invalid,
            )
            match = _BYTES_RANGE_RE.match(bytes_range)
            if match is None:
                self._make_invalid()
                raise common.InvalidResponse(
                    response,
                    'Unexpected "range" header',
                    bytes_range,
                    'Expected to be of the form "bytes=0-{end}"',
                )
            # The range end is inclusive, so the persisted count is end + 1.
            self._bytes_uploaded = int(match.group("end_byte")) + 1

    def _validate_checksum(self, response):
        """Check the computed checksum, if any, against the received metadata.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the response
                metadata has no value for the expected checksum key.
            ~google.resumable_media.common.DataCorruption: If the checksum
                computed locally and the checksum reported by the remote host do
                not match.
        """
        if self._checksum_type is None:
            return
        metadata_key = _helpers._get_metadata_key(self._checksum_type)
        # NOTE(review): assumes the transport's response object exposes a
        # ``json()`` method (as e.g. ``requests.Response`` does).
        metadata = response.json()
        remote_checksum = metadata.get(metadata_key)
        if remote_checksum is None:
            raise common.InvalidResponse(
                response,
                _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key),
                self._get_headers(response),
            )
        local_checksum = _helpers.prepare_checksum_digest(
            self._checksum_object.digest()
        )
        if local_checksum != remote_checksum:
            raise common.DataCorruption(
                response,
                _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format(
                    self._checksum_type.upper(), local_checksum, remote_checksum
                ),
            )

    def transmit_next_chunk(self, transport, timeout=None):
        """Transmit the next chunk of the resource to be uploaded.

        If the current upload was initiated with ``stream_final=False``,
        this method will dynamically determine if the upload has completed.
        The upload will be considered complete if the stream produces
        fewer than :attr:`chunk_size` bytes when a chunk is read from it.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_recover_request(self):
        """Prepare the contents of HTTP request to recover from failure.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        We assume that the :attr:`resumable_url` is set (i.e. the only way
        the upload can end up :attr:`invalid` is if it has been initiated).

        Returns:
            Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request (always :data:`None`)
              * headers for the request

            The headers **do not** incorporate the ``_headers`` on the
            current instance.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        headers = {_helpers.CONTENT_RANGE_HEADER: "bytes */*"}
        return _PUT, self.resumable_url, None, headers

    def _process_recover_response(self, response):
        """Process the response from an HTTP request to recover from failure.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 308.
            ~google.resumable_media.common.InvalidResponse: If the status
                code is 308 and the ``range`` header is not of the form
                ``bytes=0-{end}``.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response, (http.client.PERMANENT_REDIRECT,), self._get_status_code
        )
        headers = self._get_headers(response)
        if _helpers.RANGE_HEADER in headers:
            bytes_range = headers[_helpers.RANGE_HEADER]
            match = _BYTES_RANGE_RE.match(bytes_range)
            if match is None:
                raise common.InvalidResponse(
                    response,
                    'Unexpected "range" header',
                    bytes_range,
                    'Expected to be of the form "bytes=0-{end}"',
                )
            self._bytes_uploaded = int(match.group("end_byte")) + 1
        else:
            # In this case, the upload has not "begun".
            self._bytes_uploaded = 0

        # Rewind (or fast-forward) the stream to the first byte the server
        # has not persisted, then clear the invalid flag.
        self._stream.seek(self._bytes_uploaded)
        self._invalid = False

    def recover(self, transport):
        """Recover from a failure.

        This method should be used when a :class:`ResumableUpload` is in an
        :attr:`~ResumableUpload.invalid` state due to a request failure.

        This will verify the progress with the server and make sure the
        current upload is in a valid state before :meth:`transmit_next_chunk`
        can be used again.

        Args:
            transport (object): An object which can make authenticated
                requests.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
868
869
class XMLMPUContainer(UploadBase):
    """Initiate and close an upload using the XML MPU API.

    An XML MPU sends an initial request and then receives an upload ID.
    Using the upload ID, the upload is then done in numbered parts and the
    parts can be uploaded concurrently.

    In order to avoid concurrency issues with this container object, the
    uploading of individual parts is handled separately, by XMLMPUPart objects
    spawned from this container class. The XMLMPUPart objects are not
    necessarily in the same process as the container, so they do not update the
    container automatically.

    MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous
    given the JSON multipart upload, so the abbreviation "MPU" will be used
    throughout.

    See: https://cloud.google.com/storage/docs/multipart-uploads

    Args:
        upload_url (str): The URL of the object (without query parameters). The
            initiate, PUT, and finalization requests will all use this URL, with
            varying query parameters.
        filename (str): The name (path) of the file to upload.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with every request.
        upload_id (Optional[str]): The ID of an upload that has already been
            initiated. If omitted, the upload must be initiated (see
            :meth:`initiate`) before it can be finalized or canceled.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
        upload_id (Optional(str)): The ID of the upload from the initialization
            response.
    """

    def __init__(self, upload_url, filename, headers=None, upload_id=None):
        super().__init__(upload_url, headers=headers)
        self._filename = filename
        self._upload_id = upload_id
        # Maps part number -> etag, filled in via ``register_part``.
        self._parts = {}

    @property
    def upload_id(self):
        return self._upload_id

    def register_part(self, part_number, etag):
        """Register an uploaded part by part number and corresponding etag.

        XMLMPUPart objects represent individual parts, and their part number
        and etag can be registered to the container object with this method
        and therefore incorporated in the finalize() call to finish the upload.

        This method accepts part_number and etag, but not XMLMPUPart objects
        themselves, to reduce the complexity involved in running XMLMPUPart
        uploads in separate processes.

        Parts may be registered in any order. Registering the same part
        number twice replaces the previously registered etag.

        Args:
            part_number (int): The part number. Parts are assembled into the
                final uploaded object with finalize() in order of their part
                numbers.
            etag (str): The etag included in the server response after upload.
        """
        self._parts[part_number] = etag

    def _prepare_initiate_request(self, content_type):
        """Prepare the contents of HTTP request to initiate upload.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already been initiated.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.upload_id is not None:
            raise ValueError("This upload has already been initiated.")

        initiate_url = self.upload_url + _MPU_INITIATE_QUERY

        headers = {
            **self._headers,
            _CONTENT_TYPE_HEADER: content_type,
        }
        return _POST, initiate_url, None, headers

    def _process_initiate_response(self, response):
        """Process the response from an HTTP request that initiated the upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        This method parses the XML response body and stores the text of its
        ``UploadId`` element (in the S3-compatible XML namespace) as the
        ``upload_id`` for use by later part, finalize and cancel requests.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200, or if the response body does not contain a
                non-empty upload ID.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code)
        root = ElementTree.fromstring(response.text)
        upload_id_node = root.find(_S3_COMPAT_XML_NAMESPACE + _UPLOAD_ID_NODE)
        # ``find`` returns None when the element is absent; surface that as
        # an invalid response rather than an opaque AttributeError.
        if upload_id_node is None or not upload_id_node.text:
            raise common.InvalidResponse(
                response,
                "The initiate response did not contain an upload ID.",
            )
        self._upload_id = upload_id_node.text

    def initiate(
        self,
        transport,
        content_type,
        timeout=None,
    ):
        """Initiate an MPU and record the upload ID.

        Args:
            transport (object): An object which can make authenticated
                requests.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_finalize_request(self):
        """Prepare the contents of an HTTP request to finalize the upload.

        All of the parts must be registered before calling this method. The
        parts are listed in the request body in ascending part-number order,
        regardless of the order in which they were registered, as required by
        the XML MPU "complete" request.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the upload has not been initiated.
        """
        if self.upload_id is None:
            raise ValueError("This upload has not yet been initiated.")

        final_query = _MPU_FINAL_QUERY_TEMPLATE.format(upload_id=self._upload_id)
        finalize_url = self.upload_url + final_query
        final_xml_root = ElementTree.Element("CompleteMultipartUpload")
        # Part numbers are unique dict keys, so sorting the items orders them
        # by part number alone (etags are never compared).
        for part_number, etag in sorted(self._parts.items()):
            part = ElementTree.SubElement(final_xml_root, "Part")
            ElementTree.SubElement(part, "PartNumber").text = str(part_number)
            ElementTree.SubElement(part, "ETag").text = etag
        payload = ElementTree.tostring(final_xml_root)
        return _POST, finalize_url, payload, self._headers

    def _process_finalize_response(self, response):
        """Process the response from an HTTP request that finalized the upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """

        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code)
        self._finished = True

    def finalize(
        self,
        transport,
        timeout=None,
    ):
        """Finalize an MPU request with all the parts.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_cancel_request(self):
        """Prepare the contents of an HTTP request to cancel the upload.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always DELETE)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the upload has not been initiated.
        """
        if self.upload_id is None:
            raise ValueError("This upload has not yet been initiated.")

        cancel_query = _MPU_FINAL_QUERY_TEMPLATE.format(upload_id=self._upload_id)
        cancel_url = self.upload_url + cancel_query
        return _DELETE, cancel_url, None, self._headers

    def _process_cancel_response(self, response):
        """Process the response from an HTTP request that canceled the upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 204.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """

        _helpers.require_status_code(
            response, (http.client.NO_CONTENT,), self._get_status_code
        )

    def cancel(
        self,
        transport,
        timeout=None,
    ):
        """Cancel an MPU request and permanently delete any uploaded parts.

        This cannot be undone.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
1155
1156
class XMLMPUPart(UploadBase):
    """Upload a single part of an existing XML MPU container.

    An XML MPU sends an initial request and then receives an upload ID.
    Using the upload ID, the upload is then done in numbered parts and the
    parts can be uploaded concurrently.

    In order to avoid concurrency issues with the container object, the
    uploading of individual parts is handled separately by multiple objects
    of this class. Once a part is uploaded, it can be registered with the
    container with `container.register_part(part.part_number, part.etag)`.

    MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous
    given the JSON multipart upload, so the abbreviation "MPU" will be used
    throughout.

    See: https://cloud.google.com/storage/docs/multipart-uploads

    Args:
        upload_url (str): The URL of the object (without query parameters).
        upload_id (str): The ID of the upload from the initialization response.
        filename (str): The name (path) of the file to upload.
        start (int): The byte offset (inclusive) of the beginning of the part.
        end (int): The byte offset (exclusive) of the end of the part; the
            part consists of ``end - start`` bytes read from ``filename``.
        part_number (int): The part number. Part numbers will be assembled in
            sequential order when the container is finalized.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with every request.
        checksum (Optional([str])): The type of checksum to compute to verify
            the integrity of the part. The checksum is computed locally from
            the part's payload and compared against the checksum reported in
            the upload response headers; the request headers are not
            modified. Supported values are "md5", "crc32c" and None. The
            default is None.

    Attributes:
        upload_url (str): The URL of the object (without query parameters).
        upload_id (str): The ID of the upload from the initialization response.
        filename (str): The name (path) of the file to upload.
        start (int): The byte offset (inclusive) of the beginning of the part.
        end (int): The byte offset (exclusive) of the end of the part.
        part_number (int): The part number. Part numbers will be assembled in
            sequential order when the container is finalized.
        etag (Optional(str)): The etag returned by the service after upload.
    """

    def __init__(
        self,
        upload_url,
        upload_id,
        filename,
        start,
        end,
        part_number,
        headers=None,
        checksum=None,
    ):
        super().__init__(upload_url, headers=headers)
        self._filename = filename
        self._start = start
        self._end = end
        self._upload_id = upload_id
        self._part_number = part_number
        self._etag = None
        self._checksum_type = checksum
        # Created fresh for each prepared request so retries don't accumulate.
        self._checksum_object = None

    @property
    def part_number(self):
        return self._part_number

    @property
    def upload_id(self):
        return self._upload_id

    @property
    def filename(self):
        return self._filename

    @property
    def etag(self):
        return self._etag

    @property
    def start(self):
        return self._start

    @property
    def end(self):
        return self._end

    def _prepare_upload_request(self):
        """Prepare the contents of HTTP request to upload a part.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        For the time being, this **does require** some form of I/O: the
        part's bytes (``end - start`` bytes starting at offset ``start``) are
        read from the local file ``filename``. This is not network I/O.

        If a checksum type was configured, a new checksum object is created
        and fed the payload so it can be compared with the server's value
        once the response arrives.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request
              * headers for the request

            The headers incorporate the ``_headers`` on the current instance.

        Raises:
            ValueError: If the current upload has finished.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("This part has already been uploaded.")

        with open(self._filename, "rb") as f:
            f.seek(self._start)
            payload = f.read(self._end - self._start)

        self._checksum_object = _helpers._get_checksum_object(self._checksum_type)
        if self._checksum_object is not None:
            self._checksum_object.update(payload)

        part_query = _MPU_PART_QUERY_TEMPLATE.format(
            part=self._part_number, upload_id=self._upload_id
        )
        upload_url = self.upload_url + part_query
        return _PUT, upload_url, payload, self._headers

    def _process_upload_response(self, response):
        """Process the response from an HTTP request.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        On success the part's checksum (if configured) is validated, the
        ``etag`` response header is recorded and the part is marked finished.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200 or the response is missing data.
            ~google.resumable_media.common.DataCorruption: If the local and
                remote checksums do not match.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response,
            (http.client.OK,),
            self._get_status_code,
        )

        self._validate_checksum(response)

        etag = _helpers.header_required(response, "etag", self._get_headers)
        self._etag = etag
        self._finished = True

    def upload(
        self,
        transport,
        timeout=None,
    ):
        """Upload the part.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _validate_checksum(self, response):
        """Check the computed checksum, if any, against the response headers.

        Does nothing when no checksum type was configured.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If a checksum
                type was configured but the response does not report a
                checksum of that type.
            ~google.resumable_media.common.DataCorruption: If the checksum
                computed locally and the checksum reported by the remote host do
                not match.
        """
        if self._checksum_type is None:
            return

        remote_checksum = _helpers._get_uploaded_checksum_from_headers(
            response, self._get_headers, self._checksum_type
        )

        if remote_checksum is None:
            metadata_key = _helpers._get_metadata_key(self._checksum_type)
            raise common.InvalidResponse(
                response,
                _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key),
                self._get_headers(response),
            )
        local_checksum = _helpers.prepare_checksum_digest(
            self._checksum_object.digest()
        )
        if local_checksum != remote_checksum:
            raise common.DataCorruption(
                response,
                _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format(
                    self._checksum_type.upper(), local_checksum, remote_checksum
                ),
            )
1374
1375
def get_boundary():
    """Generate a random boundary for separating multipart request parts.

    A random non-negative integer below ``sys.maxsize`` is zero-padded into
    ``_BOUNDARY_FORMAT`` and the resulting text is encoded.

    Returns:
        bytes: The UTF-8 encoded boundary.
    """
    boundary_text = _BOUNDARY_FORMAT.format(random.randrange(sys.maxsize))
    return boundary_text.encode("utf-8")
1387
1388
def construct_multipart_request(data, metadata, content_type):
    """Build a ``multipart/related`` body holding JSON metadata and content.

    The body has two parts, separated by a freshly generated boundary: first
    the JSON-serialized ``metadata``, then ``data`` labeled with
    ``content_type``.

    Args:
        data (bytes): The resource content (UTF-8 encoded as bytes)
            to be uploaded.
        metadata (Mapping[str, str]): The resource metadata, such as an
            ACL list.
        content_type (str): The content type of the resource, e.g. a JPEG
            image has content type ``image/jpeg``.

    Returns:
        Tuple[bytes, bytes]: The multipart request body and the boundary used
        between each part.
    """
    boundary = get_boundary()
    separator = _MULTIPART_SEP + boundary
    pieces = (
        separator,
        _MULTIPART_BEGIN,
        json.dumps(metadata).encode("utf-8"),
        _CRLF,
        separator,
        _CRLF,
        b"content-type: ",
        content_type.encode("utf-8"),
        _CRLF,
        _CRLF,  # Blank line between the part headers and the part body.
        data,
        _CRLF,
        separator,
        _MULTIPART_SEP,  # A trailing "--" marks the closing boundary.
    )
    return b"".join(pieces), boundary
1428
1429
def get_total_bytes(stream):
    """Measure the size of a stream without disturbing its position.

    Args:
        stream (IO[bytes]): The stream (i.e. file-like object).

    Returns:
        int: The offset of the end of the stream, i.e. its total byte count.
    """
    saved_position = stream.tell()
    # The size is read back with ``tell()`` instead of trusting the return
    # value of ``seek()``, which not every file-like object provides.
    stream.seek(0, os.SEEK_END)
    size = stream.tell()
    stream.seek(saved_position)  # Leave the caller's cursor where it was.
    return size
1448
1449
def get_next_chunk(stream, chunk_size, total_bytes):
    """Read the next chunk from an I/O stream and describe its byte range.

    Fewer than ``chunk_size`` bytes may remain in ``stream``, so
    ``end_byte == start_byte + chunk_size - 1`` does not always hold.

    Args:
        stream (IO[bytes]): The stream (i.e. file-like object).
        chunk_size (int): The size of the chunk to be read from the ``stream``.
        total_bytes (Optional[int]): The (expected) total number of bytes
            in the ``stream``.

    Returns:
        Tuple[int, bytes, str]: Triple of:

          * the start byte index
          * the content in between the start and end bytes (inclusive)
          * content range header for the chunk (slice) that has been read

    Raises:
        ValueError: If ``total_bytes == 0`` but ``stream.read()`` yields
            non-empty content.
        ValueError: If ``total_bytes`` is positive and there is no data left
            to consume, i.e. ``end_byte == start_byte - 1``.
    """
    start_byte = stream.tell()

    # When this chunk would reach (or pass) a known, non-zero total, read only
    # up to the total instead of a full chunk.
    reaches_known_end = (
        total_bytes is not None
        and total_bytes > 0
        and start_byte + chunk_size >= total_bytes
    )
    read_size = total_bytes - start_byte if reaches_known_end else chunk_size
    payload = stream.read(read_size)
    end_byte = stream.tell() - 1

    bytes_read = len(payload)
    if total_bytes is None:
        # A short read reveals the total size of an unsized stream.
        if bytes_read < chunk_size:
            total_bytes = end_byte + 1
    elif total_bytes == 0:
        # ``start_byte == 0`` is expected here too, but is not checked since
        # ``_prepare_initiate_request()`` requires the stream to be at the
        # beginning.
        if bytes_read:
            raise ValueError(
                "Stream specified as empty, but produced non-empty content."
            )
    elif not bytes_read:
        raise ValueError(
            "Stream is already exhausted. There is no content remaining."
        )

    return start_byte, payload, get_content_range(start_byte, end_byte, total_bytes)
1505
1506
def get_content_range(start_byte, end_byte, total_bytes):
    """Build a ``Content-Range`` header value from a byte span and total size.

    * Unknown total (``None``): "bytes {start}-{end}/*".
    * Known total and an empty span (``end_byte < start_byte``):
      "bytes */{total}".
    * Otherwise: "bytes {start}-{end}/{total}".

    Callers are **assumed** never to combine an unknown total with an empty
    span.

    Args:
        start_byte (int): The start (inclusive) of the byte range.
        end_byte (int): The end (inclusive) of the byte range.
        total_bytes (Optional[int]): The total size in bytes of the whole
            resource, if known.

    Returns:
        str: The content range header.
    """
    if total_bytes is not None:
        if end_byte < start_byte:
            return _EMPTY_RANGE_TEMPLATE.format(total_bytes)
        return _CONTENT_RANGE_TEMPLATE.format(start_byte, end_byte, total_bytes)
    return _RANGE_UNKNOWN_TEMPLATE.format(start_byte, end_byte)