1# Copyright 2017 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Virtual bases classes for uploading media via Google APIs.
16
17Supported here are:
18
19* simple (media) uploads
20* multipart uploads that contain both metadata and a small file as payload
21* resumable uploads (with metadata as well)
22"""
23
24import http.client
25import json
26import os
27import random
28import re
29import sys
30import urllib.parse
31
32from google.cloud.storage._media import _helpers
33from google.cloud.storage._media import UPLOAD_CHUNK_SIZE
34from google.cloud.storage.exceptions import InvalidResponse
35from google.cloud.storage.exceptions import DataCorruption
36from google.cloud.storage.retry import DEFAULT_RETRY
37
38from xml.etree import ElementTree
39
40
# Canonical (lowercase) header key used when setting a request content type.
_CONTENT_TYPE_HEADER = "content-type"
# ``Content-Range`` header templates for resumable uploads:
# known start/end/total, known start/end with unknown total, and
# "no bytes transmitted yet" (total only), respectively.
_CONTENT_RANGE_TEMPLATE = "bytes {:d}-{:d}/{:d}"
_RANGE_UNKNOWN_TEMPLATE = "bytes {:d}-{:d}/*"
_EMPTY_RANGE_TEMPLATE = "bytes */{:d}"
# Multipart boundaries are zero-padded integers; width is sized so that any
# value below ``sys.maxsize`` fits in the fixed-width format.
_BOUNDARY_WIDTH = len(str(sys.maxsize - 1))
_BOUNDARY_FORMAT = "==============={{:0{:d}d}}==".format(_BOUNDARY_WIDTH)
# Byte-string fragments used to assemble a ``multipart/related`` body.
_MULTIPART_SEP = b"--"
_CRLF = b"\r\n"
_MULTIPART_BEGIN = b"\r\ncontent-type: application/json; charset=UTF-8\r\n\r\n"
_RELATED_HEADER = b'multipart/related; boundary="'
# Parses a ``range`` response header of the form ``bytes=0-{end}``.
_BYTES_RANGE_RE = re.compile(r"bytes=0-(?P<end_byte>\d+)", flags=re.IGNORECASE)
_STREAM_ERROR_TEMPLATE = (
    "Bytes stream is in unexpected state. "
    "The local stream has had {:d} bytes read from it while "
    "{:d} bytes have already been updated (they should match)."
)
_STREAM_READ_PAST_TEMPLATE = (
    "{:d} bytes have been read from the stream, which exceeds "
    "the expected total {:d}."
)
# HTTP verbs used by the upload helpers.
_DELETE = "DELETE"
_POST = "POST"
_PUT = "PUT"
# Error message templates for checksum validation of finished uploads.
_UPLOAD_CHECKSUM_MISMATCH_MESSAGE = (
    "The computed ``{}`` checksum, ``{}``, and the checksum reported by the "
    "remote host, ``{}``, did not match."
)
_UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE = (
    "Response metadata had no ``{}`` value; checksum could not be validated."
)
_UPLOAD_HEADER_NO_APPROPRIATE_CHECKSUM_MESSAGE = (
    "Response headers had no ``{}`` value; checksum could not be validated."
)
# Query-string fragments for the XML multipart-upload (MPU) API.
_MPU_INITIATE_QUERY = "?uploads"
_MPU_PART_QUERY_TEMPLATE = "?partNumber={part}&uploadId={upload_id}"
_S3_COMPAT_XML_NAMESPACE = "{http://s3.amazonaws.com/doc/2006-03-01/}"
_UPLOAD_ID_NODE = "UploadId"
_MPU_FINAL_QUERY_TEMPLATE = "?uploadId={upload_id}"
79
80
class UploadBase(object):
    """Base class for upload helpers.

    Holds the state and response handling shared by every upload type.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and how
            to configure them.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def __init__(self, upload_url, headers=None, retry=DEFAULT_RETRY):
        self.upload_url = upload_url
        self._headers = {} if headers is None else headers
        self._finished = False
        self._retry_strategy = retry

    @property
    def finished(self):
        """bool: Flag indicating if the upload has completed."""
        return self._finished

    def _process_response(self, response):
        """Process the response from an HTTP request.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 200.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # Mark the upload as consumed before validating, so it is tombstoned
        # whether the status check succeeds or raises.
        self._finished = True
        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code)

    @staticmethod
    def _get_status_code(response):
        """Access the status code from an HTTP response.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_headers(response):
        """Access the headers from an HTTP response.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_body(response):
        """Access the response body from an HTTP response.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
