Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/resumable_media/_upload.py: 30%
247 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
1# Copyright 2017 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Virtual base classes for uploading media via Google APIs.
17Supported here are:
19* simple (media) uploads
20* multipart uploads that contain both metadata and a small file as payload
21* resumable uploads (with metadata as well)
22"""
24import http.client
25import json
26import os
27import random
28import re
29import sys
30import urllib.parse
32from google import resumable_media
33from google.resumable_media import _helpers
34from google.resumable_media import common
# Header name used for the request content type.
_CONTENT_TYPE_HEADER = "content-type"
# ``Content-Range`` templates: known total, unknown total and empty range.
_CONTENT_RANGE_TEMPLATE = "bytes {:d}-{:d}/{:d}"
_RANGE_UNKNOWN_TEMPLATE = "bytes {:d}-{:d}/*"
_EMPTY_RANGE_TEMPLATE = "bytes */{:d}"
# Multipart boundaries are zero-padded to the width of ``sys.maxsize - 1``.
_BOUNDARY_WIDTH = len(str(sys.maxsize - 1))
_BOUNDARY_FORMAT = "==============={{:0{:d}d}}==".format(_BOUNDARY_WIDTH)
# Byte-string building blocks for a ``multipart/related`` request body.
_MULTIPART_SEP = b"--"
_CRLF = b"\r\n"
_MULTIPART_BEGIN = b"\r\ncontent-type: application/json; charset=UTF-8\r\n\r\n"
_RELATED_HEADER = b'multipart/related; boundary="'
# Matches a ``range`` header of the form ``bytes=0-{end}`` (case-insensitive).
_BYTES_RANGE_RE = re.compile(r"bytes=0-(?P<end_byte>\d+)", flags=re.IGNORECASE)
_STREAM_ERROR_TEMPLATE = (
    "Bytes stream is in unexpected state. "
    "The local stream has had {:d} bytes read from it while "
    "{:d} bytes have already been updated (they should match)."
)
_STREAM_READ_PAST_TEMPLATE = (
    "{:d} bytes have been read from the stream, which exceeds "
    "the expected total {:d}."
)
# HTTP verbs used by the prepared requests.
_POST = "POST"
_PUT = "PUT"
_UPLOAD_CHECKSUM_MISMATCH_MESSAGE = (
    "The computed ``{}`` checksum, ``{}``, and the checksum reported by the "
    "remote host, ``{}``, did not match."
)
_UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE = (
    "Response metadata had no ``{}`` value; checksum could not be validated."
)
class UploadBase:
    """Base class for upload helpers.

    Defines core shared behavior across different upload types.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def __init__(self, upload_url, headers=None):
        self.upload_url = upload_url
        if headers is None:
            headers = {}
        # NOTE: The mapping is stored as-is (not copied); subclasses add
        #       keys to it when preparing a request.
        self._headers = headers
        self._finished = False
        self._retry_strategy = common.RetryStrategy()

    @property
    def finished(self):
        """bool: Flag indicating if the upload has completed."""
        return self._finished

    def _process_response(self, response):
        """Process the response from an HTTP request.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # Tombstone the current upload so it cannot be used again (in either
        # failure or success).
        self._finished = True
        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code)

    @staticmethod
    def _get_status_code(response):
        """Access the status code from an HTTP response.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_headers(response):
        """Access the headers from an HTTP response.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_body(response):
        """Access the response body from an HTTP response.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
class SimpleUpload(UploadBase):
    """Upload a resource to a Google API.

    A **simple** media upload sends no metadata and completes the upload
    in a single request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def _prepare_request(self, data, content_type):
        """Prepare the contents of an HTTP request.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        .. note::

            This method will be used only once, so ``headers`` will be
            mutated by having a new key added to it.

        Args:
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type for the request.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already finished.
            TypeError: If ``data`` isn't bytes.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("An upload can only be used once.")

        if not isinstance(data, bytes):
            raise TypeError("`data` must be bytes, received", type(data))
        # The instance's header mapping itself is updated and returned.
        self._headers[_CONTENT_TYPE_HEADER] = content_type
        return _POST, self.upload_url, data, self._headers

    def transmit(self, transport, data, content_type, timeout=None):
        """Transmit the resource to be uploaded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
class MultipartUpload(UploadBase):
    """Upload a resource with metadata to a Google API.

    A **multipart** upload sends both metadata and the resource in a single
    (multipart) request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. The request metadata will be amended
            to include the computed value. Using this option will override a
            manually-set checksum value. Supported values are "md5", "crc32c"
            and None. The default is None.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def __init__(self, upload_url, headers=None, checksum=None):
        super().__init__(upload_url, headers=headers)
        self._checksum_type = checksum

    def _prepare_request(self, data, metadata, content_type):
        """Prepare the contents of an HTTP request.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        .. note::

            This method will be used only once, so ``headers`` will be
            mutated by having a new key added to it. When a checksum type
            is configured, ``metadata`` is mutated as well: the computed
            checksum is stored under the matching metadata key.

        Args:
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already finished.
            TypeError: If ``data`` isn't bytes.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("An upload can only be used once.")

        if not isinstance(data, bytes):
            raise TypeError("`data` must be bytes, received", type(data))

        checksum_object = _helpers._get_checksum_object(self._checksum_type)
        if checksum_object is not None:
            checksum_object.update(data)
            actual_checksum = _helpers.prepare_checksum_digest(checksum_object.digest())
            metadata_key = _helpers._get_metadata_key(self._checksum_type)
            # Overrides any checksum value the caller set manually.
            metadata[metadata_key] = actual_checksum

        content, multipart_boundary = construct_multipart_request(
            data, metadata, content_type
        )
        # The header value is assembled as bytes: prefix + boundary + quote.
        multipart_content_type = _RELATED_HEADER + multipart_boundary + b'"'
        self._headers[_CONTENT_TYPE_HEADER] = multipart_content_type

        return _POST, self.upload_url, content, self._headers

    def transmit(self, transport, data, metadata, content_type, timeout=None):
        """Transmit the resource to be uploaded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
class ResumableUpload(UploadBase):
    """Initiate and fulfill a resumable upload to a Google API.

    A **resumable** upload sends an initial request with the resource metadata
    and then gets assigned an upload ID / upload URL to send bytes to.
    Using the upload URL, the upload is then done in chunks (determined by
    the user) until all bytes have been uploaded.

    Args:
        upload_url (str): The URL where the resumable upload will be initiated.
        chunk_size (int): The size of each chunk used to upload the resource.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the :meth:`initiate` request, e.g. headers for
            encrypted data. They are also merged into the headers prepared
            for each chunk request, but they are **not** included in the
            request prepared for :meth:`recover`.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. After the upload is complete, the
            server-computed checksum of the resulting object will be read
            and google.resumable_media.common.DataCorruption will be raised on
            a mismatch. The corrupted file will not be deleted from the remote
            host automatically. Supported values are "md5", "crc32c" and None.
            The default is None.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.

    Raises:
        ValueError: If ``chunk_size`` is not a multiple of
            :data:`.UPLOAD_CHUNK_SIZE`.
    """

    def __init__(self, upload_url, chunk_size, checksum=None, headers=None):
        super().__init__(upload_url, headers=headers)
        if chunk_size % resumable_media.UPLOAD_CHUNK_SIZE != 0:
            raise ValueError(
                "{} KB must divide chunk size".format(
                    resumable_media.UPLOAD_CHUNK_SIZE / 1024
                )
            )
        self._chunk_size = chunk_size
        self._stream = None
        self._content_type = None
        self._bytes_uploaded = 0
        self._bytes_checksummed = 0
        self._checksum_type = checksum
        self._checksum_object = None
        self._total_bytes = None
        self._resumable_url = None
        self._invalid = False

    @property
    def invalid(self):
        """bool: Indicates if the upload is in an invalid state.

        This will occur if a call to :meth:`transmit_next_chunk` fails.
        To recover from such a failure, call :meth:`recover`.
        """
        return self._invalid

    @property
    def chunk_size(self):
        """int: The size of each chunk used to upload the resource."""
        return self._chunk_size

    @property
    def resumable_url(self):
        """Optional[str]: The URL of the in-progress resumable upload."""
        return self._resumable_url

    @property
    def bytes_uploaded(self):
        """int: Number of bytes that have been uploaded."""
        return self._bytes_uploaded

    @property
    def total_bytes(self):
        """Optional[int]: The total number of bytes to be uploaded.

        If this upload is initiated (via :meth:`initiate`) with
        ``stream_final=True``, this value will be populated based on the size
        of the ``stream`` being uploaded. (By default ``stream_final=True``.)

        If this upload is initiated with ``stream_final=False``,
        :attr:`total_bytes` will be :data:`None` since it cannot be
        determined from the stream.
        """
        return self._total_bytes

    def _prepare_initiate_request(
        self, stream, metadata, content_type, total_bytes=None, stream_final=True
    ):
        """Prepare the contents of HTTP request to initiate upload.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already been initiated.
            ValueError: If ``stream`` is not at the beginning.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.resumable_url is not None:
            raise ValueError("This upload has already been initiated.")
        if stream.tell() != 0:
            raise ValueError("Stream must be at beginning.")

        self._stream = stream
        self._content_type = content_type

        # Signed URL requires content type set directly - not through x-upload-content-type
        parse_result = urllib.parse.urlparse(self.upload_url)
        parsed_query = urllib.parse.parse_qs(parse_result.query)
        if "x-goog-signature" in parsed_query or "X-Goog-Signature" in parsed_query:
            # Unpack ``self._headers`` first so that the content type defined
            # here takes priority.
            headers = {**self._headers, _CONTENT_TYPE_HEADER: content_type}
        else:
            # Unpack ``self._headers`` first so that the content type defined
            # here takes priority.
            headers = {
                **self._headers,
                _CONTENT_TYPE_HEADER: "application/json; charset=UTF-8",
                "x-upload-content-type": content_type,
            }
        # Set the total bytes if possible.
        if total_bytes is not None:
            self._total_bytes = total_bytes
        elif stream_final:
            self._total_bytes = get_total_bytes(stream)
        # Add the total bytes to the headers if set.
        if self._total_bytes is not None:
            content_length = "{:d}".format(self._total_bytes)
            headers["x-upload-content-length"] = content_length

        payload = json.dumps(metadata).encode("utf-8")
        return _POST, self.upload_url, payload, headers

    def _process_initiate_response(self, response):
        """Process the response from an HTTP request that initiated upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        This method takes the URL from the ``Location`` header and stores it
        for future use. Within that URL, we assume the ``upload_id`` query
        parameter has been included, but we do not check.

        Args:
            response (object): The HTTP response object (need headers).

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response,
            (http.client.OK, http.client.CREATED),
            self._get_status_code,
            callback=self._make_invalid,
        )
        self._resumable_url = _helpers.header_required(
            response, "location", self._get_headers
        )

    def initiate(
        self,
        transport,
        stream,
        metadata,
        content_type,
        total_bytes=None,
        stream_final=True,
        timeout=None,
    ):
        """Initiate a resumable upload.

        By default, this method assumes your ``stream`` is in a "final"
        state ready to transmit. However, ``stream_final=False`` can be used
        to indicate that the size of the resource is not known. This can happen
        if bytes are being dynamically fed into ``stream``, e.g. if the stream
        is attached to application logs.

        If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be
        read from the stream every time :meth:`transmit_next_chunk` is called.
        If one of those reads produces strictly fewer bytes than the chunk
        size, the upload will be concluded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_request(self):
        """Prepare the contents of HTTP request to upload a chunk.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        For the time being, this **does require** some form of I/O to read
        a chunk from ``stream`` (via :func:`get_next_chunk`). However, this
        will (almost) certainly not be network I/O.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request
              * headers for the request

            The headers are a new mapping that starts from the ``_headers``
            on the current instance and then sets the content type and
            content range for this chunk.

        Raises:
            ValueError: If the current upload has finished.
            ValueError: If the current upload is in an invalid state.
            ValueError: If the current upload has not been initiated.
            ValueError: If the location in the stream (i.e. ``stream.tell()``)
                does not agree with ``bytes_uploaded``.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("Upload has finished.")
        if self.invalid:
            raise ValueError(
                "Upload is in an invalid state. To recover call `recover()`."
            )
        if self.resumable_url is None:
            raise ValueError(
                "This upload has not been initiated. Please call "
                "initiate() before beginning to transmit chunks."
            )

        start_byte, payload, content_range = get_next_chunk(
            self._stream, self._chunk_size, self._total_bytes
        )
        if start_byte != self.bytes_uploaded:
            msg = _STREAM_ERROR_TEMPLATE.format(start_byte, self.bytes_uploaded)
            raise ValueError(msg)

        self._update_checksum(start_byte, payload)

        headers = {
            **self._headers,
            _CONTENT_TYPE_HEADER: self._content_type,
            _helpers.CONTENT_RANGE_HEADER: content_range,
        }
        return _PUT, self.resumable_url, payload, headers

    def _update_checksum(self, start_byte, payload):
        """Update the checksum with the payload if not already updated.

        Because error recovery can result in bytes being transmitted more than
        once, the checksum tracks the number of bytes checked in
        self._bytes_checksummed and skips bytes that have already been summed.

        Args:
            start_byte (int): The offset of ``payload`` within the stream.
            payload (bytes): The chunk about to be transmitted.
        """
        if not self._checksum_type:
            return

        # Created lazily on the first chunk.
        if not self._checksum_object:
            self._checksum_object = _helpers._get_checksum_object(self._checksum_type)

        if start_byte < self._bytes_checksummed:
            # Only the tail of this chunk is new to the checksum.
            offset = self._bytes_checksummed - start_byte
            data = payload[offset:]
        else:
            data = payload

        self._checksum_object.update(data)
        self._bytes_checksummed += len(data)

    def _make_invalid(self):
        """Simple setter for ``invalid``.

        This is intended to be passed along as a callback to helpers that
        raise an exception so they can mark this instance as invalid before
        raising.
        """
        self._invalid = True

    def _process_resumable_response(self, response, bytes_sent):
        """Process the response from an HTTP request.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.
            bytes_sent (int): The number of bytes sent in the request that
                ``response`` was returned for.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is 308 and the ``range`` header is not of the form
                ``bytes=0-{end}``.
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200 or 308.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        status_code = _helpers.require_status_code(
            response,
            (http.client.OK, http.client.PERMANENT_REDIRECT),
            self._get_status_code,
            callback=self._make_invalid,
        )
        if status_code == http.client.OK:
            # NOTE: We use the "local" information of ``bytes_sent`` to update
            #       ``bytes_uploaded``, but do not verify this against other
            #       state. However, there may be some other information:
            #
            #       * a ``size`` key in JSON response body
            #       * the ``total_bytes`` attribute (if set)
            #       * ``stream.tell()`` (relying on fact that ``initiate()``
            #         requires stream to be at the beginning)
            self._bytes_uploaded = self._bytes_uploaded + bytes_sent
            # Tombstone the current upload so it cannot be used again.
            self._finished = True
            # Validate the checksum. This can raise an exception on failure.
            self._validate_checksum(response)
        else:
            bytes_range = _helpers.header_required(
                response,
                _helpers.RANGE_HEADER,
                self._get_headers,
                callback=self._make_invalid,
            )
            match = _BYTES_RANGE_RE.match(bytes_range)
            if match is None:
                self._make_invalid()
                raise common.InvalidResponse(
                    response,
                    'Unexpected "range" header',
                    bytes_range,
                    'Expected to be of the form "bytes=0-{end}"',
                )
            # ``end_byte`` is inclusive, so the count is one more.
            self._bytes_uploaded = int(match.group("end_byte")) + 1

    def _validate_checksum(self, response):
        """Check the computed checksum, if any, against the response metadata.

        The remote checksum is read from the JSON body of ``response``
        (via ``response.json()``).

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the response
                metadata has no value for the expected checksum key.
            ~google.resumable_media.common.DataCorruption: If the checksum
                computed locally and the checksum reported by the remote host do
                not match.
        """
        if self._checksum_type is None:
            return
        metadata_key = _helpers._get_metadata_key(self._checksum_type)
        metadata = response.json()
        remote_checksum = metadata.get(metadata_key)
        if remote_checksum is None:
            raise common.InvalidResponse(
                response,
                _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key),
                self._get_headers(response),
            )
        local_checksum = _helpers.prepare_checksum_digest(
            self._checksum_object.digest()
        )
        if local_checksum != remote_checksum:
            raise common.DataCorruption(
                response,
                _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format(
                    self._checksum_type.upper(), local_checksum, remote_checksum
                ),
            )

    def transmit_next_chunk(self, transport, timeout=None):
        """Transmit the next chunk of the resource to be uploaded.

        If the current upload was initiated with ``stream_final=False``,
        this method will dynamically determine if the upload has completed.
        The upload will be considered complete if the stream produces
        fewer than :attr:`chunk_size` bytes when a chunk is read from it.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_recover_request(self):
        """Prepare the contents of HTTP request to recover from failure.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        We assume that the :attr:`resumable_url` is set (i.e. the only way
        the upload can end up :attr:`invalid` is if it has been initiated).

        Returns:
            Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request (always :data:`None`)
              * headers for the request

            The headers **do not** incorporate the ``_headers`` on the
            current instance.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        headers = {_helpers.CONTENT_RANGE_HEADER: "bytes */*"}
        return _PUT, self.resumable_url, None, headers

    def _process_recover_response(self, response):
        """Process the response from an HTTP request to recover from failure.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        On success the stream is repositioned to the recovered
        ``bytes_uploaded`` offset and the ``invalid`` flag is cleared.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 308.
            ~google.resumable_media.common.InvalidResponse: If the status
                code is 308 and the ``range`` header is not of the form
                ``bytes=0-{end}``.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response, (http.client.PERMANENT_REDIRECT,), self._get_status_code
        )
        headers = self._get_headers(response)
        if _helpers.RANGE_HEADER in headers:
            bytes_range = headers[_helpers.RANGE_HEADER]
            match = _BYTES_RANGE_RE.match(bytes_range)
            if match is None:
                raise common.InvalidResponse(
                    response,
                    'Unexpected "range" header',
                    bytes_range,
                    'Expected to be of the form "bytes=0-{end}"',
                )
            self._bytes_uploaded = int(match.group("end_byte")) + 1
        else:
            # In this case, the upload has not "begun".
            self._bytes_uploaded = 0

        self._stream.seek(self._bytes_uploaded)
        self._invalid = False

    def recover(self, transport):
        """Recover from a failure.

        This method should be used when a :class:`ResumableUpload` is in an
        :attr:`~ResumableUpload.invalid` state due to a request failure.

        This will verify the progress with the server and make sure the
        current upload is in a valid state before :meth:`transmit_next_chunk`
        can be used again.

        Args:
            transport (object): An object which can make authenticated
                requests.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
def get_boundary():
    """Get a random boundary for a multipart request.

    Returns:
        bytes: The boundary used to separate parts of a multipart request.
    """
    random_int = random.randrange(sys.maxsize)
    # The template is a text string, so format first and encode afterwards.
    boundary = _BOUNDARY_FORMAT.format(random_int)
    return boundary.encode("utf-8")
def construct_multipart_request(data, metadata, content_type):
    """Construct a multipart request body.

    The body has two parts separated by a random boundary: the JSON-encoded
    ``metadata`` followed by ``data`` with its own ``content-type`` header.

    Args:
        data (bytes): The resource content (UTF-8 encoded as bytes)
            to be uploaded.
        metadata (Mapping[str, str]): The resource metadata, such as an
            ACL list.
        content_type (str): The content type of the resource, e.g. a JPEG
            image has content type ``image/jpeg``.

    Returns:
        Tuple[bytes, bytes]: The multipart request body and the boundary used
        between each part.
    """
    multipart_boundary = get_boundary()
    json_bytes = json.dumps(metadata).encode("utf-8")
    content_type_bytes = content_type.encode("utf-8")
    boundary_sep = _MULTIPART_SEP + multipart_boundary
    # Combine the two parts into a multipart payload.
    content = b"".join(
        (
            boundary_sep,
            _MULTIPART_BEGIN,
            json_bytes,
            _CRLF,
            boundary_sep,
            _CRLF,
            b"content-type: ",
            content_type_bytes,
            _CRLF,
            _CRLF,  # Empty line between headers and body.
            data,
            _CRLF,
            boundary_sep,
            _MULTIPART_SEP,  # Trailing "--" closes the final boundary.
        )
    )

    return content, multipart_boundary
def get_total_bytes(stream):
    """Determine the total number of bytes in a stream.

    The stream is moved to its end to measure it and then moved back to
    the position it had on entry.

    Args:
        stream (IO[bytes]): The stream (i.e. file-like object).

    Returns:
        int: The number of bytes.
    """
    current_position = stream.tell()
    # NOTE: ``.tell()`` is used rather than the return value of ``.seek()``
    #       so that streams whose ``seek()`` returns nothing still work.
    stream.seek(0, os.SEEK_END)
    end_position = stream.tell()
    # Go back to the initial position.
    stream.seek(current_position)

    return end_position
def get_next_chunk(stream, chunk_size, total_bytes):
    """Get a chunk from an I/O stream.

    The ``stream`` may have fewer bytes remaining than ``chunk_size``
    so it may not always be the case that
    ``end_byte == start_byte + chunk_size - 1``.

    Args:
        stream (IO[bytes]): The stream (i.e. file-like object).
        chunk_size (int): The size of the chunk to be read from the ``stream``.
        total_bytes (Optional[int]): The (expected) total number of bytes
            in the ``stream``.

    Returns:
        Tuple[int, bytes, str]: Triple of:

          * the start byte index
          * the content in between the start and end bytes (inclusive)
          * content range header for the chunk (slice) that has been read

    Raises:
        ValueError: If the stream is already positioned past a non-zero
            ``total_bytes``.
        ValueError: If ``total_bytes == 0`` but ``stream.read()`` yields
            non-empty content.
        ValueError: If there is no data left to consume. This corresponds
            exactly to the case ``end_byte < start_byte``, which can only
            occur if ``end_byte == start_byte - 1``.
    """
    start_byte = stream.tell()
    if total_bytes is not None and start_byte > total_bytes > 0:
        # Without this guard the read below would be issued with a negative
        # size, which reads to EOF on some streams and is rejected by others.
        raise ValueError(_STREAM_READ_PAST_TEMPLATE.format(start_byte, total_bytes))

    if total_bytes is not None and start_byte + chunk_size >= total_bytes > 0:
        # Final chunk: never read beyond the declared total.
        payload = stream.read(total_bytes - start_byte)
    else:
        payload = stream.read(chunk_size)
    end_byte = stream.tell() - 1

    num_bytes_read = len(payload)
    if total_bytes is None:
        if num_bytes_read < chunk_size:
            # We now **KNOW** the total number of bytes.
            total_bytes = end_byte + 1
    elif total_bytes == 0:
        # NOTE: We also expect ``start_byte == 0`` here but don't check
        #       because ``_prepare_initiate_request()`` requires the
        #       stream to be at the beginning.
        if num_bytes_read != 0:
            raise ValueError(
                "Stream specified as empty, but produced non-empty content."
            )
    else:
        if num_bytes_read == 0:
            raise ValueError(
                "Stream is already exhausted. There is no content remaining."
            )

    content_range = get_content_range(start_byte, end_byte, total_bytes)
    return start_byte, payload, content_range
def get_content_range(start_byte, end_byte, total_bytes):
    """Convert start, end and total into content range header.

    If ``total_bytes`` is not known, uses "bytes {start}-{end}/*".
    If we are dealing with an empty range (i.e. ``end_byte < start_byte``)
    then "bytes */{total}" is used.

    This function **ASSUMES** that if the size is not known, the caller will
    not also pass an empty range.

    Args:
        start_byte (int): The start (inclusive) of the byte range.
        end_byte (int): The end (inclusive) of the byte range.
        total_bytes (Optional[int]): The number of bytes in the byte
            range (if known).

    Returns:
        str: The content range header.
    """
    if total_bytes is None:
        return _RANGE_UNKNOWN_TEMPLATE.format(start_byte, end_byte)
    elif end_byte < start_byte:
        return _EMPTY_RANGE_TEMPLATE.format(total_bytes)
    else:
        return _CONTENT_RANGE_TEMPLATE.format(start_byte, end_byte, total_bytes)