1# Copyright 2017 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Support for resumable uploads.
16
17Also supported here are simple (media) uploads and multipart
18uploads that contain both metadata and a small file as payload.
19"""
20
21
22from google.cloud.storage._media import _upload
23from google.cloud.storage._media.requests import _request_helpers
24from google.cloud.storage._media import _helpers
25
26
class SimpleUpload(_request_helpers.RequestsMixin, _upload.SimpleUpload):
    """Perform a **simple** media upload against a Google API.

    A simple upload carries no metadata: the resource content is sent and
    the upload is completed by a single request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def transmit(
        self,
        transport,
        data,
        content_type,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Send the resource content to the upload URL.

        The request is built once by ``_prepare_request`` and then issued
        through ``wait_and_retry`` using this upload's retry strategy.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        http_method, target_url, body, request_headers = self._prepare_request(
            data, content_type
        )

        # One attempt: send the request, then hand the response to
        # ``_process_response`` before returning it. This callable is what
        # gets retried.
        def retriable_request():
            response = transport.request(
                http_method,
                target_url,
                data=body,
                headers=request_headers,
                timeout=timeout,
            )
            self._process_response(response)
            return response

        return _request_helpers.wait_and_retry(
            retriable_request, self._retry_strategy
        )
84
85
class MultipartUpload(_request_helpers.RequestsMixin, _upload.MultipartUpload):
    """Upload a resource together with its metadata to a Google API.

    A **multipart** upload places both the metadata and the resource content
    in one single (multipart) request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. The request metadata will be amended
            to include the computed value. Using this option will override a
            manually-set checksum value. Supported values are "md5",
            "crc32c", "auto", and None. The default is "auto", which will try
            to detect if the C extension for crc32c is installed and fall back
            to md5 otherwise.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and how
            to configure them.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def transmit(
        self,
        transport,
        data,
        metadata,
        content_type,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Send the resource content and its metadata in one request.

        The request is built once by ``_prepare_request`` and then issued
        through ``wait_and_retry`` using this upload's retry strategy.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        prepared = self._prepare_request(data, metadata, content_type)
        http_method, target_url, body, request_headers = prepared

        # One attempt: send the request, then hand the response to
        # ``_process_response`` before returning it. This callable is what
        # gets retried.
        def retriable_request():
            response = transport.request(
                http_method,
                target_url,
                data=body,
                headers=request_headers,
                timeout=timeout,
            )
            self._process_response(response)
            return response

        return _request_helpers.wait_and_retry(
            retriable_request, self._retry_strategy
        )
163
164
class ResumableUpload(_request_helpers.RequestsMixin, _upload.ResumableUpload):
    """Initiate and fulfill a resumable upload to a Google API.

    A **resumable** upload sends an initial request with the resource metadata
    and then gets assigned an upload ID / upload URL to send bytes to.
    Using the upload URL, the upload is then done in chunks (determined by
    the user) until all bytes have been uploaded.

    When constructing a resumable upload, only the resumable upload URL and
    the chunk size are required:

    .. testsetup:: resumable-constructor

       bucket = 'bucket-foo'

    .. doctest:: resumable-constructor

       >>> from google.cloud.storage._media.requests import ResumableUpload
       >>>
       >>> url_template = (
       ...     'https://www.googleapis.com/upload/storage/v1/b/{bucket}/o?'
       ...     'uploadType=resumable')
       >>> upload_url = url_template.format(bucket=bucket)
       >>>
       >>> chunk_size = 3 * 1024 * 1024  # 3MB
       >>> upload = ResumableUpload(upload_url, chunk_size)

    When initiating an upload (via :meth:`initiate`), the caller is expected
    to pass the resource being uploaded as a file-like ``stream``. If the size
    of the resource is explicitly known, it can be passed in directly:

    .. testsetup:: resumable-explicit-size

       import os
       import tempfile

       import mock
       import requests
       import http.client

       from google.cloud.storage._media.requests import ResumableUpload

       upload_url = 'http://test.invalid'
       chunk_size = 3 * 1024 * 1024  # 3MB
       upload = ResumableUpload(upload_url, chunk_size)

       file_desc, filename = tempfile.mkstemp()
       os.close(file_desc)

       data = b'some bytes!'
       with open(filename, 'wb') as file_obj:
           file_obj.write(data)

       fake_response = requests.Response()
       fake_response.status_code = int(http.client.OK)
       fake_response._content = b''
       resumable_url = 'http://test.invalid?upload_id=7up'
       fake_response.headers['location'] = resumable_url

       post_method = mock.Mock(return_value=fake_response, spec=[])
       transport = mock.Mock(request=post_method, spec=['request'])

    .. doctest:: resumable-explicit-size

       >>> import os
       >>>
       >>> upload.total_bytes is None
       True
       >>>
       >>> stream = open(filename, 'rb')
       >>> total_bytes = os.path.getsize(filename)
       >>> metadata = {'name': filename}
       >>> response = upload.initiate(
       ...     transport, stream, metadata, 'text/plain',
       ...     total_bytes=total_bytes)
       >>> response
       <Response [200]>
       >>>
       >>> upload.total_bytes == total_bytes
       True

    .. testcleanup:: resumable-explicit-size

       os.remove(filename)

    If the stream is in a "final" state (i.e. it won't have any more bytes
    written to it), the total number of bytes can be determined implicitly
    from the ``stream`` itself:

    .. testsetup:: resumable-implicit-size

       import io

       import mock
       import requests
       import http.client

       from google.cloud.storage._media.requests import ResumableUpload

       upload_url = 'http://test.invalid'
       chunk_size = 3 * 1024 * 1024  # 3MB
       upload = ResumableUpload(upload_url, chunk_size)

       fake_response = requests.Response()
       fake_response.status_code = int(http.client.OK)
       fake_response._content = b''
       resumable_url = 'http://test.invalid?upload_id=7up'
       fake_response.headers['location'] = resumable_url

       post_method = mock.Mock(return_value=fake_response, spec=[])
       transport = mock.Mock(request=post_method, spec=['request'])

       data = b'some MOAR bytes!'
       metadata = {'name': 'some-file.jpg'}
       content_type = 'image/jpeg'

    .. doctest:: resumable-implicit-size

       >>> stream = io.BytesIO(data)
       >>> response = upload.initiate(
       ...     transport, stream, metadata, content_type)
       >>>
       >>> upload.total_bytes == len(data)
       True

    If the size of the resource is **unknown** when the upload is initiated,
    the ``stream_final`` argument can be used. This might occur if the
    resource is being dynamically created on the client (e.g. application
    logs). To use this argument:

    .. testsetup:: resumable-unknown-size

       import io

       import mock
       import requests
       import http.client

       from google.cloud.storage._media.requests import ResumableUpload

       upload_url = 'http://test.invalid'
       chunk_size = 3 * 1024 * 1024  # 3MB
       upload = ResumableUpload(upload_url, chunk_size)

       fake_response = requests.Response()
       fake_response.status_code = int(http.client.OK)
       fake_response._content = b''
       resumable_url = 'http://test.invalid?upload_id=7up'
       fake_response.headers['location'] = resumable_url

       post_method = mock.Mock(return_value=fake_response, spec=[])
       transport = mock.Mock(request=post_method, spec=['request'])

       metadata = {'name': 'some-file.jpg'}
       content_type = 'application/octet-stream'

       stream = io.BytesIO(b'data')

    .. doctest:: resumable-unknown-size

       >>> response = upload.initiate(
       ...     transport, stream, metadata, content_type,
       ...     stream_final=False)
       >>>
       >>> upload.total_bytes is None
       True

    Args:
        upload_url (str): The URL where the resumable upload will be initiated.
        chunk_size (int): The size of each chunk used to upload the resource.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the :meth:`initiate` request, e.g. headers for
            encrypted data. These **will not** be sent with
            :meth:`transmit_next_chunk` or :meth:`recover` requests.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. After the upload is complete, the
            server-computed checksum of the resulting object will be checked
            and google.cloud.storage.exceptions.DataCorruption will be raised on
            a mismatch. The corrupted file will not be deleted from the remote
            host automatically. Supported values are "md5", "crc32c", "auto",
            and None. The default is "auto", which will try to detect if the C
            extension for crc32c is installed and fall back to md5 otherwise.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and how
            to configure them.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.

    Raises:
        ValueError: If ``chunk_size`` is not a multiple of
            :data:`.UPLOAD_CHUNK_SIZE`.
    """

    def initiate(
        self,
        transport,
        stream,
        metadata,
        content_type,
        total_bytes=None,
        stream_final=True,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Initiate a resumable upload.

        By default, this method assumes your ``stream`` is in a "final"
        state ready to transmit. However, ``stream_final=False`` can be used
        to indicate that the size of the resource is not known. This can happen
        if bytes are being dynamically fed into ``stream``, e.g. if the stream
        is attached to application logs.

        If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be
        read from the stream every time :meth:`transmit_next_chunk` is called.
        If one of those reads produces strictly fewer bytes than the chunk
        size, the upload will be concluded.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        method, url, payload, headers = self._prepare_initiate_request(
            stream,
            metadata,
            content_type,
            total_bytes=total_bytes,
            stream_final=stream_final,
        )

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            result = transport.request(
                method, url, data=payload, headers=headers, timeout=timeout
            )

            self._process_initiate_response(result)

            return result

        return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy)

    def transmit_next_chunk(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Transmit the next chunk of the resource to be uploaded.

        If the current upload was initiated with ``stream_final=False``,
        this method will dynamically determine if the upload has completed.
        The upload will be considered complete if the stream produces
        fewer than :attr:`chunk_size` bytes when a chunk is read from it.

        In the case of failure, an exception is thrown that preserves the
        failed response:

        .. testsetup:: bad-response

           import io

           import mock
           import requests
           import http.client

           from google.cloud.storage import _media
           import google.cloud.storage._media.requests.upload as upload_mod

           transport = mock.Mock(spec=['request'])
           fake_response = requests.Response()
           fake_response.status_code = int(http.client.BAD_REQUEST)
           transport.request.return_value = fake_response

           upload_url = 'http://test.invalid'
           upload = upload_mod.ResumableUpload(
               upload_url, _media.UPLOAD_CHUNK_SIZE)
           # Fake that the upload has been initiate()-d
           data = b'data is here'
           upload._stream = io.BytesIO(data)
           upload._total_bytes = len(data)
           upload._resumable_url = 'http://test.invalid?upload_id=nope'

        .. doctest:: bad-response
           :options: +NORMALIZE_WHITESPACE

           >>> error = None
           >>> try:
           ...     upload.transmit_next_chunk(transport)
           ... except _media.InvalidResponse as caught_exc:
           ...     error = caught_exc
           ...
           >>> error
           InvalidResponse('Request failed with status code', 400,
                           'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PERMANENT_REDIRECT: 308>)
           >>> error.response
           <Response [400]>

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 200 or http.client.PERMANENT_REDIRECT.
            ~google.cloud.storage.exceptions.DataCorruption: If this is the final
                chunk, a checksum validation was requested, and the checksum
                does not match or is not available.
        """
        method, url, payload, headers = self._prepare_request()

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            result = transport.request(
                method, url, data=payload, headers=headers, timeout=timeout
            )

            # The number of bytes just sent is passed along with the response.
            self._process_resumable_response(result, len(payload))

            return result

        return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy)

    def recover(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Recover from a failure and check the status of the current upload.

        This will verify the progress with the server and make sure the
        current upload is in a valid state before :meth:`transmit_next_chunk`
        can be used again. See https://cloud.google.com/storage/docs/performing-resumable-uploads#status-check
        for more information.

        This method can be used when a :class:`ResumableUpload` is in an
        :attr:`~ResumableUpload.invalid` state due to a request failure.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time. Defaults to
                the same connect/read timeouts as the other request methods
                of this class, so existing ``recover(transport)`` callers
                are unaffected.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        method, url, payload, headers = self._prepare_recover_request()
        # NOTE: We assume "payload is None" but pass it along anyway.

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            result = transport.request(
                method, url, data=payload, headers=headers, timeout=timeout
            )

            self._process_recover_response(result)

            return result

        return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy)
