1# Copyright 2017 Google Inc. 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Support for resumable uploads. 
    16 
    17Also supported here are simple (media) uploads and multipart 
    18uploads that contain both metadata and a small file as payload. 
    19""" 
    20 
    21 
    22from google.cloud.storage._media import _upload 
    23from google.cloud.storage._media.requests import _request_helpers 
    24from google.cloud.storage._media import _helpers 
    25 
    26 
    27class SimpleUpload(_request_helpers.RequestsMixin, _upload.SimpleUpload): 
    28    """Upload a resource to a Google API. 
    29 
    30    A **simple** media upload sends no metadata and completes the upload 
    31    in a single request. 
    32 
    33    Args: 
    34        upload_url (str): The URL where the content will be uploaded. 
    35        headers (Optional[Mapping[str, str]]): Extra headers that should 
    36            be sent with the request, e.g. headers for encrypted data. 
    37 
    38    Attributes: 
    39        upload_url (str): The URL where the content will be uploaded. 
    40    """ 
    41 
    42    def transmit( 
    43        self, 
    44        transport, 
    45        data, 
    46        content_type, 
    47        timeout=( 
    48            _request_helpers._DEFAULT_CONNECT_TIMEOUT, 
    49            _request_helpers._DEFAULT_READ_TIMEOUT, 
    50        ), 
    51    ): 
    52        """Transmit the resource to be uploaded. 
    53 
    54        Args: 
    55            transport (~requests.Session): A ``requests`` object which can 
    56                make authenticated requests. 
    57            data (bytes): The resource content to be uploaded. 
    58            content_type (str): The content type of the resource, e.g. a JPEG 
    59                image has content type ``image/jpeg``. 
    60            timeout (Optional[Union[float, Tuple[float, float]]]): 
    61                The number of seconds to wait for the server response. 
    62                Depending on the retry strategy, a request may be repeated 
    63                several times using the same timeout each time. 
    64 
    65                Can also be passed as a tuple (connect_timeout, read_timeout). 
    66                See :meth:`requests.Session.request` documentation for details. 
    67 
    68        Returns: 
    69            ~requests.Response: The HTTP response returned by ``transport``. 
    70        """ 
    71        method, url, payload, headers = self._prepare_request(data, content_type) 
    72 
    73        # Wrap the request business logic in a function to be retried. 
    74        def retriable_request(): 
    75            result = transport.request( 
    76                method, url, data=payload, headers=headers, timeout=timeout 
    77            ) 
    78 
    79            self._process_response(result) 
    80 
    81            return result 
    82 
    83        return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) 
    84 
    85 
    86class MultipartUpload(_request_helpers.RequestsMixin, _upload.MultipartUpload): 
    87    """Upload a resource with metadata to a Google API. 
    88 
    89    A **multipart** upload sends both metadata and the resource in a single 
    90    (multipart) request. 
    91 
    92    Args: 
    93        upload_url (str): The URL where the content will be uploaded. 
    94        headers (Optional[Mapping[str, str]]): Extra headers that should 
    95            be sent with the request, e.g. headers for encrypted data. 
    96        checksum Optional([str]): The type of checksum to compute to verify 
    97            the integrity of the object. The request metadata will be amended 
    98            to include the computed value. Using this option will override a 
    99            manually-set checksum value. Supported values are "md5", 
    100            "crc32c", "auto", and None. The default is "auto", which will try 
    101            to detect if the C extension for crc32c is installed and fall back 
    102            to md5 otherwise. 
    103        retry (Optional[google.api_core.retry.Retry]): How to retry the 
    104            RPC. A None value will disable retries. A 
    105            google.api_core.retry.Retry value will enable retries, and the 
    106            object will configure backoff and timeout options. 
    107 
    108            See the retry.py source code and docstrings in this package 
    109            (google.cloud.storage.retry) for information on retry types and how 
    110            to configure them. 
    111 
    112    Attributes: 
    113        upload_url (str): The URL where the content will be uploaded. 
    114    """ 
    115 
    116    def transmit( 
    117        self, 
    118        transport, 
    119        data, 
    120        metadata, 
    121        content_type, 
    122        timeout=( 
    123            _request_helpers._DEFAULT_CONNECT_TIMEOUT, 
    124            _request_helpers._DEFAULT_READ_TIMEOUT, 
    125        ), 
    126    ): 
    127        """Transmit the resource to be uploaded. 
    128 
    129        Args: 
    130            transport (~requests.Session): A ``requests`` object which can 
    131                make authenticated requests. 
    132            data (bytes): The resource content to be uploaded. 
    133            metadata (Mapping[str, str]): The resource metadata, such as an 
    134                ACL list. 
    135            content_type (str): The content type of the resource, e.g. a JPEG 
    136                image has content type ``image/jpeg``. 
    137            timeout (Optional[Union[float, Tuple[float, float]]]): 
    138                The number of seconds to wait for the server response. 
    139                Depending on the retry strategy, a request may be repeated 
    140                several times using the same timeout each time. 
    141 
    142                Can also be passed as a tuple (connect_timeout, read_timeout). 
    143                See :meth:`requests.Session.request` documentation for details. 
    144 
    145        Returns: 
    146            ~requests.Response: The HTTP response returned by ``transport``. 
    147        """ 
    148        method, url, payload, headers = self._prepare_request( 
    149            data, metadata, content_type 
    150        ) 
    151 
    152        # Wrap the request business logic in a function to be retried. 
    153        def retriable_request(): 
    154            result = transport.request( 
    155                method, url, data=payload, headers=headers, timeout=timeout 
    156            ) 
    157 
    158            self._process_response(result) 
    159 
    160            return result 
    161 
    162        return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) 
    163 
    164 
    165class ResumableUpload(_request_helpers.RequestsMixin, _upload.ResumableUpload): 
    166    """Initiate and fulfill a resumable upload to a Google API. 
    167 
    168    A **resumable** upload sends an initial request with the resource metadata 
    169    and then gets assigned an upload ID / upload URL to send bytes to. 
    170    Using the upload URL, the upload is then done in chunks (determined by 
    171    the user) until all bytes have been uploaded. 
    172 
    173    When constructing a resumable upload, only the resumable upload URL and 
    174    the chunk size are required: 
    175 
    176    .. testsetup:: resumable-constructor 
    177 
    178       bucket = 'bucket-foo' 
    179 
    180    .. doctest:: resumable-constructor 
    181 
    182       >>> from google.cloud.storage._media.requests import ResumableUpload 
    183       >>> 
    184       >>> url_template = ( 
    185       ...     'https://www.googleapis.com/upload/storage/v1/b/{bucket}/o?' 
    186       ...     'uploadType=resumable') 
    187       >>> upload_url = url_template.format(bucket=bucket) 
    188       >>> 
    189       >>> chunk_size = 3 * 1024 * 1024  # 3MB 
    190       >>> upload = ResumableUpload(upload_url, chunk_size) 
    191 
    192    When initiating an upload (via :meth:`initiate`), the caller is expected 
    193    to pass the resource being uploaded as a file-like ``stream``. If the size 
    194    of the resource is explicitly known, it can be passed in directly: 
    195 
    196    .. testsetup:: resumable-explicit-size 
    197 
    198       import os 
    199       import tempfile 
    200 
    201       import mock 
    202       import requests 
    203       import http.client 
    204 
    205       from google.cloud.storage._media.requests import ResumableUpload 
    206 
    207       upload_url = 'http://test.invalid' 
    208       chunk_size = 3 * 1024 * 1024  # 3MB 
    209       upload = ResumableUpload(upload_url, chunk_size) 
    210 
    211       file_desc, filename = tempfile.mkstemp() 
    212       os.close(file_desc) 
    213 
    214       data = b'some bytes!' 
    215       with open(filename, 'wb') as file_obj: 
    216           file_obj.write(data) 
    217 
    218       fake_response = requests.Response() 
    219       fake_response.status_code = int(http.client.OK) 
    220       fake_response._content = b'' 
    221       resumable_url = 'http://test.invalid?upload_id=7up' 
    222       fake_response.headers['location'] = resumable_url 
    223 
    224       post_method = mock.Mock(return_value=fake_response, spec=[]) 
    225       transport = mock.Mock(request=post_method, spec=['request']) 
    226 
    227    .. doctest:: resumable-explicit-size 
    228 
    229       >>> import os 
    230       >>> 
    231       >>> upload.total_bytes is None 
    232       True 
    233       >>> 
    234       >>> stream = open(filename, 'rb') 
    235       >>> total_bytes = os.path.getsize(filename) 
    236       >>> metadata = {'name': filename} 
    237       >>> response = upload.initiate( 
    238       ...     transport, stream, metadata, 'text/plain', 
    239       ...     total_bytes=total_bytes) 
    240       >>> response 
    241       <Response [200]> 
    242       >>> 
    243       >>> upload.total_bytes == total_bytes 
    244       True 
    245 
    246    .. testcleanup:: resumable-explicit-size 
    247 
    248       os.remove(filename) 
    249 
    250    If the stream is in a "final" state (i.e. it won't have any more bytes 
    251    written to it), the total number of bytes can be determined implicitly 
    252    from the ``stream`` itself: 
    253 
    254    .. testsetup:: resumable-implicit-size 
    255 
    256       import io 
    257 
    258       import mock 
    259       import requests 
    260       import http.client 
    261 
    262       from google.cloud.storage._media.requests import ResumableUpload 
    263 
    264       upload_url = 'http://test.invalid' 
    265       chunk_size = 3 * 1024 * 1024  # 3MB 
    266       upload = ResumableUpload(upload_url, chunk_size) 
    267 
    268       fake_response = requests.Response() 
    269       fake_response.status_code = int(http.client.OK) 
    270       fake_response._content = b'' 
    271       resumable_url = 'http://test.invalid?upload_id=7up' 
    272       fake_response.headers['location'] = resumable_url 
    273 
    274       post_method = mock.Mock(return_value=fake_response, spec=[]) 
    275       transport = mock.Mock(request=post_method, spec=['request']) 
    276 
    277       data = b'some MOAR bytes!' 
    278       metadata = {'name': 'some-file.jpg'} 
    279       content_type = 'image/jpeg' 
    280 
    281    .. doctest:: resumable-implicit-size 
    282 
    283       >>> stream = io.BytesIO(data) 
    284       >>> response = upload.initiate( 
    285       ...     transport, stream, metadata, content_type) 
    286       >>> 
    287       >>> upload.total_bytes == len(data) 
    288       True 
    289 
    290    If the size of the resource is **unknown** when the upload is initiated, 
    291    the ``stream_final`` argument can be used. This might occur if the 
    292    resource is being dynamically created on the client (e.g. application 
    293    logs). To use this argument: 
    294 
    295    .. testsetup:: resumable-unknown-size 
    296 
    297       import io 
    298 
    299       import mock 
    300       import requests 
    301       import http.client 
    302 
    303       from google.cloud.storage._media.requests import ResumableUpload 
    304 
    305       upload_url = 'http://test.invalid' 
    306       chunk_size = 3 * 1024 * 1024  # 3MB 
    307       upload = ResumableUpload(upload_url, chunk_size) 
    308 
    309       fake_response = requests.Response() 
    310       fake_response.status_code = int(http.client.OK) 
    311       fake_response._content = b'' 
    312       resumable_url = 'http://test.invalid?upload_id=7up' 
    313       fake_response.headers['location'] = resumable_url 
    314 
    315       post_method = mock.Mock(return_value=fake_response, spec=[]) 
    316       transport = mock.Mock(request=post_method, spec=['request']) 
    317 
    318       metadata = {'name': 'some-file.jpg'} 
    319       content_type = 'application/octet-stream' 
    320 
    321       stream = io.BytesIO(b'data') 
    322 
    323    .. doctest:: resumable-unknown-size 
    324 
    325       >>> response = upload.initiate( 
    326       ...     transport, stream, metadata, content_type, 
    327       ...     stream_final=False) 
    328       >>> 
    329       >>> upload.total_bytes is None 
    330       True 
    331 
    332    Args: 
    333        upload_url (str): The URL where the resumable upload will be initiated. 
    334        chunk_size (int): The size of each chunk used to upload the resource. 
    335        headers (Optional[Mapping[str, str]]): Extra headers that should 
    336            be sent with the :meth:`initiate` request, e.g. headers for 
    337            encrypted data. These **will not** be sent with 
    338            :meth:`transmit_next_chunk` or :meth:`recover` requests. 
    339        checksum Optional([str]): The type of checksum to compute to verify 
    340            the integrity of the object. After the upload is complete, the 
    341            server-computed checksum of the resulting object will be checked 
    342            and google.cloud.storage.exceptions.DataCorruption will be raised on 
    343            a mismatch. The corrupted file will not be deleted from the remote 
    344            host automatically. Supported values are "md5", "crc32c", "auto", 
    345            and None. The default is "auto", which will try to detect if the C 
    346            extension for crc32c is installed and fall back to md5 otherwise. 
    347        retry (Optional[google.api_core.retry.Retry]): How to retry the 
    348            RPC. A None value will disable retries. A 
    349            google.api_core.retry.Retry value will enable retries, and the 
    350            object will configure backoff and timeout options. 
    351 
    352            See the retry.py source code and docstrings in this package 
    353            (google.cloud.storage.retry) for information on retry types and how 
    354            to configure them. 
    355 
    356    Attributes: 
    357        upload_url (str): The URL where the content will be uploaded. 
    358 
    359    Raises: 
    360        ValueError: If ``chunk_size`` is not a multiple of 
    361            :data:`.UPLOAD_CHUNK_SIZE`. 
    362    """ 
    363 
    364    def initiate( 
    365        self, 
    366        transport, 
    367        stream, 
    368        metadata, 
    369        content_type, 
    370        total_bytes=None, 
    371        stream_final=True, 
    372        timeout=( 
    373            _request_helpers._DEFAULT_CONNECT_TIMEOUT, 
    374            _request_helpers._DEFAULT_READ_TIMEOUT, 
    375        ), 
    376    ): 
    377        """Initiate a resumable upload. 
    378 
    379        By default, this method assumes your ``stream`` is in a "final" 
    380        state ready to transmit. However, ``stream_final=False`` can be used 
    381        to indicate that the size of the resource is not known. This can happen 
    382        if bytes are being dynamically fed into ``stream``, e.g. if the stream 
    383        is attached to application logs. 
    384 
    385        If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be 
    386        read from the stream every time :meth:`transmit_next_chunk` is called. 
    387        If one of those reads produces strictly fewer bites than the chunk 
    388        size, the upload will be concluded. 
    389 
    390        Args: 
    391            transport (~requests.Session): A ``requests`` object which can 
    392                make authenticated requests. 
    393            stream (IO[bytes]): The stream (i.e. file-like object) that will 
    394                be uploaded. The stream **must** be at the beginning (i.e. 
    395                ``stream.tell() == 0``). 
    396            metadata (Mapping[str, str]): The resource metadata, such as an 
    397                ACL list. 
    398            content_type (str): The content type of the resource, e.g. a JPEG 
    399                image has content type ``image/jpeg``. 
    400            total_bytes (Optional[int]): The total number of bytes to be 
    401                uploaded. If specified, the upload size **will not** be 
    402                determined from the stream (even if ``stream_final=True``). 
    403            stream_final (Optional[bool]): Indicates if the ``stream`` is 
    404                "final" (i.e. no more bytes will be added to it). In this case 
    405                we determine the upload size from the size of the stream. If 
    406                ``total_bytes`` is passed, this argument will be ignored. 
    407            timeout (Optional[Union[float, Tuple[float, float]]]): 
    408                The number of seconds to wait for the server response. 
    409                Depending on the retry strategy, a request may be repeated 
    410                several times using the same timeout each time. 
    411 
    412                Can also be passed as a tuple (connect_timeout, read_timeout). 
    413                See :meth:`requests.Session.request` documentation for details. 
    414 
    415        Returns: 
    416            ~requests.Response: The HTTP response returned by ``transport``. 
    417        """ 
    418        method, url, payload, headers = self._prepare_initiate_request( 
    419            stream, 
    420            metadata, 
    421            content_type, 
    422            total_bytes=total_bytes, 
    423            stream_final=stream_final, 
    424        ) 
    425 
    426        # Wrap the request business logic in a function to be retried. 
    427        def retriable_request(): 
    428            result = transport.request( 
    429                method, url, data=payload, headers=headers, timeout=timeout 
    430            ) 
    431 
    432            self._process_initiate_response(result) 
    433 
    434            return result 
    435 
    436        return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) 
    437 
    438    def transmit_next_chunk( 
    439        self, 
    440        transport, 
    441        timeout=( 
    442            _request_helpers._DEFAULT_CONNECT_TIMEOUT, 
    443            _request_helpers._DEFAULT_READ_TIMEOUT, 
    444        ), 
    445    ): 
    446        """Transmit the next chunk of the resource to be uploaded. 
    447 
    448        If the current upload was initiated with ``stream_final=False``, 
    449        this method will dynamically determine if the upload has completed. 
    450        The upload will be considered complete if the stream produces 
    451        fewer than :attr:`chunk_size` bytes when a chunk is read from it. 
    452 
    453        In the case of failure, an exception is thrown that preserves the 
    454        failed response: 
    455 
    456        .. testsetup:: bad-response 
    457 
    458           import io 
    459 
    460           import mock 
    461           import requests 
    462           import http.client 
    463 
    464           from google.cloud.storage import _media 
    465           import google.cloud.storage._media.requests.upload as upload_mod 
    466 
    467           transport = mock.Mock(spec=['request']) 
    468           fake_response = requests.Response() 
    469           fake_response.status_code = int(http.client.BAD_REQUEST) 
    470           transport.request.return_value = fake_response 
    471 
    472           upload_url = 'http://test.invalid' 
    473           upload = upload_mod.ResumableUpload( 
    474               upload_url, _media.UPLOAD_CHUNK_SIZE) 
    475           # Fake that the upload has been initiate()-d 
    476           data = b'data is here' 
    477           upload._stream = io.BytesIO(data) 
    478           upload._total_bytes = len(data) 
    479           upload._resumable_url = 'http://test.invalid?upload_id=nope' 
    480 
    481        .. doctest:: bad-response 
    482           :options: +NORMALIZE_WHITESPACE 
    483 
    484           >>> error = None 
    485           >>> try: 
    486           ...     upload.transmit_next_chunk(transport) 
    487           ... except _media.InvalidResponse as caught_exc: 
    488           ...     error = caught_exc 
    489           ... 
    490           >>> error 
    491           InvalidResponse('Request failed with status code', 400, 
    492                           'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PERMANENT_REDIRECT: 308>) 
    493           >>> error.response 
    494           <Response [400]> 
    495 
    496        Args: 
    497            transport (~requests.Session): A ``requests`` object which can 
    498                make authenticated requests. 
    499            timeout (Optional[Union[float, Tuple[float, float]]]): 
    500                The number of seconds to wait for the server response. 
    501                Depending on the retry strategy, a request may be repeated 
    502                several times using the same timeout each time. 
    503 
    504                Can also be passed as a tuple (connect_timeout, read_timeout). 
    505                See :meth:`requests.Session.request` documentation for details. 
    506 
    507        Returns: 
    508            ~requests.Response: The HTTP response returned by ``transport``. 
    509 
    510        Raises: 
    511            ~google.cloud.storage.exceptions.InvalidResponse: If the status 
    512                code is not 200 or http.client.PERMANENT_REDIRECT. 
    513            ~google.cloud.storage.exceptions.DataCorruption: If this is the final 
    514                chunk, a checksum validation was requested, and the checksum 
    515                does not match or is not available. 
    516        """ 
    517        method, url, payload, headers = self._prepare_request() 
    518 
    519        # Wrap the request business logic in a function to be retried. 
    520        def retriable_request(): 
    521            result = transport.request( 
    522                method, url, data=payload, headers=headers, timeout=timeout 
    523            ) 
    524 
    525            self._process_resumable_response(result, len(payload)) 
    526 
    527            return result 
    528 
    529        return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) 
    530 
    531    def recover(self, transport): 
    532        """Recover from a failure and check the status of the current upload. 
    533 
    534        This will verify the progress with the server and make sure the 
    535        current upload is in a valid state before :meth:`transmit_next_chunk` 
    536        can be used again. See https://cloud.google.com/storage/docs/performing-resumable-uploads#status-check 
    537        for more information. 
    538 
    539        This method can be used when a :class:`ResumableUpload` is in an 
    540        :attr:`~ResumableUpload.invalid` state due to a request failure. 
    541 
    542        Args: 
    543            transport (~requests.Session): A ``requests`` object which can 
    544                make authenticated requests. 
    545 
    546        Returns: 
    547            ~requests.Response: The HTTP response returned by ``transport``. 
    548        """ 
    549        timeout = ( 
    550            _request_helpers._DEFAULT_CONNECT_TIMEOUT, 
    551            _request_helpers._DEFAULT_READ_TIMEOUT, 
    552        ) 
    553 
    554        method, url, payload, headers = self._prepare_recover_request() 
    555        # NOTE: We assume "payload is None" but pass it along anyway. 
    556 
    557        # Wrap the request business logic in a function to be retried. 
    558        def retriable_request(): 
    559            result = transport.request( 
    560                method, url, data=payload, headers=headers, timeout=timeout 
    561            ) 
    562 
    563            self._process_recover_response(result) 
    564 
    565            return result 
    566 
    567        return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) 
    568 
    569 
    570class XMLMPUContainer(_request_helpers.RequestsMixin, _upload.XMLMPUContainer): 
    571    """Initiate and close an upload using the XML MPU API. 
    572 
    573    An XML MPU sends an initial request and then receives an upload ID. 
    574    Using the upload ID, the upload is then done in numbered parts and the 
    575    parts can be uploaded concurrently. 
    576 
    577    In order to avoid concurrency issues with this container object, the 
    578    uploading of individual parts is handled separately, by XMLMPUPart objects 
    579    spawned from this container class. The XMLMPUPart objects are not 
    580    necessarily in the same process as the container, so they do not update the 
    581    container automatically. 
    582 
    583    MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous 
    584    given the JSON multipart upload, so the abbreviation "MPU" will be used 
    585    throughout. 
    586 
    587    See: https://cloud.google.com/storage/docs/multipart-uploads 
    588 
    589    Args: 
    590        upload_url (str): The URL of the object (without query parameters). The 
    591            initiate, PUT, and finalization requests will all use this URL, with 
    592            varying query parameters. 
    593        headers (Optional[Mapping[str, str]]): Extra headers that should 
    594            be sent with the :meth:`initiate` request, e.g. headers for 
    595            encrypted data. These headers will be propagated to individual 
    596            XMLMPUPart objects spawned from this container as well. 
    597        retry (Optional[google.api_core.retry.Retry]): How to retry the 
    598            RPC. A None value will disable retries. A 
    599            google.api_core.retry.Retry value will enable retries, and the 
    600            object will configure backoff and timeout options. 
    601 
    602            See the retry.py source code and docstrings in this package 
    603            (google.cloud.storage.retry) for information on retry types and how 
    604            to configure them. 
    605 
    606    Attributes: 
    607        upload_url (str): The URL where the content will be uploaded. 
    608        upload_id (Optional(int)): The ID of the upload from the initialization 
    609            response. 
    610    """ 
    611 
    612    def initiate( 
    613        self, 
    614        transport, 
    615        content_type, 
    616        timeout=( 
    617            _request_helpers._DEFAULT_CONNECT_TIMEOUT, 
    618            _request_helpers._DEFAULT_READ_TIMEOUT, 
    619        ), 
    620    ): 
    621        """Initiate an MPU and record the upload ID. 
    622 
    623        Args: 
    624            transport (object): An object which can make authenticated 
    625                requests. 
    626            content_type (str): The content type of the resource, e.g. a JPEG 
    627                image has content type ``image/jpeg``. 
    628            timeout (Optional[Union[float, Tuple[float, float]]]): 
    629                The number of seconds to wait for the server response. 
    630                Depending on the retry strategy, a request may be repeated 
    631                several times using the same timeout each time. 
    632 
    633                Can also be passed as a tuple (connect_timeout, read_timeout). 
    634                See :meth:`requests.Session.request` documentation for details. 
    635 
    636        Returns: 
    637            ~requests.Response: The HTTP response returned by ``transport``. 
    638        """ 
    639 
    640        method, url, payload, headers = self._prepare_initiate_request( 
    641            content_type, 
    642        ) 
    643 
    644        # Wrap the request business logic in a function to be retried. 
    645        def retriable_request(): 
    646            result = transport.request( 
    647                method, url, data=payload, headers=headers, timeout=timeout 
    648            ) 
    649 
    650            self._process_initiate_response(result) 
    651 
    652            return result 
    653 
    654        return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) 
    655 
    656    def finalize( 
    657        self, 
    658        transport, 
    659        timeout=( 
    660            _request_helpers._DEFAULT_CONNECT_TIMEOUT, 
    661            _request_helpers._DEFAULT_READ_TIMEOUT, 
    662        ), 
    663    ): 
    664        """Finalize an MPU request with all the parts. 
    665 
    666        Args: 
    667            transport (object): An object which can make authenticated 
    668                requests. 
    669            timeout (Optional[Union[float, Tuple[float, float]]]): 
    670                The number of seconds to wait for the server response. 
    671                Depending on the retry strategy, a request may be repeated 
    672                several times using the same timeout each time. 
    673 
    674                Can also be passed as a tuple (connect_timeout, read_timeout). 
    675                See :meth:`requests.Session.request` documentation for details. 
    676 
    677        Returns: 
    678            ~requests.Response: The HTTP response returned by ``transport``. 
    679        """ 
    680        method, url, payload, headers = self._prepare_finalize_request() 
    681 
    682        # Wrap the request business logic in a function to be retried. 
    683        def retriable_request(): 
    684            result = transport.request( 
    685                method, url, data=payload, headers=headers, timeout=timeout 
    686            ) 
    687 
    688            self._process_finalize_response(result) 
    689 
    690            return result 
    691 
    692        return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) 
    693 
    694    def cancel( 
    695        self, 
    696        transport, 
    697        timeout=( 
    698            _request_helpers._DEFAULT_CONNECT_TIMEOUT, 
    699            _request_helpers._DEFAULT_READ_TIMEOUT, 
    700        ), 
    701    ): 
    702        """Cancel an MPU request and permanently delete any uploaded parts. 
    703 
    704        This cannot be undone. 
    705 
    706        Args: 
    707            transport (object): An object which can make authenticated 
    708                requests. 
    709            timeout (Optional[Union[float, Tuple[float, float]]]): 
    710                The number of seconds to wait for the server response. 
    711                Depending on the retry strategy, a request may be repeated 
    712                several times using the same timeout each time. 
    713 
    714                Can also be passed as a tuple (connect_timeout, read_timeout). 
    715                See :meth:`requests.Session.request` documentation for details. 
    716 
    717        Returns: 
    718            ~requests.Response: The HTTP response returned by ``transport``. 
    719        """ 
    720        method, url, payload, headers = self._prepare_cancel_request() 
    721 
    722        # Wrap the request business logic in a function to be retried. 
    723        def retriable_request(): 
    724            result = transport.request( 
    725                method, url, data=payload, headers=headers, timeout=timeout 
    726            ) 
    727 
    728            self._process_cancel_response(result) 
    729 
    730            return result 
    731 
    732        return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) 
    733 
    734 
    735class XMLMPUPart(_request_helpers.RequestsMixin, _upload.XMLMPUPart): 
    736    def upload( 
    737        self, 
    738        transport, 
    739        timeout=( 
    740            _request_helpers._DEFAULT_CONNECT_TIMEOUT, 
    741            _request_helpers._DEFAULT_READ_TIMEOUT, 
    742        ), 
    743    ): 
    744        """Upload the part. 
    745 
    746        Args: 
    747            transport (object): An object which can make authenticated 
    748                requests. 
    749            timeout (Optional[Union[float, Tuple[float, float]]]): 
    750                The number of seconds to wait for the server response. 
    751                Depending on the retry strategy, a request may be repeated 
    752                several times using the same timeout each time. 
    753 
    754                Can also be passed as a tuple (connect_timeout, read_timeout). 
    755                See :meth:`requests.Session.request` documentation for details. 
    756 
    757        Returns: 
    758            ~requests.Response: The HTTP response returned by ``transport``. 
    759        """ 
    760        method, url, payload, headers = self._prepare_upload_request() 
    761        if self._checksum_object is not None: 
    762            checksum_digest_in_base64 = _helpers.prepare_checksum_digest( 
    763                self._checksum_object.digest() 
    764            ) 
    765            if self._checksum_type == "crc32c": 
    766                headers["X-Goog-Hash"] = f"crc32c={checksum_digest_in_base64}" 
    767            elif self._checksum_type == "md5": 
    768                headers["X-Goog-Hash"] = f"md5={checksum_digest_in_base64}" 
    769 
    770        # Wrap the request business logic in a function to be retried. 
    771        def retriable_request(): 
    772            result = transport.request( 
    773                method, url, data=payload, headers=headers, timeout=timeout 
    774            ) 
    775 
    776            self._process_upload_response(result) 
    777 
    778            return result 
    779 
    780        return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy)