1# Copyright 2017 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
"""Virtual base classes for uploading media via Google APIs.
16
17Supported here are:
18
19* simple (media) uploads
20* multipart uploads that contain both metadata and a small file as payload
21* resumable uploads (with metadata as well)
22"""
23
24import http.client
25import json
26import os
27import random
28import re
29import sys
30import urllib.parse
31
32from google.cloud.storage._media import _helpers
33from google.cloud.storage._media import UPLOAD_CHUNK_SIZE
34from google.cloud.storage.exceptions import InvalidResponse
35from google.cloud.storage.exceptions import DataCorruption
36from google.cloud.storage.retry import DEFAULT_RETRY
37
38from xml.etree import ElementTree
39
40
# HTTP verbs used by the upload helpers.
_DELETE = "DELETE"
_POST = "POST"
_PUT = "PUT"

# Header name and ``content-range`` value templates.
_CONTENT_TYPE_HEADER = "content-type"
_CONTENT_RANGE_TEMPLATE = "bytes {:d}-{:d}/{:d}"
_RANGE_UNKNOWN_TEMPLATE = "bytes {:d}-{:d}/*"
_EMPTY_RANGE_TEMPLATE = "bytes */{:d}"

# Multipart boundary: a zero-padded integer rendered as wide as the decimal
# form of ``sys.maxsize - 1``.
_BOUNDARY_WIDTH = len("{:d}".format(sys.maxsize - 1))
_BOUNDARY_FORMAT = "===============" + "{{:0{:d}d}}".format(_BOUNDARY_WIDTH) + "=="

# Byte fragments used when assembling a multipart/related body.
_CRLF = b"\r\n"
_MULTIPART_SEP = b"--"
_MULTIPART_BEGIN = (
    _CRLF + b"content-type: application/json; charset=UTF-8" + _CRLF + _CRLF
)
_RELATED_HEADER = b'multipart/related; boundary="'

# Matches a ``range`` response header of the form ``bytes=0-{end}``.
_BYTES_RANGE_RE = re.compile(r"bytes=0-(?P<end_byte>\d+)", re.IGNORECASE)

# Error message templates.
_STREAM_ERROR_TEMPLATE = (
    "Bytes stream is in unexpected state. The local stream has had "
    "{:d} bytes read from it while {:d} bytes have already been "
    "updated (they should match)."
)
_STREAM_READ_PAST_TEMPLATE = (
    "{:d} bytes have been read from the stream, "
    "which exceeds the expected total {:d}."
)
_UPLOAD_CHECKSUM_MISMATCH_MESSAGE = (
    "The computed ``{}`` checksum, ``{}``, and the checksum "
    "reported by the remote host, ``{}``, did not match."
)
_UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE = (
    "Response metadata had no ``{}`` value; "
    "checksum could not be validated."
)
_UPLOAD_HEADER_NO_APPROPRIATE_CHECKSUM_MESSAGE = (
    "Response headers had no ``{}`` value; "
    "checksum could not be validated."
)