568
569
class XMLMPUContainer(_request_helpers.RequestsMixin, _upload.XMLMPUContainer):
    """Open, finalize, or cancel an upload made through the XML MPU API.

    An XML MPU starts with an initial request that yields an upload ID. With
    that upload ID the content is then sent as numbered parts, and those
    parts can be uploaded concurrently.

    To avoid concurrency issues with this container object, uploading the
    individual parts is handled separately, by XMLMPUPart objects spawned
    from this container class. The XMLMPUPart objects are not necessarily in
    the same process as the container, so they do not update the container
    automatically.

    MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous
    given the JSON multipart upload, so the abbreviation "MPU" will be used
    throughout.

    See: https://cloud.google.com/storage/docs/multipart-uploads

    Args:
        upload_url (str): The URL of the object (without query parameters). The
            initiate, PUT, and finalization requests will all use this URL, with
            varying query parameters.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the :meth:`initiate` request, e.g. headers for
            encrypted data. These headers will be propagated to individual
            XMLMPUPart objects spawned from this container as well.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and how
            to configure them.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
        upload_id (Optional(int)): The ID of the upload from the initialization
            response.
    """

    def initiate(
        self,
        transport,
        content_type,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Start an MPU and record the upload ID.

        Args:
            transport (object): An object which can make authenticated
                requests.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        prepared = self._prepare_initiate_request(content_type)
        http_method, target_url, body, request_headers = prepared

        # One attempt: send the request, then pass the response to
        # ``_process_initiate_response``. This callable is what gets retried.
        def retriable_request():
            response = transport.request(
                http_method,
                target_url,
                data=body,
                headers=request_headers,
                timeout=timeout,
            )
            self._process_initiate_response(response)
            return response

        return _request_helpers.wait_and_retry(
            retriable_request, self._retry_strategy
        )

    def finalize(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Finalize an MPU request with all the parts.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        prepared = self._prepare_finalize_request()
        http_method, target_url, body, request_headers = prepared

        # One attempt: send the request, then pass the response to
        # ``_process_finalize_response``. This callable is what gets retried.
        def retriable_request():
            response = transport.request(
                http_method,
                target_url,
                data=body,
                headers=request_headers,
                timeout=timeout,
            )
            self._process_finalize_response(response)
            return response

        return _request_helpers.wait_and_retry(
            retriable_request, self._retry_strategy
        )

    def cancel(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Cancel an MPU request and permanently delete any uploaded parts.

        This cannot be undone.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        prepared = self._prepare_cancel_request()
        http_method, target_url, body, request_headers = prepared

        # One attempt: send the request, then pass the response to
        # ``_process_cancel_response``. This callable is what gets retried.
        def retriable_request():
            response = transport.request(
                http_method,
                target_url,
                data=body,
                headers=request_headers,
                timeout=timeout,
            )
            self._process_cancel_response(response)
            return response

        return _request_helpers.wait_and_retry(
            retriable_request, self._retry_strategy
        )
733
734
class XMLMPUPart(_request_helpers.RequestsMixin, _upload.XMLMPUPart):
    """Upload one part of an XML MPU using a ``requests``-style transport.

    Constructor arguments are those of the ``_upload.XMLMPUPart`` base
    class; this subclass only adds the :meth:`upload` request logic.
    """

    def upload(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Upload the part.

        When a checksum object is present and the checksum type is
        ``"crc32c"`` or ``"md5"``, its base64-prepared digest is sent in the
        ``X-Goog-Hash`` request header.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        prepared = self._prepare_upload_request()
        http_method, target_url, body, request_headers = prepared

        checksum = self._checksum_object
        if checksum is not None:
            encoded_digest = _helpers.prepare_checksum_digest(checksum.digest())
            checksum_type = self._checksum_type
            # Only these two checksum types are advertised via X-Goog-Hash;
            # any other type leaves the headers untouched.
            if checksum_type in ("crc32c", "md5"):
                request_headers["X-Goog-Hash"] = f"{checksum_type}={encoded_digest}"

        # One attempt: send the request, then pass the response to
        # ``_process_upload_response``. This callable is what gets retried.
        def retriable_request():
            response = transport.request(
                http_method,
                target_url,
                data=body,
                headers=request_headers,
                timeout=timeout,
            )
            self._process_upload_response(response)
            return response

        return _request_helpers.wait_and_retry(
            retriable_request, self._retry_strategy
        )