172
173
class SimpleUpload(UploadBase):
    """Upload a resource to a Google API.

    A **simple** media upload sends no metadata and completes the upload
    in a single request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and how
            to configure them.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def _prepare_request(self, data, content_type):
        """Prepare the contents of an HTTP request.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        .. note:

            This method will be used only once, so ``headers`` will be
            mutated by having a new key added to it.

        Args:
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type for the request.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already finished.
            TypeError: If ``data`` isn't bytes.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # Guard clauses: a tombstoned upload cannot be reused, and only raw
        # bytes payloads are supported.
        if self.finished:
            raise ValueError("An upload can only be used once.")
        if not isinstance(data, bytes):
            raise TypeError("`data` must be bytes, received", type(data))

        headers = self._headers
        headers[_CONTENT_TYPE_HEADER] = content_type
        return _POST, self.upload_url, data, headers

    def transmit(self, transport, data, content_type, timeout=None):
        """Transmit the resource to be uploaded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
256
257
class MultipartUpload(UploadBase):
    """Upload a resource with metadata to a Google API.

    A **multipart** upload sends both metadata and the resource in a single
    (multipart) request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum Optional([str]): The type of checksum to compute to verify
            the integrity of the object. The request metadata will be amended
            to include the computed value. Using this option will override a
            manually-set checksum value. Supported values are "md5",
            "crc32c", "auto", and None. The default is "auto", which will try
            to detect if the C extension for crc32c is installed and fall back
            to md5 otherwise.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and how
            to configure them.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def __init__(self, upload_url, headers=None, checksum="auto", retry=DEFAULT_RETRY):
        super(MultipartUpload, self).__init__(upload_url, headers=headers, retry=retry)
        # Resolve "auto" eagerly so the rest of the class only ever sees a
        # concrete checksum type (or None).
        if checksum == "auto":
            checksum = "crc32c" if _helpers._is_crc32c_available_and_fast() else "md5"
        self._checksum_type = checksum

    def _prepare_request(self, data, metadata, content_type):
        """Prepare the contents of an HTTP request.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        .. note:

            This method will be used only once, so ``headers`` will be
            mutated by having a new key added to it.

        Args:
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already finished.
            TypeError: If ``data`` isn't bytes.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("An upload can only be used once.")
        if not isinstance(data, bytes):
            raise TypeError("`data` must be bytes, received", type(data))

        # Compute the requested checksum over the payload and amend the
        # metadata with it (overriding any manually-set value).
        checksum_object = _helpers._get_checksum_object(self._checksum_type)
        if checksum_object is not None:
            checksum_object.update(data)
            digest = _helpers.prepare_checksum_digest(checksum_object.digest())
            metadata[_helpers._get_metadata_key(self._checksum_type)] = digest

        payload, boundary = construct_multipart_request(data, metadata, content_type)
        self._headers[_CONTENT_TYPE_HEADER] = _RELATED_HEADER + boundary + b'"'
        return _POST, self.upload_url, payload, self._headers

    def transmit(self, transport, data, metadata, content_type, timeout=None):
        """Transmit the resource to be uploaded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