# XML multipart-upload (MPU) query strings and XML node names.
_MPU_INITIATE_QUERY = "?uploads"
_MPU_PART_QUERY_TEMPLATE = "?partNumber={part}&uploadId={upload_id}"
_MPU_FINAL_QUERY_TEMPLATE = "?uploadId={upload_id}"
_S3_COMPAT_XML_NAMESPACE = "{http://s3.amazonaws.com/doc/2006-03-01/}"
_UPLOAD_ID_NODE = "UploadId"
79
80
class UploadBase(object):
    """Common foundation for the upload helper classes.

    Holds the state shared by every upload flavor: the target URL, the
    extra request headers, the retry strategy and a "finished" flag.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and
            how to configure them.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def __init__(self, upload_url, headers=None, retry=DEFAULT_RETRY):
        self.upload_url = upload_url
        # A caller-supplied mapping is stored as-is (not copied); a fresh
        # dict is only created when nothing was passed.
        self._headers = {} if headers is None else headers
        self._retry_strategy = retry
        self._finished = False

    @property
    def finished(self):
        """bool: Flag indicating if the upload has completed."""
        return self._finished

    def _process_response(self, response):
        """Handle the HTTP response of an upload request.

        Covers everything that has to happen after the request and that
        needs no network I/O (or other I/O), following the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 200.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # Mark the upload as used up before checking the status, so it is
        # tombstoned on failure as well as on success.
        self._finished = True
        accepted_codes = (http.client.OK,)
        _helpers.require_status_code(response, accepted_codes, self._get_status_code)

    @staticmethod
    def _get_status_code(response):
        """Return the status code of an HTTP response (virtual).

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_headers(response):
        """Return the headers of an HTTP response (virtual).

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_body(response):
        """Return the body of an HTTP response (virtual).

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
172
173
class SimpleUpload(UploadBase):
    """Upload a resource to a Google API in one request, without metadata.

    A **simple** media upload sends no metadata and completes the upload
    in a single request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and
            how to configure them.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def _prepare_request(self, data, content_type):
        """Build the pieces of the upload HTTP request.

        Covers everything that has to happen before the request and that
        needs no network I/O (or other I/O), following the `sans-I/O`_
        philosophy.

        .. note:

            This method is meant to be used only once, so the instance's
            ``headers`` mapping is mutated in place: the content type key
            is added to it and the same mapping is returned.

        Args:
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type for the request.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already finished.
            TypeError: If ``data`` isn't bytes.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("An upload can only be used once.")
        if not isinstance(data, bytes):
            raise TypeError("`data` must be bytes, received", type(data))

        request_headers = self._headers
        request_headers[_CONTENT_TYPE_HEADER] = content_type
        return _POST, self.upload_url, data, request_headers

    def transmit(self, transport, data, content_type, timeout=None):
        """Send the resource to the server (virtual).

        Args:
            transport (object): An object which can make authenticated
                requests.
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
256
257
class MultipartUpload(UploadBase):
    """Upload a resource together with its metadata to a Google API.

    A **multipart** upload sends both metadata and the resource in a single
    (multipart) request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. The request metadata will be amended
            to include the computed value. Using this option will override a
            manually-set checksum value. Supported values are "md5",
            "crc32c", "auto", and None. The default is "auto", which will try
            to detect if the C extension for crc32c is installed and fall back
            to md5 otherwise.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and
            how to configure them.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def __init__(self, upload_url, headers=None, checksum="auto", retry=DEFAULT_RETRY):
        super().__init__(upload_url, headers=headers, retry=retry)
        # Resolve "auto" once, up front: crc32c when the helper reports it
        # as available and fast, md5 otherwise.
        if checksum == "auto":
            if _helpers._is_crc32c_available_and_fast():
                checksum = "crc32c"
            else:
                checksum = "md5"
        self._checksum_type = checksum

    def _prepare_request(self, data, metadata, content_type):
        """Build the pieces of the multipart upload HTTP request.

        Covers everything that has to happen before the request and that
        needs no network I/O (or other I/O), following the `sans-I/O`_
        philosophy.

        .. note:

            This method is meant to be used only once, so the instance's
            ``headers`` mapping is mutated in place by having the content
            type key added to it. When a checksum is computed, ``metadata``
            is mutated as well: the checksum is stored under its metadata
            key.

        Args:
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already finished.
            TypeError: If ``data`` isn't bytes.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("An upload can only be used once.")
        if not isinstance(data, bytes):
            raise TypeError("`data` must be bytes, received", type(data))

        hasher = _helpers._get_checksum_object(self._checksum_type)
        if hasher is not None:
            hasher.update(data)
            digest = _helpers.prepare_checksum_digest(hasher.digest())
            # Overrides any checksum the caller placed in the metadata.
            metadata[_helpers._get_metadata_key(self._checksum_type)] = digest

        body, boundary = construct_multipart_request(data, metadata, content_type)
        self._headers[_CONTENT_TYPE_HEADER] = _RELATED_HEADER + boundary + b'"'

        return _POST, self.upload_url, body, self._headers

    def transmit(self, transport, data, metadata, content_type, timeout=None):
        """Send the resource and its metadata to the server (virtual).

        Args:
            transport (object): An object which can make authenticated
                requests.
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
373
374
class ResumableUpload(UploadBase):
    """Initiate and fulfill a resumable upload to a Google API.

    A **resumable** upload sends an initial request with the resource metadata
    and then gets assigned an upload ID / upload URL to send bytes to.
    Using the upload URL, the upload is then done in chunks (determined by
    the user) until all bytes have been uploaded.

    Args:
        upload_url (str): The URL where the resumable upload will be initiated.
        chunk_size (int): The size of each chunk used to upload the resource.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. After the upload is complete, the
            server-computed checksum of the resulting object will be checked
            and google.cloud.storage.exceptions.DataCorruption will be raised on
            a mismatch. The corrupted file will not be deleted from the remote
            host automatically. Supported values are "md5", "crc32c", "auto",
            and None. The default is "auto", which will try to detect if the C
            extension for crc32c is installed and fall back to md5 otherwise.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with every request.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and how
            to configure them.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.

    Raises:
        ValueError: If ``chunk_size`` is not a multiple of
            :data:`.UPLOAD_CHUNK_SIZE`.
    """

    def __init__(
        self,
        upload_url,
        chunk_size,
        checksum="auto",
        headers=None,
        retry=DEFAULT_RETRY,
    ):
        super(ResumableUpload, self).__init__(upload_url, headers=headers, retry=retry)
        if chunk_size % UPLOAD_CHUNK_SIZE != 0:
            # Floor division keeps the size an integer in the message
            # (true division would render it as a float, e.g. "256.0 KB").
            raise ValueError(
                "{} KB must divide chunk size".format(UPLOAD_CHUNK_SIZE // 1024)
            )
        self._chunk_size = chunk_size
        self._stream = None
        self._content_type = None
        self._bytes_uploaded = 0
        self._bytes_checksummed = 0
        self._checksum_type = checksum
        if self._checksum_type == "auto":
            self._checksum_type = (
                "crc32c" if _helpers._is_crc32c_available_and_fast() else "md5"
            )
        # Created lazily by ``_update_checksum`` on the first chunk.
        self._checksum_object = None
        self._total_bytes = None
        self._resumable_url = None
        self._invalid = False

    @property
    def invalid(self):
        """bool: Indicates if the upload is in an invalid state.

        This will occur if a call to :meth:`transmit_next_chunk` fails.
        To recover from such a failure, call :meth:`recover`.
        """
        return self._invalid

    @property
    def chunk_size(self):
        """int: The size of each chunk used to upload the resource."""
        return self._chunk_size

    @property
    def resumable_url(self):
        """Optional[str]: The URL of the in-progress resumable upload."""
        return self._resumable_url

    @property
    def bytes_uploaded(self):
        """int: Number of bytes that have been uploaded."""
        return self._bytes_uploaded

    @property
    def total_bytes(self):
        """Optional[int]: The total number of bytes to be uploaded.

        If this upload is initiated (via :meth:`initiate`) with
        ``stream_final=True``, this value will be populated based on the size
        of the ``stream`` being uploaded. (By default ``stream_final=True``.)

        If this upload is initiated with ``stream_final=False``,
        :attr:`total_bytes` will be :data:`None` since it cannot be
        determined from the stream.
        """
        return self._total_bytes

    def _prepare_initiate_request(
        self,
        stream,
        metadata,
        content_type,
        total_bytes=None,
        stream_final=True,
    ):
        """Prepare the contents of HTTP request to initiate upload.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Besides building the request, this records ``stream`` and
        ``content_type`` on the instance for the later chunk requests and
        sets :attr:`total_bytes` when it can be determined.

        Args:
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already been initiated.
            ValueError: If ``stream`` is not at the beginning.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.resumable_url is not None:
            raise ValueError("This upload has already been initiated.")
        if stream.tell() != 0:
            raise ValueError("Stream must be at beginning.")

        self._stream = stream
        self._content_type = content_type

        # Signed URL requires content type set directly - not through x-upload-content-type
        parse_result = urllib.parse.urlparse(self.upload_url)
        parsed_query = urllib.parse.parse_qs(parse_result.query)
        if "x-goog-signature" in parsed_query or "X-Goog-Signature" in parsed_query:
            # Deconstruct **self._headers first so that content type defined here takes priority
            headers = {**self._headers, _CONTENT_TYPE_HEADER: content_type}
        else:
            # Deconstruct **self._headers first so that content type defined here takes priority
            headers = {
                **self._headers,
                _CONTENT_TYPE_HEADER: "application/json; charset=UTF-8",
                "x-upload-content-type": content_type,
            }
        # Set the total bytes if possible.
        if total_bytes is not None:
            self._total_bytes = total_bytes
        elif stream_final:
            self._total_bytes = get_total_bytes(stream)
        # Add the total bytes to the headers if set.
        if self._total_bytes is not None:
            content_length = "{:d}".format(self._total_bytes)
            headers["x-upload-content-length"] = content_length

        payload = json.dumps(metadata).encode("utf-8")
        return _POST, self.upload_url, payload, headers

    def _process_initiate_response(self, response):
        """Process the response from an HTTP request that initiated upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        This method takes the URL from the ``Location`` header and stores it
        for future use. Within that URL, we assume the ``upload_id`` query
        parameter has been included, but we do not check.

        The accepted status codes are 200 and 201; on any other status the
        instance is marked invalid via the ``_make_invalid`` callback.

        Args:
            response (object): The HTTP response object (need headers).

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response,
            (http.client.OK, http.client.CREATED),
            self._get_status_code,
            callback=self._make_invalid,
        )
        self._resumable_url = _helpers.header_required(
            response, "location", self._get_headers
        )

    def initiate(
        self,
        transport,
        stream,
        metadata,
        content_type,
        total_bytes=None,
        stream_final=True,
        timeout=None,
    ):
        """Initiate a resumable upload.

        By default, this method assumes your ``stream`` is in a "final"
        state ready to transmit. However, ``stream_final=False`` can be used
        to indicate that the size of the resource is not known. This can happen
        if bytes are being dynamically fed into ``stream``, e.g. if the stream
        is attached to application logs.

        If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be
        read from the stream every time :meth:`transmit_next_chunk` is called.
        If one of those reads produces strictly fewer bytes than the chunk
        size, the upload will be concluded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_request(self):
        """Prepare the contents of HTTP request to upload a chunk.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        For the time being, this **does require** some form of I/O to read
        a chunk from ``stream`` (via :func:`get_next_chunk`). However, this
        will (almost) certainly not be network I/O.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request
              * headers for the request

            The headers incorporate the ``_headers`` on the current instance.

        Raises:
            ValueError: If the current upload has finished.
            ValueError: If the current upload is in an invalid state.
            ValueError: If the current upload has not been initiated.
            ValueError: If the location in the stream (i.e. ``stream.tell()``)
                does not agree with ``bytes_uploaded``.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("Upload has finished.")
        if self.invalid:
            raise ValueError(
                "Upload is in an invalid state. To recover call `recover()`."
            )
        if self.resumable_url is None:
            raise ValueError(
                "This upload has not been initiated. Please call "
                "initiate() before beginning to transmit chunks."
            )

        start_byte, payload, content_range = get_next_chunk(
            self._stream, self._chunk_size, self._total_bytes
        )
        if start_byte != self.bytes_uploaded:
            msg = _STREAM_ERROR_TEMPLATE.format(start_byte, self.bytes_uploaded)
            raise ValueError(msg)

        self._update_checksum(start_byte, payload)

        # Instance headers come first so the per-chunk keys take priority.
        headers = {
            **self._headers,
            _CONTENT_TYPE_HEADER: self._content_type,
            _helpers.CONTENT_RANGE_HEADER: content_range,
        }
        return _PUT, self.resumable_url, payload, headers

    def _update_checksum(self, start_byte, payload):
        """Update the checksum with the payload if not already updated.

        Because error recovery can result in bytes being transmitted more than
        once, the checksum tracks the number of bytes checked in
        self._bytes_checksummed and skips bytes that have already been summed.

        Args:
            start_byte (int): The offset of ``payload`` within the stream.
            payload (bytes): The chunk about to be transmitted.
        """
        if not self._checksum_type:
            return

        if not self._checksum_object:
            self._checksum_object = _helpers._get_checksum_object(self._checksum_type)

        if start_byte < self._bytes_checksummed:
            # Part (or all) of this chunk was summed before a recovery;
            # only feed the bytes past the already-summed prefix.
            offset = self._bytes_checksummed - start_byte
            data = payload[offset:]
        else:
            data = payload

        self._checksum_object.update(data)
        self._bytes_checksummed += len(data)

    def _make_invalid(self):
        """Simple setter for ``invalid``.

        This is intended to be passed along as a callback to helpers that
        raise an exception so they can mark this instance as invalid before
        raising.
        """
        self._invalid = True

    def _process_resumable_response(self, response, bytes_sent):
        """Process the response from an HTTP request.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.
            bytes_sent (int): The number of bytes sent in the request that
                ``response`` was returned for.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is 308 and the ``range`` header is not of the form
                ``bytes=0-{end}``.
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 200 or 308.
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is 200, a checksum type is set and the response metadata
                has no value for it.
            ~google.cloud.storage.exceptions.DataCorruption: If the status
                code is 200 and the locally computed checksum does not match
                the one in the response metadata.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        status_code = _helpers.require_status_code(
            response,
            (http.client.OK, http.client.PERMANENT_REDIRECT),
            self._get_status_code,
            callback=self._make_invalid,
        )
        if status_code == http.client.OK:
            # NOTE: We use the "local" information of ``bytes_sent`` to update
            #       ``bytes_uploaded``, but do not verify this against other
            #       state. However, there may be some other information:
            #
            #       * a ``size`` key in JSON response body
            #       * the ``total_bytes`` attribute (if set)
            #       * ``stream.tell()`` (relying on fact that ``initiate()``
            #         requires stream to be at the beginning)
            self._bytes_uploaded = self._bytes_uploaded + bytes_sent
            # Tombstone the current upload so it cannot be used again.
            self._finished = True
            # Validate the checksum. This can raise an exception on failure.
            self._validate_checksum(response)
        else:
            bytes_range = _helpers.header_required(
                response,
                _helpers.RANGE_HEADER,
                self._get_headers,
                callback=self._make_invalid,
            )
            match = _BYTES_RANGE_RE.match(bytes_range)
            if match is None:
                self._make_invalid()
                raise InvalidResponse(
                    response,
                    'Unexpected "range" header',
                    bytes_range,
                    'Expected to be of the form "bytes=0-{end}"',
                )
            # ``end_byte`` is an inclusive index, hence the ``+ 1``.
            self._bytes_uploaded = int(match.group("end_byte")) + 1

    def _validate_checksum(self, response):
        """Check the computed checksum, if any, against the received metadata.

        Validation is skipped under the same condition that makes
        ``_update_checksum`` skip checksumming (a falsy checksum type), so
        the two can never disagree about whether a checksum exists.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the response
                metadata has no value for the checksum's metadata key.
            ~google.cloud.storage.exceptions.DataCorruption: If the checksum
                computed locally and the checksum reported by the remote host do
                not match.
        """
        if not self._checksum_type:
            return
        metadata_key = _helpers._get_metadata_key(self._checksum_type)
        metadata = response.json()
        remote_checksum = metadata.get(metadata_key)
        if remote_checksum is None:
            raise InvalidResponse(
                response,
                _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key),
                self._get_headers(response),
            )
        local_checksum = _helpers.prepare_checksum_digest(
            self._checksum_object.digest()
        )
        if local_checksum != remote_checksum:
            raise DataCorruption(
                response,
                _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format(
                    self._checksum_type.upper(), local_checksum, remote_checksum
                ),
            )

    def transmit_next_chunk(self, transport, timeout=None):
        """Transmit the next chunk of the resource to be uploaded.

        If the current upload was initiated with ``stream_final=False``,
        this method will dynamically determine if the upload has completed.
        The upload will be considered complete if the stream produces
        fewer than :attr:`chunk_size` bytes when a chunk is read from it.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_recover_request(self):
        """Prepare the contents of HTTP request to recover from failure.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        We assume that the :attr:`resumable_url` is set (i.e. the only way
        the upload can end up :attr:`invalid` is if it has been initiated).

        Returns:
            Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request (always :data:`None`)
              * headers for the request

            The headers **do not** incorporate the ``_headers`` on the
            current instance.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        headers = {_helpers.CONTENT_RANGE_HEADER: "bytes */*"}
        return _PUT, self.resumable_url, None, headers

    def _process_recover_response(self, response):
        """Process the response from an HTTP request to recover from failure.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        On success, ``bytes_uploaded`` is reset from the ``range`` header
        (or to zero when the header is absent), the stream is moved to that
        offset and the invalid flag is cleared.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 308.
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is 308 and the ``range`` header is not of the form
                ``bytes=0-{end}``.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response, (http.client.PERMANENT_REDIRECT,), self._get_status_code
        )
        headers = self._get_headers(response)
        if _helpers.RANGE_HEADER in headers:
            bytes_range = headers[_helpers.RANGE_HEADER]
            match = _BYTES_RANGE_RE.match(bytes_range)
            if match is None:
                raise InvalidResponse(
                    response,
                    'Unexpected "range" header',
                    bytes_range,
                    'Expected to be of the form "bytes=0-{end}"',
                )
            # ``end_byte`` is an inclusive index, hence the ``+ 1``.
            self._bytes_uploaded = int(match.group("end_byte")) + 1
        else:
            # In this case, the upload has not "begun".
            self._bytes_uploaded = 0

        # Re-align the local stream with what the server reports.
        self._stream.seek(self._bytes_uploaded)
        self._invalid = False

    def recover(self, transport):
        """Recover from a failure.

        This method should be used when a :class:`ResumableUpload` is in an
        :attr:`~ResumableUpload.invalid` state due to a request failure.

        This will verify the progress with the server and make sure the
        current upload is in a valid state before :meth:`transmit_next_chunk`
        can be used again.

        Args:
            transport (object): An object which can make authenticated
                requests.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
924
925
class XMLMPUContainer(UploadBase):
    """Initiate and close an upload using the XML MPU API.

    An XML MPU sends an initial request and then receives an upload ID.
    Using the upload ID, the upload is then done in numbered parts and the
    parts can be uploaded concurrently.

    In order to avoid concurrency issues with this container object, the
    uploading of individual parts is handled separately, by XMLMPUPart objects
    spawned from this container class. The XMLMPUPart objects are not
    necessarily in the same process as the container, so they do not update the
    container automatically.

    MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous
    given the JSON multipart upload, so the abbreviation "MPU" will be used
    throughout.

    See: https://cloud.google.com/storage/docs/multipart-uploads

    Args:
        upload_url (str): The URL of the object (without query parameters). The
            initiate, PUT, and finalization requests will all use this URL, with
            varying query parameters.
        filename (str): The name (path) of the file to upload.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with every request.
        upload_id (Optional[str]): The ID of an upload that has already been
            initiated. When given, the container is treated as initiated and
            preparing another initiate request raises ``ValueError``.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and how
            to configure them.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
        upload_id (Optional(str)): The ID of the upload from the initialization
            response.
    """

    def __init__(
        self,
        upload_url,
        filename,
        headers=None,
        upload_id=None,
        retry=DEFAULT_RETRY,
    ):
        super().__init__(upload_url, headers=headers, retry=retry)
        self._filename = filename
        self._upload_id = upload_id
        # Maps part number -> etag for every part registered so far.
        self._parts = {}

    @property
    def upload_id(self):
        """Optional[str]: The upload ID, or None if not yet initiated."""
        return self._upload_id

    def register_part(self, part_number, etag):
        """Register an uploaded part by part number and corresponding etag.

        XMLMPUPart objects represent individual parts, and their part number
        and etag can be registered to the container object with this method
        and therefore incorporated in the finalize() call to finish the upload.

        This method accepts part_number and etag, but not XMLMPUPart objects
        themselves, to reduce the complexity involved in running XMLMPUPart
        uploads in separate processes.

        Registering the same part number again replaces the earlier etag.

        Args:
            part_number (int): The part number. Parts are assembled into the
                final uploaded object with finalize() in order of their part
                numbers.
            etag (str): The etag included in the server response after upload.
        """
        self._parts[part_number] = etag

    def _prepare_initiate_request(self, content_type):
        """Prepare the contents of HTTP request to initiate upload.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.

        Returns:
            Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request (always :data:`None`)
              * headers for the request (the instance ``_headers`` plus the
                content type)

        Raises:
            ValueError: If the current upload has already been initiated.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.upload_id is not None:
            raise ValueError("This upload has already been initiated.")

        initiate_url = self.upload_url + _MPU_INITIATE_QUERY

        # Build a fresh mapping so the instance headers are not mutated.
        headers = {
            **self._headers,
            _CONTENT_TYPE_HEADER: content_type,
        }
        return _POST, initiate_url, None, headers

    def _process_initiate_response(self, response):
        """Process the response from an HTTP request that initiated the upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        This method parses the XML body of the response and stores the text
        of the upload ID node as :attr:`upload_id` for future use.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 200, or if the response body does not contain
                the upload ID node.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code)
        root = ElementTree.fromstring(response.text)
        upload_id_node = root.find(_S3_COMPAT_XML_NAMESPACE + _UPLOAD_ID_NODE)
        # NOTE: Compare against None explicitly; an Element without children
        #       is falsy even when it exists.
        if upload_id_node is None:
            raise InvalidResponse(
                response,
                "Response is missing the upload ID node",
                _UPLOAD_ID_NODE,
            )
        self._upload_id = upload_id_node.text

    def initiate(
        self,
        transport,
        content_type,
        timeout=None,
    ):
        """Initiate an MPU and record the upload ID.

        Args:
            transport (object): An object which can make authenticated
                requests.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_finalize_request(self):
        """Prepare the contents of an HTTP request to finalize the upload.

        All of the parts must be registered before calling this method.

        The ``Part`` elements of the payload are written in ascending
        part-number order, independent of the order in which the parts were
        registered (parts uploaded concurrently may be registered out of
        order).

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request (serialized XML listing the parts)
              * headers for the request

        Raises:
            ValueError: If the upload has not been initiated.
        """
        if self.upload_id is None:
            raise ValueError("This upload has not yet been initiated.")

        final_query = _MPU_FINAL_QUERY_TEMPLATE.format(upload_id=self._upload_id)
        finalize_url = self.upload_url + final_query
        final_xml_root = ElementTree.Element("CompleteMultipartUpload")
        # NOTE: S3-compatible "complete multipart upload" requests are
        #       expected to list parts in ascending part-number order, so
        #       sort numerically rather than rely on registration order.
        ordered_parts = sorted(self._parts.items(), key=lambda item: int(item[0]))
        for part_number, etag in ordered_parts:
            part = ElementTree.SubElement(final_xml_root, "Part")
            ElementTree.SubElement(part, "PartNumber").text = str(part_number)
            ElementTree.SubElement(part, "ETag").text = etag
        payload = ElementTree.tostring(final_xml_root)
        return _POST, finalize_url, payload, self._headers

    def _process_finalize_response(self, response):
        """Process the response from an HTTP request that finalized the upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        On success the upload is marked as finished.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 200.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """

        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code)
        self._finished = True

    def finalize(
        self,
        transport,
        timeout=None,
    ):
        """Finalize an MPU request with all the parts.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_cancel_request(self):
        """Prepare the contents of an HTTP request to cancel the upload.

        Returns:
            Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always DELETE)
              * the URL for the request
              * the body of the request (always :data:`None`)
              * headers for the request

        Raises:
            ValueError: If the upload has not been initiated.
        """
        if self.upload_id is None:
            raise ValueError("This upload has not yet been initiated.")

        # Cancelling targets the same query string as finalizing; only the
        # HTTP verb differs.
        cancel_query = _MPU_FINAL_QUERY_TEMPLATE.format(upload_id=self._upload_id)
        cancel_url = self.upload_url + cancel_query
        return _DELETE, cancel_url, None, self._headers

    def _process_cancel_response(self, response):
        """Process the response from an HTTP request that canceled the upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 204.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """

        _helpers.require_status_code(
            response, (http.client.NO_CONTENT,), self._get_status_code
        )

    def cancel(
        self,
        transport,
        timeout=None,
    ):
        """Cancel an MPU request and permanently delete any uploaded parts.

        This cannot be undone.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
1226
1227
class XMLMPUPart(UploadBase):
    """Upload a single part of an existing XML MPU container.

    An XML MPU sends an initial request and then receives an upload ID.
    Using the upload ID, the upload is then done in numbered parts and the
    parts can be uploaded concurrently.

    In order to avoid concurrency issues with the container object, the
    uploading of individual parts is handled separately by multiple objects
    of this class. Once a part is uploaded, it can be registered with the
    container with `container.register_part(part.part_number, part.etag)`.

    MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous
    given the JSON multipart upload, so the abbreviation "MPU" will be used
    throughout.

    See: https://cloud.google.com/storage/docs/multipart-uploads

    Args:
        upload_url (str): The URL of the object (without query parameters).
        upload_id (str): The ID of the upload from the initialization response.
        filename (str): The name (path) of the file to upload.
        start (int): The byte index of the beginning of the part.
        end (int): The byte index of the end of the part (exclusive:
            ``end - start`` bytes are read from the file).
        part_number (int): The part number. Part numbers will be assembled in
            sequential order when the container is finalized.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with every request.
        checksum (Optional([str])): The type of checksum to compute to verify
            the integrity of the object. The checksum is computed over the
            part payload and compared with the value reported in the response
            headers. Supported values are "md5", "crc32c", "auto" and None.
            The default is "auto", which will try to detect if the C
            extension for crc32c is installed and fall back to md5 otherwise.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and how
            to configure them.

    Attributes:
        upload_url (str): The URL of the object (without query parameters).
        upload_id (str): The ID of the upload from the initialization response.
        filename (str): The name (path) of the file to upload.
        start (int): The byte index of the beginning of the part.
        end (int): The byte index of the end of the part.
        part_number (int): The part number. Part numbers will be assembled in
            sequential order when the container is finalized.
        etag (Optional(str)): The etag returned by the service after upload.
    """

    # Error codes in a 400 response body that indicate a checksum mismatch.
    _DATA_CORRUPTION_ERROR_CODES = ("InvalidDigest", "BadDigest", "CrcMismatch")

    def __init__(
        self,
        upload_url,
        upload_id,
        filename,
        start,
        end,
        part_number,
        headers=None,
        checksum="auto",
        retry=DEFAULT_RETRY,
    ):
        super().__init__(upload_url, headers=headers, retry=retry)
        self._filename = filename
        self._start = start
        self._end = end
        self._upload_id = upload_id
        self._part_number = part_number
        self._etag = None
        self._checksum_type = checksum
        # Resolve "auto" eagerly so later code only sees a concrete type.
        if self._checksum_type == "auto":
            self._checksum_type = (
                "crc32c" if _helpers._is_crc32c_available_and_fast() else "md5"
            )
        # Created when the upload request is prepared.
        self._checksum_object = None

    @property
    def part_number(self):
        """int: The number of this part within the MPU."""
        return self._part_number

    @property
    def upload_id(self):
        """str: The ID of the MPU this part belongs to."""
        return self._upload_id

    @property
    def filename(self):
        """str: The name (path) of the file the part is read from."""
        return self._filename

    @property
    def etag(self):
        """Optional[str]: The etag from the upload response, if uploaded."""
        return self._etag

    @property
    def start(self):
        """int: The byte index of the beginning of the part."""
        return self._start

    @property
    def end(self):
        """int: The byte index of the end of the part."""
        return self._end

    def _prepare_upload_request(self):
        """Prepare the contents of HTTP request to upload a part.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        For the time being, this **does require** some form of I/O: the part
        payload is read from the file named by :attr:`filename`, starting at
        :attr:`start`. A new checksum object is created on every call and,
        if checksumming is enabled, updated with the payload.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request
              * headers for the request

            The headers are the ``_headers`` on the current instance.

        Raises:
            ValueError: If the current upload has finished.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("This part has already been uploaded.")

        with open(self._filename, "br") as f:
            f.seek(self._start)
            payload = f.read(self._end - self._start)

        self._checksum_object = _helpers._get_checksum_object(self._checksum_type)
        if self._checksum_object is not None:
            self._checksum_object.update(payload)

        part_query = _MPU_PART_QUERY_TEMPLATE.format(
            part=self._part_number, upload_id=self._upload_id
        )
        upload_url = self.upload_url + part_query
        return _PUT, upload_url, payload, self._headers

    @staticmethod
    def _find_error_text(root, tag):
        """Return the text of child ``tag`` of ``root``, or None if absent."""
        node = root.find(tag)
        return None if node is None else node.text

    def _raise_if_data_corruption(self, response):
        """Raise DataCorruption if a 400 response reports a checksum mismatch.

        Responses whose body is not XML, or whose error code is missing or
        unrelated to checksums, are left for the regular status check.
        """
        try:
            root = ElementTree.fromstring(response.text)
        except ElementTree.ParseError:
            return

        error_code = self._find_error_text(root, "Code")
        if error_code not in self._DATA_CORRUPTION_ERROR_CODES:
            return

        error_message = self._find_error_text(root, "Message")
        error_details = self._find_error_text(root, "Details")
        raise DataCorruption(
            response,
            (
                "Checksum mismatch: checksum calculated by client and"
                " server did not match. Error code: {error_code},"
                " Error message: {error_message},"
                " Error details: {error_details}"
            ).format(
                error_code=error_code,
                error_message=error_message,
                error_details=error_details,
            ),
        )

    def _process_upload_response(self, response):
        """Process the response from an HTTP request.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        On success the etag from the response is stored and the part is
        marked as finished.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.DataCorruption: If the server
                answered 400 with a checksum-related error code, or if the
                locally computed checksum differs from the remote one.
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 200 or the response is missing data.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # Data corruption errors shouldn't be considered as invalid responses,
        # So we handle them earlier than call to `_helpers.require_status_code`.
        # If the response is 400, we check for data corruption errors. Any
        # other 400 (including one with an unparseable or incomplete error
        # body) falls through and is reported as an invalid response.
        if response.status_code == 400:
            self._raise_if_data_corruption(response)

        _helpers.require_status_code(
            response,
            (http.client.OK,),
            self._get_status_code,
        )

        self._validate_checksum(response)

        etag = _helpers.header_required(response, "etag", self._get_headers)
        self._etag = etag
        self._finished = True

    def upload(
        self,
        transport,
        timeout=None,
    ):
        """Upload the part.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _validate_checksum(self, response):
        """Check the computed checksum, if any, against the response headers.

        Does nothing when checksumming is disabled (checksum type ``None``).

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the response
                headers carry no checksum of the configured type.
            ~google.cloud.storage.exceptions.DataCorruption: If the checksum
                computed locally and the checksum reported by the remote host do
                not match.
        """
        if self._checksum_type is None:
            return

        remote_checksum = _helpers._get_uploaded_checksum_from_headers(
            response, self._get_headers, self._checksum_type
        )

        if remote_checksum is None:
            metadata_key = _helpers._get_metadata_key(self._checksum_type)
            raise InvalidResponse(
                response,
                _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key),
                self._get_headers(response),
            )
        local_checksum = _helpers.prepare_checksum_digest(
            self._checksum_object.digest()
        )
        if local_checksum != remote_checksum:
            raise DataCorruption(
                response,
                _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format(
                    self._checksum_type.upper(), local_checksum, remote_checksum
                ),
            )