373
374
class ResumableUpload(UploadBase):
    """Initiate and fulfill a resumable upload to a Google API.

    A **resumable** upload sends an initial request with the resource metadata
    and then gets assigned an upload ID / upload URL to send bytes to.
    Using the upload URL, the upload is then done in chunks (determined by
    the user) until all bytes have been uploaded.

    Args:
        upload_url (str): The URL where the resumable upload will be initiated.
        chunk_size (int): The size of each chunk used to upload the resource.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with every request.
        checksum Optional([str]): The type of checksum to compute to verify
            the integrity of the object. After the upload is complete, the
            server-computed checksum of the resulting object will be checked
            and google.cloud.storage.exceptions.DataCorruption will be raised on
            a mismatch. The corrupted file will not be deleted from the remote
            host automatically. Supported values are "md5", "crc32c", "auto",
            and None. The default is "auto", which will try to detect if the C
            extension for crc32c is installed and fall back to md5 otherwise.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and how
            to configure them.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.

    Raises:
        ValueError: If ``chunk_size`` is not a multiple of
            :data:`.UPLOAD_CHUNK_SIZE`.
    """

    def __init__(
        self, upload_url, chunk_size, checksum="auto", headers=None, retry=DEFAULT_RETRY
    ):
        super(ResumableUpload, self).__init__(upload_url, headers=headers, retry=retry)
        if chunk_size % UPLOAD_CHUNK_SIZE != 0:
            raise ValueError(
                "{} KB must divide chunk size".format(UPLOAD_CHUNK_SIZE / 1024)
            )
        self._chunk_size = chunk_size
        # The stream and content type are populated by
        # ``_prepare_initiate_request()``.
        self._stream = None
        self._content_type = None
        self._bytes_uploaded = 0
        # Number of bytes already folded into the running checksum. Kept
        # separate from ``_bytes_uploaded`` so retransmitted bytes (after a
        # retry) are not checksummed twice -- see ``_update_checksum()``.
        self._bytes_checksummed = 0
        self._checksum_type = checksum
        if self._checksum_type == "auto":
            self._checksum_type = (
                "crc32c" if _helpers._is_crc32c_available_and_fast() else "md5"
            )
        # Created lazily on the first chunk in ``_update_checksum()``.
        self._checksum_object = None
        self._total_bytes = None
        # Set by ``_process_initiate_response()``; doubles as the
        # "has been initiated" flag.
        self._resumable_url = None
        self._invalid = False

    @property
    def invalid(self):
        """bool: Indicates if the upload is in an invalid state.

        This will occur if a call to :meth:`transmit_next_chunk` fails.
        To recover from such a failure, call :meth:`recover`.
        """
        return self._invalid

    @property
    def chunk_size(self):
        """int: The size of each chunk used to upload the resource."""
        return self._chunk_size

    @property
    def resumable_url(self):
        """Optional[str]: The URL of the in-progress resumable upload."""
        return self._resumable_url

    @property
    def bytes_uploaded(self):
        """int: Number of bytes that have been uploaded."""
        return self._bytes_uploaded

    @property
    def total_bytes(self):
        """Optional[int]: The total number of bytes to be uploaded.

        If this upload is initiated (via :meth:`initiate`) with
        ``stream_final=True``, this value will be populated based on the size
        of the ``stream`` being uploaded. (By default ``stream_final=True``.)

        If this upload is initiated with ``stream_final=False``,
        :attr:`total_bytes` will be :data:`None` since it cannot be
        determined from the stream.
        """
        return self._total_bytes

    def _prepare_initiate_request(
        self, stream, metadata, content_type, total_bytes=None, stream_final=True
    ):
        """Prepare the contents of HTTP request to initiate upload.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already been initiated.
            ValueError: If ``stream`` is not at the beginning.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.resumable_url is not None:
            raise ValueError("This upload has already been initiated.")
        if stream.tell() != 0:
            raise ValueError("Stream must be at beginning.")

        self._stream = stream
        self._content_type = content_type

        # Signed URL requires content type set directly - not through x-upload-content-type
        parse_result = urllib.parse.urlparse(self.upload_url)
        parsed_query = urllib.parse.parse_qs(parse_result.query)
        if "x-goog-signature" in parsed_query or "X-Goog-Signature" in parsed_query:
            # Deconstruct **self._headers first so that content type defined here takes priority
            headers = {**self._headers, _CONTENT_TYPE_HEADER: content_type}
        else:
            # Deconstruct **self._headers first so that content type defined here takes priority
            headers = {
                **self._headers,
                _CONTENT_TYPE_HEADER: "application/json; charset=UTF-8",
                "x-upload-content-type": content_type,
            }
        # Set the total bytes if possible.
        if total_bytes is not None:
            self._total_bytes = total_bytes
        elif stream_final:
            self._total_bytes = get_total_bytes(stream)
        # Add the total bytes to the headers if set.
        if self._total_bytes is not None:
            content_length = "{:d}".format(self._total_bytes)
            headers["x-upload-content-length"] = content_length

        payload = json.dumps(metadata).encode("utf-8")
        return _POST, self.upload_url, payload, headers

    def _process_initiate_response(self, response):
        """Process the response from an HTTP request that initiated upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        This method takes the URL from the ``Location`` header and stores it
        for future use. Within that URL, we assume the ``upload_id`` query
        parameter has been included, but we do not check.

        Args:
            response (object): The HTTP response object (need headers).

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response,
            (http.client.OK, http.client.CREATED),
            self._get_status_code,
            callback=self._make_invalid,
        )
        self._resumable_url = _helpers.header_required(
            response, "location", self._get_headers
        )

    def initiate(
        self,
        transport,
        stream,
        metadata,
        content_type,
        total_bytes=None,
        stream_final=True,
        timeout=None,
    ):
        """Initiate a resumable upload.

        By default, this method assumes your ``stream`` is in a "final"
        state ready to transmit. However, ``stream_final=False`` can be used
        to indicate that the size of the resource is not known. This can happen
        if bytes are being dynamically fed into ``stream``, e.g. if the stream
        is attached to application logs.

        If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be
        read from the stream every time :meth:`transmit_next_chunk` is called.
        If one of those reads produces strictly fewer bytes than the chunk
        size, the upload will be concluded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_request(self):
        """Prepare the contents of HTTP request to upload a chunk.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        For the time being, this **does require** some form of I/O to read
        a chunk from ``stream`` (via :func:`get_next_chunk`). However, this
        will (almost) certainly not be network I/O.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request
              * headers for the request

            The headers incorporate the ``_headers`` on the current instance.

        Raises:
            ValueError: If the current upload has finished.
            ValueError: If the current upload is in an invalid state.
            ValueError: If the current upload has not been initiated.
            ValueError: If the location in the stream (i.e. ``stream.tell()``)
                does not agree with ``bytes_uploaded``.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("Upload has finished.")
        if self.invalid:
            raise ValueError(
                "Upload is in an invalid state. To recover call `recover()`."
            )
        if self.resumable_url is None:
            raise ValueError(
                "This upload has not been initiated. Please call "
                "initiate() before beginning to transmit chunks."
            )

        start_byte, payload, content_range = get_next_chunk(
            self._stream, self._chunk_size, self._total_bytes
        )
        # The chunk must start exactly where the server-acknowledged bytes
        # end; otherwise local and remote state have diverged.
        if start_byte != self.bytes_uploaded:
            msg = _STREAM_ERROR_TEMPLATE.format(start_byte, self.bytes_uploaded)
            raise ValueError(msg)

        self._update_checksum(start_byte, payload)

        headers = {
            **self._headers,
            _CONTENT_TYPE_HEADER: self._content_type,
            _helpers.CONTENT_RANGE_HEADER: content_range,
        }
        return _PUT, self.resumable_url, payload, headers

    def _update_checksum(self, start_byte, payload):
        """Update the checksum with the payload if not already updated.

        Because error recovery can result in bytes being transmitted more than
        once, the checksum tracks the number of bytes checked in
        self._bytes_checksummed and skips bytes that have already been summed.

        Args:
            start_byte (int): Offset in the stream where ``payload`` begins.
            payload (bytes): The chunk about to be transmitted.
        """
        if not self._checksum_type:
            return

        if not self._checksum_object:
            self._checksum_object = _helpers._get_checksum_object(self._checksum_type)

        # On retransmission, skip the prefix of the payload that has already
        # been folded into the checksum.
        if start_byte < self._bytes_checksummed:
            offset = self._bytes_checksummed - start_byte
            data = payload[offset:]
        else:
            data = payload

        self._checksum_object.update(data)
        self._bytes_checksummed += len(data)

    def _make_invalid(self):
        """Simple setter for ``invalid``.

        This is intended to be passed along as a callback to helpers that
        raise an exception so they can mark this instance as invalid before
        raising.
        """
        self._invalid = True

    def _process_resumable_response(self, response, bytes_sent):
        """Process the response from an HTTP request.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.
            bytes_sent (int): The number of bytes sent in the request that
                ``response`` was returned for.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is 308 and the ``range`` header is not of the form
                ``bytes 0-{end}``.
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 200 or 308.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        status_code = _helpers.require_status_code(
            response,
            (http.client.OK, http.client.PERMANENT_REDIRECT),
            self._get_status_code,
            callback=self._make_invalid,
        )
        if status_code == http.client.OK:
            # NOTE: We use the "local" information of ``bytes_sent`` to update
            #       ``bytes_uploaded``, but do not verify this against other
            #       state. However, there may be some other information:
            #
            #       * a ``size`` key in JSON response body
            #       * the ``total_bytes`` attribute (if set)
            #       * ``stream.tell()`` (relying on fact that ``initiate()``
            #         requires stream to be at the beginning)
            self._bytes_uploaded = self._bytes_uploaded + bytes_sent
            # Tombstone the current upload so it cannot be used again.
            self._finished = True
            # Validate the checksum. This can raise an exception on failure.
            self._validate_checksum(response)
        else:
            # 308: the server reports how much it has received via the
            # ``range`` header; trust that over local accounting.
            bytes_range = _helpers.header_required(
                response,
                _helpers.RANGE_HEADER,
                self._get_headers,
                callback=self._make_invalid,
            )
            match = _BYTES_RANGE_RE.match(bytes_range)
            if match is None:
                self._make_invalid()
                raise InvalidResponse(
                    response,
                    'Unexpected "range" header',
                    bytes_range,
                    'Expected to be of the form "bytes=0-{end}"',
                )
            self._bytes_uploaded = int(match.group("end_byte")) + 1

    def _validate_checksum(self, response):
        """Check the computed checksum, if any, against the received metadata.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.DataCorruption: If the checksum
                computed locally and the checksum reported by the remote host do
                not match.
        """
        if self._checksum_type is None:
            return
        metadata_key = _helpers._get_metadata_key(self._checksum_type)
        metadata = response.json()
        remote_checksum = metadata.get(metadata_key)
        if remote_checksum is None:
            raise InvalidResponse(
                response,
                _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key),
                self._get_headers(response),
            )
        local_checksum = _helpers.prepare_checksum_digest(
            self._checksum_object.digest()
        )
        if local_checksum != remote_checksum:
            raise DataCorruption(
                response,
                _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format(
                    self._checksum_type.upper(), local_checksum, remote_checksum
                ),
            )

    def transmit_next_chunk(self, transport, timeout=None):
        """Transmit the next chunk of the resource to be uploaded.

        If the current upload was initiated with ``stream_final=False``,
        this method will dynamically determine if the upload has completed.
        The upload will be considered complete if the stream produces
        fewer than :attr:`chunk_size` bytes when a chunk is read from it.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_recover_request(self):
        """Prepare the contents of HTTP request to recover from failure.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        We assume that the :attr:`resumable_url` is set (i.e. the only way
        the upload can end up :attr:`invalid` is if it has been initiated).

        Returns:
            Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request (always :data:`None`)
              * headers for the request

            The headers **do not** incorporate the ``_headers`` on the
            current instance.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # "bytes */*" asks the server to report its current received range
        # without transmitting any payload bytes.
        headers = {_helpers.CONTENT_RANGE_HEADER: "bytes */*"}
        return _PUT, self.resumable_url, None, headers

    def _process_recover_response(self, response):
        """Process the response from an HTTP request to recover from failure.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 308.
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is 308 and the ``range`` header is not of the form
                ``bytes 0-{end}``.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response, (http.client.PERMANENT_REDIRECT,), self._get_status_code
        )
        headers = self._get_headers(response)
        if _helpers.RANGE_HEADER in headers:
            bytes_range = headers[_helpers.RANGE_HEADER]
            match = _BYTES_RANGE_RE.match(bytes_range)
            if match is None:
                raise InvalidResponse(
                    response,
                    'Unexpected "range" header',
                    bytes_range,
                    'Expected to be of the form "bytes=0-{end}"',
                )
            self._bytes_uploaded = int(match.group("end_byte")) + 1
        else:
            # In this case, the upload has not "begun".
            self._bytes_uploaded = 0

        # Rewind the local stream to the server-acknowledged offset so the
        # next chunk starts exactly where the server expects.
        self._stream.seek(self._bytes_uploaded)
        self._invalid = False

    def recover(self, transport):
        """Recover from a failure.

        This method should be used when a :class:`ResumableUpload` is in an
        :attr:`~ResumableUpload.invalid` state due to a request failure.

        This will verify the progress with the server and make sure the
        current upload is in a valid state before :meth:`transmit_next_chunk`
        can be used again.

        Args:
            transport (object): An object which can make authenticated
                requests.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
914
915
class XMLMPUContainer(UploadBase):
    """Initiate and close an upload using the XML MPU API.

    An XML MPU sends an initial request and then receives an upload ID.
    Using the upload ID, the upload is then done in numbered parts and the
    parts can be uploaded concurrently.

    In order to avoid concurrency issues with this container object, the
    uploading of individual parts is handled separately, by XMLMPUPart objects
    spawned from this container class. The XMLMPUPart objects are not
    necessarily in the same process as the container, so they do not update the
    container automatically.

    MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous
    given the JSON multipart upload, so the abbreviation "MPU" will be used
    throughout.

    See: https://cloud.google.com/storage/docs/multipart-uploads

    Args:
        upload_url (str): The URL of the object (without query parameters). The
            initiate, PUT, and finalization requests will all use this URL, with
            varying query parameters.
        filename (str): The name (path) of the file to upload.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with every request.
        upload_id (Optional[str]): The ID of an already-initiated upload, when
            reconstructing a container (e.g. in another process).
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and how
            to configure them.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
        upload_id (Optional(str)): The ID of the upload from the initialization
            response.
    """

    def __init__(
        self, upload_url, filename, headers=None, upload_id=None, retry=DEFAULT_RETRY
    ):
        super().__init__(upload_url, headers=headers, retry=retry)
        self._filename = filename
        self._upload_id = upload_id
        # Maps part number (int) -> etag (str) for every registered part.
        self._parts = {}

    @property
    def upload_id(self):
        """Optional[str]: The upload ID, or None if not yet initiated."""
        return self._upload_id

    def register_part(self, part_number, etag):
        """Register an uploaded part by part number and corresponding etag.

        XMLMPUPart objects represent individual parts, and their part number
        and etag can be registered to the container object with this method
        and therefore incorporated in the finalize() call to finish the upload.

        This method accepts part_number and etag, but not XMLMPUPart objects
        themselves, to reduce the complexity involved in running XMLMPUPart
        uploads in separate processes.

        Args:
            part_number (int): The part number. Parts are assembled into the
                final uploaded object with finalize() in order of their part
                numbers.
            etag (str): The etag included in the server response after upload.
        """
        self._parts[part_number] = etag

    def _prepare_initiate_request(self, content_type):
        """Prepare the contents of HTTP request to initiate upload.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already been initiated.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.upload_id is not None:
            raise ValueError("This upload has already been initiated.")

        initiate_url = self.upload_url + _MPU_INITIATE_QUERY

        headers = {
            **self._headers,
            _CONTENT_TYPE_HEADER: content_type,
        }
        return _POST, initiate_url, None, headers

    def _process_initiate_response(self, response):
        """Process the response from an HTTP request that initiated the upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        This method parses the upload ID out of the XML response body and
        stores it for future use.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 200.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code)
        root = ElementTree.fromstring(response.text)
        self._upload_id = root.find(_S3_COMPAT_XML_NAMESPACE + _UPLOAD_ID_NODE).text

    def initiate(
        self,
        transport,
        content_type,
        timeout=None,
    ):
        """Initiate an MPU and record the upload ID.

        Args:
            transport (object): An object which can make authenticated
                requests.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_finalize_request(self):
        """Prepare the contents of an HTTP request to finalize the upload.

        All of the parts must be registered before calling this method.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the upload has not been initiated.
        """
        if self.upload_id is None:
            raise ValueError("This upload has not yet been initiated.")

        final_query = _MPU_FINAL_QUERY_TEMPLATE.format(upload_id=self._upload_id)
        finalize_url = self.upload_url + final_query
        final_xml_root = ElementTree.Element("CompleteMultipartUpload")
        # Parts may be uploaded concurrently and registered in completion
        # order, but the XML API requires the completion request to list
        # parts in ascending part-number order, so sort before serializing.
        for part_number, etag in sorted(self._parts.items()):
            part = ElementTree.SubElement(final_xml_root, "Part")
            ElementTree.SubElement(part, "PartNumber").text = str(part_number)
            ElementTree.SubElement(part, "ETag").text = etag
        payload = ElementTree.tostring(final_xml_root)
        return _POST, finalize_url, payload, self._headers

    def _process_finalize_response(self, response):
        """Process the response from an HTTP request that finalized the upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 200.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """

        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code)
        self._finished = True

    def finalize(
        self,
        transport,
        timeout=None,
    ):
        """Finalize an MPU request with all the parts.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_cancel_request(self):
        """Prepare the contents of an HTTP request to cancel the upload.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always DELETE)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the upload has not been initiated.
        """
        if self.upload_id is None:
            raise ValueError("This upload has not yet been initiated.")

        cancel_query = _MPU_FINAL_QUERY_TEMPLATE.format(upload_id=self._upload_id)
        cancel_url = self.upload_url + cancel_query
        return _DELETE, cancel_url, None, self._headers

    def _process_cancel_response(self, response):
        """Process the response from an HTTP request that canceled the upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 204.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """

        _helpers.require_status_code(
            response, (http.client.NO_CONTENT,), self._get_status_code
        )

    def cancel(
        self,
        transport,
        timeout=None,
    ):
        """Cancel an MPU request and permanently delete any uploaded parts.

        This cannot be undone.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