1483
1484
def get_boundary():
    """Generate a random boundary for a multipart request.

    A random non-negative integer below ``sys.maxsize`` is rendered through
    the module's boundary format and encoded as UTF-8.

    Returns:
        bytes: The boundary used to separate parts of a multipart request.
    """
    # The template is a unicode string, so format first and encode last.
    boundary_text = _BOUNDARY_FORMAT.format(random.randrange(sys.maxsize))
    return boundary_text.encode("utf-8")
1496
1497
def construct_multipart_request(data, metadata, content_type):
    """Build the body of a multipart request.

    The body has two parts separated by a freshly generated boundary: the
    JSON-serialized ``metadata`` followed by the raw ``data``.

    Args:
        data (bytes): The resource content (UTF-8 encoded as bytes)
            to be uploaded.
        metadata (Mapping[str, str]): The resource metadata, such as an
            ACL list.
        content_type (str): The content type of the resource, e.g. a JPEG
            image has content type ``image/jpeg``.

    Returns:
        Tuple[bytes, bytes]: The multipart request body and the boundary used
        between each part.
    """
    multipart_boundary = get_boundary()
    metadata_bytes = json.dumps(metadata).encode("utf-8")
    content_type_bytes = content_type.encode("utf-8")
    delimiter = _MULTIPART_SEP + multipart_boundary

    pieces = [
        # First part: the JSON metadata.
        delimiter,
        _MULTIPART_BEGIN,
        metadata_bytes,
        _CRLF,
        # Second part: the media itself.
        delimiter,
        _CRLF,
        b"content-type: ",
        content_type_bytes,
        _CRLF,
        _CRLF,  # Empty line between headers and body.
        data,
        _CRLF,
        # Closing delimiter.
        delimiter,
        _MULTIPART_SEP,
    ]
    body = b"".join(pieces)

    return body, multipart_boundary
1537
1538
def get_total_bytes(stream):
    """Measure the total number of bytes in a seekable stream.

    The stream is moved to its end to read off the size and then returned
    to the position it had on entry.

    Args:
        stream (IO[bytes]): The stream (i.e. file-like object).

    Returns:
        int: The number of bytes.
    """
    saved_offset = stream.tell()
    # NOTE: The size is taken from ``.tell()`` rather than from the return
    #       value of ``.seek()``.
    stream.seek(0, os.SEEK_END)
    size = stream.tell()
    # Restore the caller's position.
    stream.seek(saved_offset)

    return size
1557
1558
def get_next_chunk(stream, chunk_size, total_bytes):
    """Read the next chunk from an I/O stream.

    The ``stream`` may have fewer bytes remaining than ``chunk_size``
    so it may not always be the case that
    ``end_byte == start_byte + chunk_size - 1``.

    Args:
        stream (IO[bytes]): The stream (i.e. file-like object).
        chunk_size (int): The size of the chunk to be read from the ``stream``.
        total_bytes (Optional[int]): The (expected) total number of bytes
            in the ``stream``.

    Returns:
        Tuple[int, bytes, str]: Triple of:

          * the start byte index
          * the content in between the start and end bytes (inclusive)
          * content range header for the chunk (slice) that has been read

    Raises:
        ValueError: If ``total_bytes == 0`` but ``stream.read()`` yields
            non-empty content.
        ValueError: If there is no data left to consume. This corresponds
            exactly to the case ``end_byte < start_byte``, which can only
            occur if ``end_byte == start_byte - 1``.
    """
    first_byte = stream.tell()
    # When the known total falls within this chunk, read only up to it.
    reaches_known_end = (
        total_bytes is not None and first_byte + chunk_size >= total_bytes > 0
    )
    read_size = total_bytes - first_byte if reaches_known_end else chunk_size
    chunk = stream.read(read_size)
    last_byte = stream.tell() - 1
    bytes_read = len(chunk)

    if total_bytes is None:
        if bytes_read < chunk_size:
            # A short read means the stream ended, so we now **KNOW** the
            # total number of bytes.
            total_bytes = last_byte + 1
    elif total_bytes == 0:
        # NOTE: We also expect ``first_byte == 0`` here but don't check
        #       because ``_prepare_initiate_request()`` requires the
        #       stream to be at the beginning.
        if bytes_read != 0:
            raise ValueError(
                "Stream specified as empty, but produced non-empty content."
            )
    elif bytes_read == 0:
        raise ValueError(
            "Stream is already exhausted. There is no content remaining."
        )

    return first_byte, chunk, get_content_range(first_byte, last_byte, total_bytes)
1614
1615
def get_content_range(start_byte, end_byte, total_bytes):
    """Render start, end and total as a content range header value.

    If ``total_bytes`` is not known, uses "bytes {start}-{end}/*".
    If we are dealing with an empty range (i.e. ``end_byte < start_byte``)
    then "bytes */{total}" is used.

    This function **ASSUMES** that if the size is not known, the caller will
    not also pass an empty range.

    Args:
        start_byte (int): The start (inclusive) of the byte range.
        end_byte (int): The end (inclusive) of the byte range.
        total_bytes (Optional[int]): The number of bytes in the byte
            range (if known).

    Returns:
        str: The content range header.
    """
    # Unknown size takes precedence over the empty-range check.
    if total_bytes is None:
        return _RANGE_UNKNOWN_TEMPLATE.format(start_byte, end_byte)

    if end_byte < start_byte:
        return _EMPTY_RANGE_TEMPLATE.format(total_bytes)

    return _CONTENT_RANGE_TEMPLATE.format(start_byte, end_byte, total_bytes)