1211
1212
class XMLMPUPart(UploadBase):
    """Upload a single part of an existing XML MPU container.

    An XML MPU sends an initial request and then receives an upload ID.
    Using the upload ID, the upload is then done in numbered parts and the
    parts can be uploaded concurrently.

    To avoid concurrency issues with the container object, each part is
    uploaded by its own instance of this class. Once a part is uploaded, it
    can be registered with the container with
    `container.register_part(part.part_number, part.etag)`.

    MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous
    given the JSON multipart upload, so the abbreviation "MPU" will be used
    throughout.

    See: https://cloud.google.com/storage/docs/multipart-uploads

    Args:
        upload_url (str): The URL of the object (without query parameters).
        upload_id (str): The ID of the upload from the initialization response.
        filename (str): The name (path) of the file to upload.
        start (int): The byte index of the beginning of the part.
        end (int): The byte index of the end of the part.
        part_number (int): The part number. Part numbers will be assembled in
            sequential order when the container is finalized.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with every request.
        checksum (Optional([str])): The type of checksum to compute to verify
            the integrity of the object. The request headers will be amended
            to include the computed value. Supported values are "md5", "crc32c",
            "auto" and None. The default is "auto", which will try to detect if
            the C extension for crc32c is installed and fall back to md5
            otherwise.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and how
            to configure them.

    Attributes:
        upload_url (str): The URL of the object (without query parameters).
        upload_id (str): The ID of the upload from the initialization response.
        filename (str): The name (path) of the file to upload.
        start (int): The byte index of the beginning of the part.
        end (int): The byte index of the end of the part.
        part_number (int): The part number. Part numbers will be assembled in
            sequential order when the container is finalized.
        etag (Optional(str)): The etag returned by the service after upload.
    """

    def __init__(
        self,
        upload_url,
        upload_id,
        filename,
        start,
        end,
        part_number,
        headers=None,
        checksum="auto",
        retry=DEFAULT_RETRY,
    ):
        super().__init__(upload_url, headers=headers, retry=retry)
        self._filename = filename
        self._start = start
        self._end = end
        self._upload_id = upload_id
        self._part_number = part_number
        self._etag = None
        if checksum == "auto":
            # Prefer crc32c only when the fast C implementation is present.
            fast_crc32c = _helpers._is_crc32c_available_and_fast()
            self._checksum_type = "crc32c" if fast_crc32c else "md5"
        else:
            self._checksum_type = checksum
        self._checksum_object = None

    @property
    def part_number(self):
        """int: The part number of this part within the MPU."""
        return self._part_number

    @property
    def upload_id(self):
        """str: The ID of the upload from the initialization response."""
        return self._upload_id

    @property
    def filename(self):
        """str: The name (path) of the file to upload."""
        return self._filename

    @property
    def etag(self):
        """Optional[str]: The etag returned by the service after upload."""
        return self._etag

    @property
    def start(self):
        """int: The byte index of the beginning of the part."""
        return self._start

    @property
    def end(self):
        """int: The byte index of the end of the part."""
        return self._end

    def _prepare_upload_request(self):
        """Prepare the contents of HTTP request to upload a part.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        For the time being, this **does require** some form of I/O to read
        the part's byte range from the local file. However, this will
        (almost) certainly not be network I/O.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request
              * headers for the request

            The headers incorporate the ``_headers`` on the current instance.

        Raises:
            ValueError: If the current upload has finished.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("This part has already been uploaded.")

        with open(self._filename, "br") as file_obj:
            file_obj.seek(self._start)
            payload = file_obj.read(self._end - self._start)

        # A fresh checksum object is created per request so that retries
        # start from a clean state.
        checksum_object = _helpers._get_checksum_object(self._checksum_type)
        self._checksum_object = checksum_object
        if checksum_object is not None:
            checksum_object.update(payload)

        part_query = _MPU_PART_QUERY_TEMPLATE.format(
            part=self._part_number, upload_id=self._upload_id
        )
        return _PUT, self.upload_url + part_query, payload, self._headers

    def _process_upload_response(self, response):
        """Process the response from an HTTP request.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 200 or the response is missing data.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response,
            (http.client.OK,),
            self._get_status_code,
        )

        self._validate_checksum(response)

        self._etag = _helpers.header_required(response, "etag", self._get_headers)
        self._finished = True

    def upload(
        self,
        transport,
        timeout=None,
    ):
        """Upload the part.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _validate_checksum(self, response):
        """Check the computed checksum, if any, against the response headers.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.DataCorruption: If the checksum
                computed locally and the checksum reported by the remote host do
                not match.
        """
        if self._checksum_type is None:
            # Checksumming disabled; nothing to verify.
            return

        remote_checksum = _helpers._get_uploaded_checksum_from_headers(
            response, self._get_headers, self._checksum_type
        )
        if remote_checksum is None:
            metadata_key = _helpers._get_metadata_key(self._checksum_type)
            raise InvalidResponse(
                response,
                _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key),
                self._get_headers(response),
            )

        local_checksum = _helpers.prepare_checksum_digest(
            self._checksum_object.digest()
        )
        if local_checksum != remote_checksum:
            raise DataCorruption(
                response,
                _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format(
                    self._checksum_type.upper(), local_checksum, remote_checksum
                ),
            )
1445
1446
def get_boundary():
    """Get a random boundary for a multipart request.

    Returns:
        bytes: The boundary used to separate parts of a multipart request.
    """
    token = random.randrange(sys.maxsize)
    # The template is a unicode string (byte strings could not be used as
    # format templates historically), so format first and encode after.
    return _BOUNDARY_FORMAT.format(token).encode("utf-8")
1458
1459
def construct_multipart_request(data, metadata, content_type):
    """Construct a multipart request body.

    Args:
        data (bytes): The resource content (UTF-8 encoded as bytes)
            to be uploaded.
        metadata (Mapping[str, str]): The resource metadata, such as an
            ACL list.
        content_type (str): The content type of the resource, e.g. a JPEG
            image has content type ``image/jpeg``.

    Returns:
        Tuple[bytes, bytes]: The multipart request body and the boundary used
        between each part.
    """
    multipart_boundary = get_boundary()
    boundary_sep = _MULTIPART_SEP + multipart_boundary
    metadata_bytes = json.dumps(metadata).encode("utf-8")
    # Assemble the payload: a JSON metadata part followed by the media part,
    # each introduced by the boundary separator and terminated by the
    # closing ``--boundary--`` marker.
    content = b"".join(
        [
            boundary_sep,
            _MULTIPART_BEGIN,
            metadata_bytes,
            _CRLF,
            boundary_sep,
            _CRLF,
            b"content-type: ",
            content_type.encode("utf-8"),
            _CRLF,
            _CRLF,  # Empty line between headers and body.
            data,
            _CRLF,
            boundary_sep,
            _MULTIPART_SEP,
        ]
    )
    return content, multipart_boundary
1499
1500
def get_total_bytes(stream):
    """Determine the total number of bytes in a stream.

    The stream's position is preserved: it is restored to wherever it was
    before this call.

    Args:
        stream (IO[bytes]): The stream (i.e. file-like object).

    Returns:
        int: The number of bytes.
    """
    original_position = stream.tell()
    # Jump to the end to learn the size, then return to where we started.
    stream.seek(0, os.SEEK_END)
    size = stream.tell()
    stream.seek(original_position)
    return size
1519
1520
def get_next_chunk(stream, chunk_size, total_bytes):
    """Get a chunk from an I/O stream.

    The ``stream`` may have fewer bytes remaining than ``chunk_size``
    so it may not always be the case that
    ``end_byte == start_byte + chunk_size - 1``.

    Args:
        stream (IO[bytes]): The stream (i.e. file-like object).
        chunk_size (int): The size of the chunk to be read from the ``stream``.
        total_bytes (Optional[int]): The (expected) total number of bytes
            in the ``stream``.

    Returns:
        Tuple[int, bytes, str]: Triple of:

          * the start byte index
          * the content in between the start and end bytes (inclusive)
          * content range header for the chunk (slice) that has been read

    Raises:
        ValueError: If ``total_bytes == 0`` but ``stream.read()`` yields
            non-empty content.
        ValueError: If there is no data left to consume. This corresponds
            exactly to the case ``end_byte < start_byte``, which can only
            occur if ``end_byte == start_byte - 1``.
    """
    start_byte = stream.tell()
    # When the total size is known and positive, never read past the end of
    # the expected content.
    if total_bytes is not None and 0 < total_bytes <= start_byte + chunk_size:
        to_read = total_bytes - start_byte
    else:
        to_read = chunk_size
    payload = stream.read(to_read)
    end_byte = stream.tell() - 1

    num_bytes_read = len(payload)
    if total_bytes is None:
        if num_bytes_read < chunk_size:
            # A short read means the stream is exhausted, so we now **KNOW**
            # the total number of bytes.
            total_bytes = end_byte + 1
    elif total_bytes == 0:
        # NOTE: We also expect ``start_byte == 0`` here but don't check
        #       because ``_prepare_initiate_request()`` requires the
        #       stream to be at the beginning.
        if num_bytes_read != 0:
            raise ValueError(
                "Stream specified as empty, but produced non-empty content."
            )
    elif num_bytes_read == 0:
        raise ValueError(
            "Stream is already exhausted. There is no content remaining."
        )

    content_range = get_content_range(start_byte, end_byte, total_bytes)
    return start_byte, payload, content_range
1576
1577
def get_content_range(start_byte, end_byte, total_bytes):
    """Convert start, end and total into content range header.

    If ``total_bytes`` is not known, uses "bytes {start}-{end}/*".
    If we are dealing with an empty range (i.e. ``end_byte < start_byte``)
    then "bytes */{total}" is used.

    This function **ASSUMES** that if the size is not known, the caller will
    not also pass an empty range.

    Args:
        start_byte (int): The start (inclusive) of the byte range.
        end_byte (int): The end (inclusive) of the byte range.
        total_bytes (Optional[int]): The number of bytes in the byte
            range (if known).

    Returns:
        str: The content range header.
    """
    if total_bytes is None:
        return f"bytes {start_byte:d}-{end_byte:d}/*"
    if end_byte < start_byte:
        # Empty range: report only the total size.
        return f"bytes */{total_bytes:d}"
    return f"bytes {start_byte:d}-{end_byte:d}/{total_bytes:d}"