1# Copyright 2017 Google Inc. 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
"""Virtual base classes for uploading media via Google APIs.
    16 
    17Supported here are: 
    18 
    19* simple (media) uploads 
    20* multipart uploads that contain both metadata and a small file as payload 
    21* resumable uploads (with metadata as well) 
    22""" 
    23 
    24import http.client 
    25import json 
    26import os 
    27import random 
    28import re 
    29import sys 
    30import urllib.parse 
    31 
    32from google.cloud.storage._media import _helpers 
    33from google.cloud.storage._media import UPLOAD_CHUNK_SIZE 
    34from google.cloud.storage.exceptions import InvalidResponse 
    35from google.cloud.storage.exceptions import DataCorruption 
    36from google.cloud.storage.retry import DEFAULT_RETRY 
    37 
    38from xml.etree import ElementTree 
    39 
    40 
    41_CONTENT_TYPE_HEADER = "content-type" 
    42_CONTENT_RANGE_TEMPLATE = "bytes {:d}-{:d}/{:d}" 
    43_RANGE_UNKNOWN_TEMPLATE = "bytes {:d}-{:d}/*" 
    44_EMPTY_RANGE_TEMPLATE = "bytes */{:d}" 
    45_BOUNDARY_WIDTH = len(str(sys.maxsize - 1)) 
    46_BOUNDARY_FORMAT = "==============={{:0{:d}d}}==".format(_BOUNDARY_WIDTH) 
    47_MULTIPART_SEP = b"--" 
    48_CRLF = b"\r\n" 
    49_MULTIPART_BEGIN = b"\r\ncontent-type: application/json; charset=UTF-8\r\n\r\n" 
    50_RELATED_HEADER = b'multipart/related; boundary="' 
    51_BYTES_RANGE_RE = re.compile(r"bytes=0-(?P<end_byte>\d+)", flags=re.IGNORECASE) 
    52_STREAM_ERROR_TEMPLATE = ( 
    53    "Bytes stream is in unexpected state. " 
    54    "The local stream has had {:d} bytes read from it while " 
    55    "{:d} bytes have already been updated (they should match)." 
    56) 
    57_STREAM_READ_PAST_TEMPLATE = ( 
    58    "{:d} bytes have been read from the stream, which exceeds " 
    59    "the expected total {:d}." 
    60) 
    61_DELETE = "DELETE" 
    62_POST = "POST" 
    63_PUT = "PUT" 
    64_UPLOAD_CHECKSUM_MISMATCH_MESSAGE = ( 
    65    "The computed ``{}`` checksum, ``{}``, and the checksum reported by the " 
    66    "remote host, ``{}``, did not match." 
    67) 
    68_UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE = ( 
    69    "Response metadata had no ``{}`` value; checksum could not be validated." 
    70) 
    71_UPLOAD_HEADER_NO_APPROPRIATE_CHECKSUM_MESSAGE = ( 
    72    "Response headers had no ``{}`` value; checksum could not be validated." 
    73) 
    74_MPU_INITIATE_QUERY = "?uploads" 
    75_MPU_PART_QUERY_TEMPLATE = "?partNumber={part}&uploadId={upload_id}" 
    76_S3_COMPAT_XML_NAMESPACE = "{http://s3.amazonaws.com/doc/2006-03-01/}" 
    77_UPLOAD_ID_NODE = "UploadId" 
    78_MPU_FINAL_QUERY_TEMPLATE = "?uploadId={upload_id}" 
    79 
    80 
    81class UploadBase(object): 
    82    """Base class for upload helpers. 
    83 
    84    Defines core shared behavior across different upload types. 
    85 
    86    Args: 
    87        upload_url (str): The URL where the content will be uploaded. 
    88        headers (Optional[Mapping[str, str]]): Extra headers that should 
    89            be sent with the request, e.g. headers for encrypted data. 
    90        retry (Optional[google.api_core.retry.Retry]): How to retry the 
    91            RPC. A None value will disable retries. A 
    92            google.api_core.retry.Retry value will enable retries, and the 
    93            object will configure backoff and timeout options. 
    94 
    95            See the retry.py source code and docstrings in this package 
    96            (google.cloud.storage.retry) for information on retry types and how 
    97            to configure them. 
    98 
    99    Attributes: 
    100        upload_url (str): The URL where the content will be uploaded. 
    101    """ 
    102 
    103    def __init__(self, upload_url, headers=None, retry=DEFAULT_RETRY): 
    104        self.upload_url = upload_url 
    105        if headers is None: 
    106            headers = {} 
    107        self._headers = headers 
    108        self._finished = False 
    109        self._retry_strategy = retry 
    110 
    111    @property 
    112    def finished(self): 
    113        """bool: Flag indicating if the upload has completed.""" 
    114        return self._finished 
    115 
    116    def _process_response(self, response): 
    117        """Process the response from an HTTP request. 
    118 
    119        This is everything that must be done after a request that doesn't 
    120        require network I/O (or other I/O). This is based on the `sans-I/O`_ 
    121        philosophy. 
    122 
    123        Args: 
    124            response (object): The HTTP response object. 
    125 
    126        Raises: 
    127            ~google.cloud.storage.exceptions.InvalidResponse: If the status 
    128                code is not 200. 
    129 
    130        .. _sans-I/O: https://sans-io.readthedocs.io/ 
    131        """ 
    132        # Tombstone the current upload so it cannot be used again (in either 
    133        # failure or success). 
    134        self._finished = True 
    135        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code) 
    136 
    137    @staticmethod 
    138    def _get_status_code(response): 
    139        """Access the status code from an HTTP response. 
    140 
    141        Args: 
    142            response (object): The HTTP response object. 
    143 
    144        Raises: 
    145            NotImplementedError: Always, since virtual. 
    146        """ 
    147        raise NotImplementedError("This implementation is virtual.") 
    148 
    149    @staticmethod 
    150    def _get_headers(response): 
    151        """Access the headers from an HTTP response. 
    152 
    153        Args: 
    154            response (object): The HTTP response object. 
    155 
    156        Raises: 
    157            NotImplementedError: Always, since virtual. 
    158        """ 
    159        raise NotImplementedError("This implementation is virtual.") 
    160 
    161    @staticmethod 
    162    def _get_body(response): 
    163        """Access the response body from an HTTP response. 
    164 
    165        Args: 
    166            response (object): The HTTP response object. 
    167 
    168        Raises: 
    169            NotImplementedError: Always, since virtual. 
    170        """ 
    171        raise NotImplementedError("This implementation is virtual.") 
    172 
    173 
    174class SimpleUpload(UploadBase): 
    175    """Upload a resource to a Google API. 
    176 
    177    A **simple** media upload sends no metadata and completes the upload 
    178    in a single request. 
    179 
    180    Args: 
    181        upload_url (str): The URL where the content will be uploaded. 
    182        headers (Optional[Mapping[str, str]]): Extra headers that should 
    183            be sent with the request, e.g. headers for encrypted data. 
    184        retry (Optional[google.api_core.retry.Retry]): How to retry the 
    185            RPC. A None value will disable retries. A 
    186            google.api_core.retry.Retry value will enable retries, and the 
    187            object will configure backoff and timeout options. 
    188 
    189            See the retry.py source code and docstrings in this package 
    190            (google.cloud.storage.retry) for information on retry types and how 
    191            to configure them. 
    192 
    193    Attributes: 
    194        upload_url (str): The URL where the content will be uploaded. 
    195    """ 
    196 
    197    def _prepare_request(self, data, content_type): 
    198        """Prepare the contents of an HTTP request. 
    199 
    200        This is everything that must be done before a request that doesn't 
    201        require network I/O (or other I/O). This is based on the `sans-I/O`_ 
    202        philosophy. 
    203 
    204        .. note: 
    205 
    206            This method will be used only once, so ``headers`` will be 
    207            mutated by having a new key added to it. 
    208 
    209        Args: 
    210            data (bytes): The resource content to be uploaded. 
    211            content_type (str): The content type for the request. 
    212 
    213        Returns: 
    214            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 
    215 
    216              * HTTP verb for the request (always POST) 
    217              * the URL for the request 
    218              * the body of the request 
    219              * headers for the request 
    220 
    221        Raises: 
    222            ValueError: If the current upload has already finished. 
    223            TypeError: If ``data`` isn't bytes. 
    224 
    225        .. _sans-I/O: https://sans-io.readthedocs.io/ 
    226        """ 
    227        if self.finished: 
    228            raise ValueError("An upload can only be used once.") 
    229 
    230        if not isinstance(data, bytes): 
    231            raise TypeError("`data` must be bytes, received", type(data)) 
    232        self._headers[_CONTENT_TYPE_HEADER] = content_type 
    233        return _POST, self.upload_url, data, self._headers 
    234 
    235    def transmit(self, transport, data, content_type, timeout=None): 
    236        """Transmit the resource to be uploaded. 
    237 
    238        Args: 
    239            transport (object): An object which can make authenticated 
    240                requests. 
    241            data (bytes): The resource content to be uploaded. 
    242            content_type (str): The content type of the resource, e.g. a JPEG 
    243                image has content type ``image/jpeg``. 
    244            timeout (Optional[Union[float, Tuple[float, float]]]): 
    245                The number of seconds to wait for the server response. 
    246                Depending on the retry strategy, a request may be repeated 
    247                several times using the same timeout each time. 
    248 
    249                Can also be passed as a tuple (connect_timeout, read_timeout). 
    250                See :meth:`requests.Session.request` documentation for details. 
    251 
    252        Raises: 
    253            NotImplementedError: Always, since virtual. 
    254        """ 
    255        raise NotImplementedError("This implementation is virtual.") 
    256 
    257 
    258class MultipartUpload(UploadBase): 
    259    """Upload a resource with metadata to a Google API. 
    260 
    261    A **multipart** upload sends both metadata and the resource in a single 
    262    (multipart) request. 
    263 
    264    Args: 
    265        upload_url (str): The URL where the content will be uploaded. 
    266        headers (Optional[Mapping[str, str]]): Extra headers that should 
    267            be sent with the request, e.g. headers for encrypted data. 
    268        checksum Optional([str]): The type of checksum to compute to verify 
    269            the integrity of the object. The request metadata will be amended 
    270            to include the computed value. Using this option will override a 
    271            manually-set checksum value. Supported values are "md5", 
    272            "crc32c", "auto", and None. The default is "auto", which will try 
    273            to detect if the C extension for crc32c is installed and fall back 
    274            to md5 otherwise. 
    275        retry (Optional[google.api_core.retry.Retry]): How to retry the 
    276            RPC. A None value will disable retries. A 
    277            google.api_core.retry.Retry value will enable retries, and the 
    278            object will configure backoff and timeout options. 
    279 
    280            See the retry.py source code and docstrings in this package 
    281            (google.cloud.storage.retry) for information on retry types and how 
    282            to configure them. 
    283 
    284    Attributes: 
    285        upload_url (str): The URL where the content will be uploaded. 
    286    """ 
    287 
    288    def __init__(self, upload_url, headers=None, checksum="auto", retry=DEFAULT_RETRY): 
    289        super(MultipartUpload, self).__init__(upload_url, headers=headers, retry=retry) 
    290        self._checksum_type = checksum 
    291        if self._checksum_type == "auto": 
    292            self._checksum_type = ( 
    293                "crc32c" if _helpers._is_crc32c_available_and_fast() else "md5" 
    294            ) 
    295 
    296    def _prepare_request(self, data, metadata, content_type): 
    297        """Prepare the contents of an HTTP request. 
    298 
    299        This is everything that must be done before a request that doesn't 
    300        require network I/O (or other I/O). This is based on the `sans-I/O`_ 
    301        philosophy. 
    302 
    303        .. note: 
    304 
    305            This method will be used only once, so ``headers`` will be 
    306            mutated by having a new key added to it. 
    307 
    308        Args: 
    309            data (bytes): The resource content to be uploaded. 
    310            metadata (Mapping[str, str]): The resource metadata, such as an 
    311                ACL list. 
    312            content_type (str): The content type of the resource, e.g. a JPEG 
    313                image has content type ``image/jpeg``. 
    314 
    315        Returns: 
    316            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 
    317 
    318              * HTTP verb for the request (always POST) 
    319              * the URL for the request 
    320              * the body of the request 
    321              * headers for the request 
    322 
    323        Raises: 
    324            ValueError: If the current upload has already finished. 
    325            TypeError: If ``data`` isn't bytes. 
    326 
    327        .. _sans-I/O: https://sans-io.readthedocs.io/ 
    328        """ 
    329        if self.finished: 
    330            raise ValueError("An upload can only be used once.") 
    331 
    332        if not isinstance(data, bytes): 
    333            raise TypeError("`data` must be bytes, received", type(data)) 
    334 
    335        checksum_object = _helpers._get_checksum_object(self._checksum_type) 
    336        if checksum_object is not None: 
    337            checksum_object.update(data) 
    338            actual_checksum = _helpers.prepare_checksum_digest(checksum_object.digest()) 
    339            metadata_key = _helpers._get_metadata_key(self._checksum_type) 
    340            metadata[metadata_key] = actual_checksum 
    341 
    342        content, multipart_boundary = construct_multipart_request( 
    343            data, metadata, content_type 
    344        ) 
    345        multipart_content_type = _RELATED_HEADER + multipart_boundary + b'"' 
    346        self._headers[_CONTENT_TYPE_HEADER] = multipart_content_type 
    347 
    348        return _POST, self.upload_url, content, self._headers 
    349 
    350    def transmit(self, transport, data, metadata, content_type, timeout=None): 
    351        """Transmit the resource to be uploaded. 
    352 
    353        Args: 
    354            transport (object): An object which can make authenticated 
    355                requests. 
    356            data (bytes): The resource content to be uploaded. 
    357            metadata (Mapping[str, str]): The resource metadata, such as an 
    358                ACL list. 
    359            content_type (str): The content type of the resource, e.g. a JPEG 
    360                image has content type ``image/jpeg``. 
    361            timeout (Optional[Union[float, Tuple[float, float]]]): 
    362                The number of seconds to wait for the server response. 
    363                Depending on the retry strategy, a request may be repeated 
    364                several times using the same timeout each time. 
    365 
    366                Can also be passed as a tuple (connect_timeout, read_timeout). 
    367                See :meth:`requests.Session.request` documentation for details. 
    368 
    369        Raises: 
    370            NotImplementedError: Always, since virtual. 
    371        """ 
    372        raise NotImplementedError("This implementation is virtual.") 
    373 
    374 
    375class ResumableUpload(UploadBase): 
    376    """Initiate and fulfill a resumable upload to a Google API. 
    377 
    378    A **resumable** upload sends an initial request with the resource metadata 
    379    and then gets assigned an upload ID / upload URL to send bytes to. 
    380    Using the upload URL, the upload is then done in chunks (determined by 
    381    the user) until all bytes have been uploaded. 
    382 
    383    Args: 
    384        upload_url (str): The URL where the resumable upload will be initiated. 
    385        chunk_size (int): The size of each chunk used to upload the resource. 
    386        headers (Optional[Mapping[str, str]]): Extra headers that should 
    387            be sent with every request. 
        checksum (Optional[str]): The type of checksum to compute to verify
    389            the integrity of the object. After the upload is complete, the 
    390            server-computed checksum of the resulting object will be checked 
    391            and google.cloud.storage.exceptions.DataCorruption will be raised on 
    392            a mismatch. The corrupted file will not be deleted from the remote 
    393            host automatically. Supported values are "md5", "crc32c", "auto", 
    394            and None. The default is "auto", which will try to detect if the C 
    395            extension for crc32c is installed and fall back to md5 otherwise. 
    396        retry (Optional[google.api_core.retry.Retry]): How to retry the 
    397            RPC. A None value will disable retries. A 
    398            google.api_core.retry.Retry value will enable retries, and the 
    399            object will configure backoff and timeout options. 
    400 
    401            See the retry.py source code and docstrings in this package 
    402            (google.cloud.storage.retry) for information on retry types and how 
    403            to configure them. 
    404 
    405    Attributes: 
    406        upload_url (str): The URL where the content will be uploaded. 
    407 
    408    Raises: 
    409        ValueError: If ``chunk_size`` is not a multiple of 
    410            :data:`.UPLOAD_CHUNK_SIZE`. 
    411    """ 
    412 
    def __init__(
        self,
        upload_url,
        chunk_size,
        checksum="auto",
        headers=None,
        retry=DEFAULT_RETRY,
    ):
        super().__init__(upload_url, headers=headers, retry=retry)
        if chunk_size % UPLOAD_CHUNK_SIZE != 0:
            # Integer division keeps the message free of a trailing ".0"
            # (assumes UPLOAD_CHUNK_SIZE is a whole number of KiB).
            raise ValueError(
                "{} KB must divide chunk size".format(UPLOAD_CHUNK_SIZE // 1024)
            )
        self._chunk_size = chunk_size
        # Populated by the initiate request.
        self._stream = None
        self._content_type = None
        # Progress counters: bytes acknowledged vs. bytes fed to the checksum.
        self._bytes_uploaded = 0
        self._bytes_checksummed = 0
        self._checksum_type = checksum
        if self._checksum_type == "auto":
            # Resolve "auto" once: crc32c when the helper reports it is
            # available and fast, md5 otherwise.
            self._checksum_type = (
                "crc32c" if _helpers._is_crc32c_available_and_fast() else "md5"
            )
        # Created lazily on the first chunk.
        self._checksum_object = None
        self._total_bytes = None
        self._resumable_url = None
        self._invalid = False
    440 
    @property
    def invalid(self):
        """bool: Whether the upload has been flagged as unusable.

        The flag is raised when a call to :meth:`transmit_next_chunk` fails;
        call :meth:`recover` to get back to a usable state.
        """
        return self._invalid
    449 
    @property
    def chunk_size(self):
        """int: How many bytes are sent per chunk request."""
        return self._chunk_size
    454 
    @property
    def resumable_url(self):
        """Optional[str]: Session URL chunks are sent to; None before initiation."""
        return self._resumable_url
    459 
    @property
    def bytes_uploaded(self):
        """int: Count of bytes uploaded so far."""
        return self._bytes_uploaded
    464 
    @property
    def total_bytes(self):
        """Optional[int]: Size of the full upload, when it is known.

        The value is filled in by :meth:`initiate`. An explicit
        ``total_bytes`` argument wins; otherwise, with ``stream_final=True``
        (the default), the size is measured from ``stream``.

        With ``stream_final=False`` and no explicit size, the total cannot be
        derived from the stream, so this stays :data:`None`.
        """
        return self._total_bytes
    478 
    def _prepare_initiate_request(
        self,
        stream,
        metadata,
        content_type,
        total_bytes=None,
        stream_final=True,
    ):
        """Prepare the contents of HTTP request to initiate upload.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already been initiated.
            ValueError: If ``stream`` is not at the beginning.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.resumable_url is not None:
            raise ValueError("This upload has already been initiated.")
        if stream.tell() != 0:
            raise ValueError("Stream must be at beginning.")

        self._stream = stream
        self._content_type = content_type

        # A signed URL requires the content type to be set directly rather
        # than through x-upload-content-type. Query parameter names are
        # compared case-insensitively so every capitalization of
        # X-Goog-Signature is recognized, not just two exact spellings.
        query = urllib.parse.urlparse(self.upload_url).query
        is_signed_url = any(
            name.lower() == "x-goog-signature"
            for name in urllib.parse.parse_qs(query)
        )
        # Unpack **self._headers first so the content-type keys set here win.
        if is_signed_url:
            headers = {**self._headers, _CONTENT_TYPE_HEADER: content_type}
        else:
            headers = {
                **self._headers,
                _CONTENT_TYPE_HEADER: "application/json; charset=UTF-8",
                "x-upload-content-type": content_type,
            }

        # Record the total size if possible: an explicit value wins, else a
        # final stream is measured.
        if total_bytes is not None:
            self._total_bytes = total_bytes
        elif stream_final:
            self._total_bytes = get_total_bytes(stream)
        # Advertise the total size to the server when it is known.
        if self._total_bytes is not None:
            headers["x-upload-content-length"] = "{:d}".format(self._total_bytes)

        payload = json.dumps(metadata).encode("utf-8")
        return _POST, self.upload_url, payload, headers
    556 
    def _process_initiate_response(self, response):
        """Handle the reply to the request that started the resumable upload.

        Pure post-processing in the `sans-I/O`_ style: no network or other
        I/O happens here. The status code must be 200 (OK) or 201 (Created);
        ``_make_invalid`` is passed to the status check as its callback. The
        ``Location`` header is then saved as the resumable session URL. The
        URL is assumed, but not verified, to contain the ``upload_id`` query
        parameter.

        Args:
            response (object): The HTTP response object (need headers).

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        accepted_codes = (http.client.OK, http.client.CREATED)
        _helpers.require_status_code(
            response,
            accepted_codes,
            self._get_status_code,
            callback=self._make_invalid,
        )
        location = _helpers.header_required(response, "location", self._get_headers)
        self._resumable_url = location
    582 
    def initiate(
        self,
        transport,
        stream,
        metadata,
        content_type,
        total_bytes=None,
        stream_final=True,
        timeout=None,
    ):
        """Initiate a resumable upload.

        By default, this method assumes your ``stream`` is in a "final"
        state ready to transmit. However, ``stream_final=False`` can be used
        to indicate that the size of the resource is not known. This can happen
        if bytes are being dynamically fed into ``stream``, e.g. if the stream
        is attached to application logs.

        If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be
        read from the stream every time :meth:`transmit_next_chunk` is called.
        If one of those reads produces strictly fewer bytes than the chunk
        size, the upload will be concluded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
    635 
    def _prepare_request(self):
        """Build the PUT request that carries the next chunk of the upload.

        Intended as the I/O-free half of a `sans-I/O`_ design. The one
        exception is reading the chunk from the local ``stream`` through
        :func:`get_next_chunk`, which is expected to be local rather than
        network I/O.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request
              * headers for the request

            The returned headers are a new mapping seeded from the instance's
            ``_headers``, plus content-type and content-range entries.

        Raises:
            ValueError: If the current upload has finished.
            ValueError: If the current upload is in an invalid state.
            ValueError: If the current upload has not been initiated.
            ValueError: If the location in the stream (i.e. ``stream.tell()``)
                does not agree with ``bytes_uploaded``.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # Guard clauses, checked in this order.
        if self.finished:
            raise ValueError("Upload has finished.")
        if self.invalid:
            raise ValueError(
                "Upload is in an invalid state. To recover call `recover()`."
            )
        if self.resumable_url is None:
            raise ValueError(
                "This upload has not been initiated. Please call "
                "initiate() before beginning to transmit chunks."
            )

        chunk_start, chunk_bytes, range_header = get_next_chunk(
            self._stream, self._chunk_size, self._total_bytes
        )
        # The stream position must line up with what has been uploaded.
        expected_start = self.bytes_uploaded
        if chunk_start != expected_start:
            raise ValueError(_STREAM_ERROR_TEMPLATE.format(chunk_start, expected_start))

        self._update_checksum(chunk_start, chunk_bytes)

        request_headers = dict(self._headers)
        request_headers[_CONTENT_TYPE_HEADER] = self._content_type
        request_headers[_helpers.CONTENT_RANGE_HEADER] = range_header
        return _PUT, self.resumable_url, chunk_bytes, request_headers
    692 
    693    def _update_checksum(self, start_byte, payload): 
    694        """Update the checksum with the payload if not already updated. 
    695 
    696        Because error recovery can result in bytes being transmitted more than 
    697        once, the checksum tracks the number of bytes checked in 
    698        self._bytes_checksummed and skips bytes that have already been summed. 
    699        """ 
    700        if not self._checksum_type: 
    701            return 
    702 
    703        if not self._checksum_object: 
    704            self._checksum_object = _helpers._get_checksum_object(self._checksum_type) 
    705 
    706        if start_byte < self._bytes_checksummed: 
    707            offset = self._bytes_checksummed - start_byte 
    708            data = payload[offset:] 
    709        else: 
    710            data = payload 
    711 
    712        self._checksum_object.update(data) 
    713        self._bytes_checksummed += len(data) 
    714 
    715    def _make_invalid(self): 
    716        """Simple setter for ``invalid``. 
    717 
    718        This is intended to be passed along as a callback to helpers that 
    719        raise an exception so they can mark this instance as invalid before 
    720        raising. 
    721        """ 
    722        self._invalid = True 
    723 
    def _process_resumable_response(self, response, bytes_sent):
        """Update the upload's state from the response to a chunk request.

        Everything that must happen after the request, without any network
        (or other) I/O, is done here, following the `sans-I/O`_ approach.

        A 200 response finishes the upload and triggers checksum validation.
        A 308 response means the upload is still in progress; its ``range``
        header gives the last byte the server reports having received.

        Args:
            response (object): The HTTP response object.
            bytes_sent (int): How many bytes were sent in the request that
                produced ``response``.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is neither 200 nor 308.
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is 308 and the ``range`` header cannot be obtained via
                ``_helpers.header_required`` or is not of the form
                ``bytes=0-{end}``.
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is 200, checksumming is enabled, and the response
                metadata lacks the expected checksum.
            ~google.cloud.storage.exceptions.DataCorruption: If the status
                code is 200 and the local checksum differs from the one
                reported by the server.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        status_code = _helpers.require_status_code(
            response,
            (http.client.OK, http.client.PERMANENT_REDIRECT),
            self._get_status_code,
            callback=self._make_invalid,
        )

        if status_code != http.client.OK:
            # 308: still in progress; trust the server's reported range.
            bytes_range = _helpers.header_required(
                response,
                _helpers.RANGE_HEADER,
                self._get_headers,
                callback=self._make_invalid,
            )
            match = _BYTES_RANGE_RE.match(bytes_range)
            if match is None:
                self._make_invalid()
                raise InvalidResponse(
                    response,
                    'Unexpected "range" header',
                    bytes_range,
                    'Expected to be of the form "bytes=0-{end}"',
                )
            self._bytes_uploaded = int(match.group("end_byte")) + 1
            return

        # NOTE: On 200 the "local" value ``bytes_sent`` advances
        #       ``bytes_uploaded`` without cross-checking other state that
        #       might be available (a ``size`` key in the JSON body, the
        #       ``total_bytes`` attribute, or ``stream.tell()``).
        self._bytes_uploaded += bytes_sent
        # Tombstone the upload so it cannot be reused.
        self._finished = True
        # May raise if the checksum does not match.
        self._validate_checksum(response)
    782 
    783    def _validate_checksum(self, response): 
    784        """Check the computed checksum, if any, against the recieved metadata. 
    785 
    786        Args: 
    787            response (object): The HTTP response object. 
    788 
    789        Raises: 
    790            ~google.cloud.storage.exceptions.DataCorruption: If the checksum 
    791            computed locally and the checksum reported by the remote host do 
    792            not match. 
    793        """ 
    794        if self._checksum_type is None: 
    795            return 
    796        metadata_key = _helpers._get_metadata_key(self._checksum_type) 
    797        metadata = response.json() 
    798        remote_checksum = metadata.get(metadata_key) 
    799        if remote_checksum is None: 
    800            raise InvalidResponse( 
    801                response, 
    802                _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key), 
    803                self._get_headers(response), 
    804            ) 
    805        local_checksum = _helpers.prepare_checksum_digest( 
    806            self._checksum_object.digest() 
    807        ) 
    808        if local_checksum != remote_checksum: 
    809            raise DataCorruption( 
    810                response, 
    811                _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format( 
    812                    self._checksum_type.upper(), local_checksum, remote_checksum 
    813                ), 
    814            ) 
    815 
    def transmit_next_chunk(self, transport, timeout=None):
        """Send the next chunk of the resource being uploaded.

        When the upload was initiated with ``stream_final=False``, completion
        is detected on the fly: the upload is treated as complete once a read
        from the stream yields fewer than :attr:`chunk_size` bytes.

        Args:
            transport (object): An object capable of making authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                Seconds to wait for the server response. Depending on the
                retry strategy, a request may be repeated several times, each
                attempt reusing this timeout. A
                ``(connect_timeout, read_timeout)`` tuple is also accepted;
                see :meth:`requests.Session.request` for details.

        Raises:
            NotImplementedError: Always; this base implementation is virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
    839 
    def _prepare_recover_request(self):
        """Build the status-query request used to recover from a failure.

        No network I/O happens here, following the `sans-I/O`_ approach.

        :attr:`resumable_url` is assumed to be set, since an upload can only
        become :attr:`invalid` after it has been initiated.

        Returns:
            Tuple[str, str, NoneType, Mapping[str, str]]: A 4-tuple of

              * the HTTP method (always PUT)
              * the request URL
              * the request body (always :data:`None`)
              * the request headers

            The headers contain only a ``bytes */*`` content range. They
            **do not** include the ``_headers`` on this instance.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        status_query_headers = {_helpers.CONTENT_RANGE_HEADER: "bytes */*"}
        return _PUT, self.resumable_url, None, status_query_headers
    864 
    def _process_recover_response(self, response):
        """Restore a valid state from the response to a recovery request.

        No network (or other) I/O happens here, following the `sans-I/O`_
        approach, apart from seeking ``stream`` to the recovered position.

        On success, ``bytes_uploaded`` is taken from the ``range`` header
        (or reset to 0 when that header is absent), the stream is rewound to
        that offset, and the upload is no longer marked invalid.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 308.
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is 308 and the ``range`` header is not of the form
                ``bytes=0-{end}``.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response, (http.client.PERMANENT_REDIRECT,), self._get_status_code
        )
        response_headers = self._get_headers(response)
        if _helpers.RANGE_HEADER not in response_headers:
            # No range header: the upload has not "begun", so restart at 0.
            resume_at = 0
        else:
            bytes_range = response_headers[_helpers.RANGE_HEADER]
            match = _BYTES_RANGE_RE.match(bytes_range)
            if match is None:
                raise InvalidResponse(
                    response,
                    'Unexpected "range" header',
                    bytes_range,
                    'Expected to be of the form "bytes=0-{end}"',
                )
            resume_at = int(match.group("end_byte")) + 1

        self._bytes_uploaded = resume_at
        self._stream.seek(resume_at)
        self._invalid = False
    905 
    def recover(self, transport):
        """Bring an :attr:`~ResumableUpload.invalid` upload back to a usable state.

        Use this after a request failure has left a :class:`ResumableUpload`
        invalid. It checks progress with the server and restores a valid
        state so that :meth:`transmit_next_chunk` can be called again.

        Args:
            transport (object): An object capable of making authenticated
                requests.

        Raises:
            NotImplementedError: Always; this base implementation is virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
    924 
    925 
    926class XMLMPUContainer(UploadBase): 
    927    """Initiate and close an upload using the XML MPU API. 
    928 
    929    An XML MPU sends an initial request and then receives an upload ID. 
    930    Using the upload ID, the upload is then done in numbered parts and the 
    931    parts can be uploaded concurrently. 
    932 
    933    In order to avoid concurrency issues with this container object, the 
    934    uploading of individual parts is handled separately, by XMLMPUPart objects 
    935    spawned from this container class. The XMLMPUPart objects are not 
    936    necessarily in the same process as the container, so they do not update the 
    937    container automatically. 
    938 
    939    MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous 
    940    given the JSON multipart upload, so the abbreviation "MPU" will be used 
    941    throughout. 
    942 
    943    See: https://cloud.google.com/storage/docs/multipart-uploads 
    944 
    945    Args: 
    946        upload_url (str): The URL of the object (without query parameters). The 
    947            initiate, PUT, and finalization requests will all use this URL, with 
    948            varying query parameters. 
    949        filename (str): The name (path) of the file to upload. 
    950        headers (Optional[Mapping[str, str]]): Extra headers that should 
    951            be sent with every request. 
    952        retry (Optional[google.api_core.retry.Retry]): How to retry the 
    953            RPC. A None value will disable retries. A 
    954            google.api_core.retry.Retry value will enable retries, and the 
    955            object will configure backoff and timeout options. 
    956 
    957            See the retry.py source code and docstrings in this package 
    958            (google.cloud.storage.retry) for information on retry types and how 
    959            to configure them. 
    960 
    961    Attributes: 
    962        upload_url (str): The URL where the content will be uploaded. 
    963        upload_id (Optional(str)): The ID of the upload from the initialization 
    964            response. 
    965    """ 
    966 
    967    def __init__( 
    968        self, 
    969        upload_url, 
    970        filename, 
    971        headers=None, 
    972        upload_id=None, 
    973        retry=DEFAULT_RETRY, 
    974    ): 
    975        super().__init__(upload_url, headers=headers, retry=retry) 
    976        self._filename = filename 
    977        self._upload_id = upload_id 
    978        self._parts = {} 
    979 
    980    @property 
    981    def upload_id(self): 
    982        return self._upload_id 
    983 
    984    def register_part(self, part_number, etag): 
    985        """Register an uploaded part by part number and corresponding etag. 
    986 
    987        XMLMPUPart objects represent individual parts, and their part number 
    988        and etag can be registered to the container object with this method 
    989        and therefore incorporated in the finalize() call to finish the upload. 
    990 
    991        This method accepts part_number and etag, but not XMLMPUPart objects 
    992        themselves, to reduce the complexity involved in running XMLMPUPart 
    993        uploads in separate processes. 
    994 
    995        Args: 
    996            part_number (int): The part number. Parts are assembled into the 
    997                final uploaded object with finalize() in order of their part 
    998                numbers. 
    999            etag (str): The etag included in the server response after upload. 
    1000        """ 
    1001        self._parts[part_number] = etag 
    1002 
    1003    def _prepare_initiate_request(self, content_type): 
    1004        """Prepare the contents of HTTP request to initiate upload. 
    1005 
    1006        This is everything that must be done before a request that doesn't 
    1007        require network I/O (or other I/O). This is based on the `sans-I/O`_ 
    1008        philosophy. 
    1009 
    1010        Args: 
    1011            content_type (str): The content type of the resource, e.g. a JPEG 
    1012                image has content type ``image/jpeg``. 
    1013 
    1014        Returns: 
    1015            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 
    1016 
    1017              * HTTP verb for the request (always POST) 
    1018              * the URL for the request 
    1019              * the body of the request 
    1020              * headers for the request 
    1021 
    1022        Raises: 
    1023            ValueError: If the current upload has already been initiated. 
    1024 
    1025        .. _sans-I/O: https://sans-io.readthedocs.io/ 
    1026        """ 
    1027        if self.upload_id is not None: 
    1028            raise ValueError("This upload has already been initiated.") 
    1029 
    1030        initiate_url = self.upload_url + _MPU_INITIATE_QUERY 
    1031 
    1032        headers = { 
    1033            **self._headers, 
    1034            _CONTENT_TYPE_HEADER: content_type, 
    1035        } 
    1036        return _POST, initiate_url, None, headers 
    1037 
    1038    def _process_initiate_response(self, response): 
    1039        """Process the response from an HTTP request that initiated the upload. 
    1040 
    1041        This is everything that must be done after a request that doesn't 
    1042        require network I/O (or other I/O). This is based on the `sans-I/O`_ 
    1043        philosophy. 
    1044 
    1045        This method takes the URL from the ``Location`` header and stores it 
    1046        for future use. Within that URL, we assume the ``upload_id`` query 
    1047        parameter has been included, but we do not check. 
    1048 
    1049        Args: 
    1050            response (object): The HTTP response object. 
    1051 
    1052        Raises: 
    1053            ~google.cloud.storage.exceptions.InvalidResponse: If the status 
    1054                code is not 200. 
    1055 
    1056        .. _sans-I/O: https://sans-io.readthedocs.io/ 
    1057        """ 
    1058        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code) 
    1059        root = ElementTree.fromstring(response.text) 
    1060        self._upload_id = root.find(_S3_COMPAT_XML_NAMESPACE + _UPLOAD_ID_NODE).text 
    1061 
    1062    def initiate( 
    1063        self, 
    1064        transport, 
    1065        content_type, 
    1066        timeout=None, 
    1067    ): 
    1068        """Initiate an MPU and record the upload ID. 
    1069 
    1070        Args: 
    1071            transport (object): An object which can make authenticated 
    1072                requests. 
    1073            content_type (str): The content type of the resource, e.g. a JPEG 
    1074                image has content type ``image/jpeg``. 
    1075            timeout (Optional[Union[float, Tuple[float, float]]]): 
    1076                The number of seconds to wait for the server response. 
    1077                Depending on the retry strategy, a request may be repeated 
    1078                several times using the same timeout each time. 
    1079 
    1080                Can also be passed as a tuple (connect_timeout, read_timeout). 
    1081                See :meth:`requests.Session.request` documentation for details. 
    1082 
    1083        Raises: 
    1084            NotImplementedError: Always, since virtual. 
    1085        """ 
    1086        raise NotImplementedError("This implementation is virtual.") 
    1087 
    1088    def _prepare_finalize_request(self): 
    1089        """Prepare the contents of an HTTP request to finalize the upload. 
    1090 
    1091        All of the parts must be registered before calling this method. 
    1092 
    1093        Returns: 
    1094            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 
    1095 
    1096              * HTTP verb for the request (always POST) 
    1097              * the URL for the request 
    1098              * the body of the request 
    1099              * headers for the request 
    1100 
    1101        Raises: 
    1102            ValueError: If the upload has not been initiated. 
    1103        """ 
    1104        if self.upload_id is None: 
    1105            raise ValueError("This upload has not yet been initiated.") 
    1106 
    1107        final_query = _MPU_FINAL_QUERY_TEMPLATE.format(upload_id=self._upload_id) 
    1108        finalize_url = self.upload_url + final_query 
    1109        final_xml_root = ElementTree.Element("CompleteMultipartUpload") 
    1110        for part_number, etag in self._parts.items(): 
    1111            part = ElementTree.SubElement(final_xml_root, "Part")  # put in a loop 
    1112            ElementTree.SubElement(part, "PartNumber").text = str(part_number) 
    1113            ElementTree.SubElement(part, "ETag").text = etag 
    1114        payload = ElementTree.tostring(final_xml_root) 
    1115        return _POST, finalize_url, payload, self._headers 
    1116 
    1117    def _process_finalize_response(self, response): 
    1118        """Process the response from an HTTP request that finalized the upload. 
    1119 
    1120        This is everything that must be done after a request that doesn't 
    1121        require network I/O (or other I/O). This is based on the `sans-I/O`_ 
    1122        philosophy. 
    1123 
    1124        Args: 
    1125            response (object): The HTTP response object. 
    1126 
    1127        Raises: 
    1128            ~google.cloud.storage.exceptions.InvalidResponse: If the status 
    1129                code is not 200. 
    1130 
    1131        .. _sans-I/O: https://sans-io.readthedocs.io/ 
    1132        """ 
    1133 
    1134        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code) 
    1135        self._finished = True 
    1136 
    1137    def finalize( 
    1138        self, 
    1139        transport, 
    1140        timeout=None, 
    1141    ): 
    1142        """Finalize an MPU request with all the parts. 
    1143 
    1144        Args: 
    1145            transport (object): An object which can make authenticated 
    1146                requests. 
    1147            timeout (Optional[Union[float, Tuple[float, float]]]): 
    1148                The number of seconds to wait for the server response. 
    1149                Depending on the retry strategy, a request may be repeated 
    1150                several times using the same timeout each time. 
    1151 
    1152                Can also be passed as a tuple (connect_timeout, read_timeout). 
    1153                See :meth:`requests.Session.request` documentation for details. 
    1154 
    1155        Raises: 
    1156            NotImplementedError: Always, since virtual. 
    1157        """ 
    1158        raise NotImplementedError("This implementation is virtual.") 
    1159 
    1160    def _prepare_cancel_request(self): 
    1161        """Prepare the contents of an HTTP request to cancel the upload. 
    1162 
    1163        Returns: 
    1164            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 
    1165 
    1166              * HTTP verb for the request (always DELETE) 
    1167              * the URL for the request 
    1168              * the body of the request 
    1169              * headers for the request 
    1170 
    1171        Raises: 
    1172            ValueError: If the upload has not been initiated. 
    1173        """ 
    1174        if self.upload_id is None: 
    1175            raise ValueError("This upload has not yet been initiated.") 
    1176 
    1177        cancel_query = _MPU_FINAL_QUERY_TEMPLATE.format(upload_id=self._upload_id) 
    1178        cancel_url = self.upload_url + cancel_query 
    1179        return _DELETE, cancel_url, None, self._headers 
    1180 
    1181    def _process_cancel_response(self, response): 
    1182        """Process the response from an HTTP request that canceled the upload. 
    1183 
    1184        This is everything that must be done after a request that doesn't 
    1185        require network I/O (or other I/O). This is based on the `sans-I/O`_ 
    1186        philosophy. 
    1187 
    1188        Args: 
    1189            response (object): The HTTP response object. 
    1190 
    1191        Raises: 
    1192            ~google.cloud.storage.exceptions.InvalidResponse: If the status 
    1193                code is not 204. 
    1194 
    1195        .. _sans-I/O: https://sans-io.readthedocs.io/ 
    1196        """ 
    1197 
    1198        _helpers.require_status_code( 
    1199            response, (http.client.NO_CONTENT,), self._get_status_code 
    1200        ) 
    1201 
    1202    def cancel( 
    1203        self, 
    1204        transport, 
    1205        timeout=None, 
    1206    ): 
    1207        """Cancel an MPU request and permanently delete any uploaded parts. 
    1208 
    1209        This cannot be undone. 
    1210 
    1211        Args: 
    1212            transport (object): An object which can make authenticated 
    1213                requests. 
    1214            timeout (Optional[Union[float, Tuple[float, float]]]): 
    1215                The number of seconds to wait for the server response. 
    1216                Depending on the retry strategy, a request may be repeated 
    1217                several times using the same timeout each time. 
    1218 
    1219                Can also be passed as a tuple (connect_timeout, read_timeout). 
    1220                See :meth:`requests.Session.request` documentation for details. 
    1221 
    1222        Raises: 
    1223            NotImplementedError: Always, since virtual. 
    1224        """ 
    1225        raise NotImplementedError("This implementation is virtual.") 
    1226 
    1227 
    1228class XMLMPUPart(UploadBase): 
    1229    """Upload a single part of an existing XML MPU container. 
    1230 
    1231    An XML MPU sends an initial request and then receives an upload ID. 
    1232    Using the upload ID, the upload is then done in numbered parts and the 
    1233    parts can be uploaded concurrently. 
    1234 
    1235    In order to avoid concurrency issues with the container object, the 
    1236    uploading of individual parts is handled separately by multiple objects 
    1237    of this class. Once a part is uploaded, it can be registered with the 
    1238    container with `container.register_part(part.part_number, part.etag)`. 
    1239 
    1240    MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous 
    1241    given the JSON multipart upload, so the abbreviation "MPU" will be used 
    1242    throughout. 
    1243 
    1244    See: https://cloud.google.com/storage/docs/multipart-uploads 
    1245 
    1246    Args: 
    1247        upload_url (str): The URL of the object (without query parameters). 
    1248        upload_id (str): The ID of the upload from the initialization response. 
    1249        filename (str): The name (path) of the file to upload. 
    1250        start (int): The byte index of the beginning of the part. 
    1251        end (int): The byte index of the end of the part. 
    1252        part_number (int): The part number. Part numbers will be assembled in 
    1253            sequential order when the container is finalized. 
    1254        headers (Optional[Mapping[str, str]]): Extra headers that should 
    1255            be sent with every request. 
    1256        checksum (Optional([str])): The type of checksum to compute to verify 
    1257            the integrity of the object. The request headers will be amended 
    1258            to include the computed value. Supported values are "md5", "crc32c", 
    1259            "auto" and None. The default is "auto", which will try to detect if 
    1260            the C extension for crc32c is installed and fall back to md5 
    1261            otherwise. 
    1262        retry (Optional[google.api_core.retry.Retry]): How to retry the 
    1263            RPC. A None value will disable retries. A 
    1264            google.api_core.retry.Retry value will enable retries, and the 
    1265            object will configure backoff and timeout options. 
    1266 
    1267            See the retry.py source code and docstrings in this package 
    1268            (google.cloud.storage.retry) for information on retry types and how 
    1269            to configure them. 
    1270 
    1271    Attributes: 
    1272        upload_url (str): The URL of the object (without query parameters). 
    1273        upload_id (str): The ID of the upload from the initialization response. 
    1274        filename (str): The name (path) of the file to upload. 
    1275        start (int): The byte index of the beginning of the part. 
    1276        end (int): The byte index of the end of the part. 
    1277        part_number (int): The part number. Part numbers will be assembled in 
    1278            sequential order when the container is finalized. 
    1279        etag (Optional(str)): The etag returned by the service after upload. 
    1280    """ 
    1281 
    def __init__(
        self,
        upload_url,
        upload_id,
        filename,
        start,
        end,
        part_number,
        headers=None,
        checksum="auto",
        retry=DEFAULT_RETRY,
    ):
        """Record where this part lives and resolve its checksum type."""
        super().__init__(upload_url, headers=headers, retry=retry)
        self._filename = filename
        self._upload_id = upload_id
        self._part_number = part_number
        self._start = start
        self._end = end
        # Filled in once the service has accepted the part.
        self._etag = None
        # "auto" picks crc32c when a fast implementation is available and
        # otherwise falls back to md5.
        if checksum == "auto":
            checksum = "crc32c" if _helpers._is_crc32c_available_and_fast() else "md5"
        self._checksum_type = checksum
        self._checksum_object = None
    1307 
    @property
    def part_number(self):
        """int: This part's number; parts are assembled in numeric order."""
        return self._part_number

    @property
    def upload_id(self):
        """str: The ID of the MPU this part belongs to."""
        return self._upload_id

    @property
    def filename(self):
        """str: Path of the file the part's bytes are read from."""
        return self._filename

    @property
    def etag(self):
        """Optional[str]: The etag returned by the service after upload."""
        return self._etag

    @property
    def start(self):
        """int: Byte offset in the file where this part begins."""
        return self._start

    @property
    def end(self):
        """int: Byte offset in the file where this part ends."""
        return self._end
    1331 
    1332    def _prepare_upload_request(self): 
    1333        """Prepare the contents of HTTP request to upload a part. 
    1334 
    1335        This is everything that must be done before a request that doesn't 
    1336        require network I/O. This is based on the `sans-I/O`_ philosophy. 
    1337 
    1338        For the time being, this **does require** some form of I/O to read 
    1339        a part from ``stream`` (via :func:`get_part_payload`). However, this 
    1340        will (almost) certainly not be network I/O. 
    1341 
    1342        Returns: 
    1343            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 
    1344 
    1345              * HTTP verb for the request (always PUT) 
    1346              * the URL for the request 
    1347              * the body of the request 
    1348              * headers for the request 
    1349 
    1350            The headers incorporate the ``_headers`` on the current instance. 
    1351 
    1352        Raises: 
    1353            ValueError: If the current upload has finished. 
    1354 
    1355        .. _sans-I/O: https://sans-io.readthedocs.io/ 
    1356        """ 
    1357        if self.finished: 
    1358            raise ValueError("This part has already been uploaded.") 
    1359 
    1360        with open(self._filename, "br") as f: 
    1361            f.seek(self._start) 
    1362            payload = f.read(self._end - self._start) 
    1363 
    1364        self._checksum_object = _helpers._get_checksum_object(self._checksum_type) 
    1365        if self._checksum_object is not None: 
    1366            self._checksum_object.update(payload) 
    1367 
    1368        part_query = _MPU_PART_QUERY_TEMPLATE.format( 
    1369            part=self._part_number, upload_id=self._upload_id 
    1370        ) 
    1371        upload_url = self.upload_url + part_query 
    1372        return _PUT, upload_url, payload, self._headers 
    1373 
    def _process_upload_response(self, response):
        """Process the response from an HTTP request.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        On success, the ``etag`` header is stored on the part and the part is
        marked as finished.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.DataCorruption: If the server
                answered 400 with an XML error whose code is
                ``InvalidDigest``, ``BadDigest`` or ``CrcMismatch``, or if the
                locally computed checksum differs from the one the server
                reports.
            ~google.cloud.storage.exceptions.InvalidResponse: If the status
                code is not 200 or the response is missing data.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # Data corruption errors shouldn't be considered as invalid responses,
        # so they are handled before the call to `_helpers.require_status_code`.
        # If the response is 400, check its XML error body for checksum errors.
        if response.status_code == http.client.BAD_REQUEST:
            try:
                root = ElementTree.fromstring(response.text)
            except ElementTree.ParseError:
                # Not an XML error document (e.g. an empty body). Fall through
                # so the status check below reports an ordinary invalid
                # response instead of leaking a parser error.
                root = None

            if root is not None:
                # Any of these children may be missing; use None rather than
                # raising AttributeError on ``root.find(tag).text``.
                fields = {}
                for tag in ("Code", "Message", "Details"):
                    element = root.find(tag)
                    fields[tag] = None if element is None else element.text

                error_code = fields["Code"]
                if error_code in ["InvalidDigest", "BadDigest", "CrcMismatch"]:
                    raise DataCorruption(
                        response,
                        (
                            "Checksum mismatch: checksum calculated by client and"
                            " server did not match. Error code: {error_code},"
                            " Error message: {error_message},"
                            " Error details: {error_details}"
                        ).format(
                            error_code=error_code,
                            error_message=fields["Message"],
                            error_details=fields["Details"],
                        ),
                    )

        _helpers.require_status_code(
            response,
            (http.client.OK,),
            self._get_status_code,
        )

        self._validate_checksum(response)

        etag = _helpers.header_required(response, "etag", self._get_headers)
        self._etag = etag
        self._finished = True
    1424 
    def upload(self, transport, timeout=None):
        """Send this part to the server.

        This base class only defines the interface; transport-specific
        subclasses supply the actual implementation.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Unconditionally; this method is virtual.
        """
        message = "This implementation is virtual."
        raise NotImplementedError(message)
    1447 
    1448    def _validate_checksum(self, response): 
    1449        """Check the computed checksum, if any, against the response headers. 
    1450 
    1451        Args: 
    1452            response (object): The HTTP response object. 
    1453 
    1454        Raises: 
    1455            ~google.cloud.storage.exceptions.DataCorruption: If the checksum 
    1456            computed locally and the checksum reported by the remote host do 
    1457            not match. 
    1458        """ 
    1459        if self._checksum_type is None: 
    1460            return 
    1461 
    1462        remote_checksum = _helpers._get_uploaded_checksum_from_headers( 
    1463            response, self._get_headers, self._checksum_type 
    1464        ) 
    1465 
    1466        if remote_checksum is None: 
    1467            metadata_key = _helpers._get_metadata_key(self._checksum_type) 
    1468            raise InvalidResponse( 
    1469                response, 
    1470                _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key), 
    1471                self._get_headers(response), 
    1472            ) 
    1473        local_checksum = _helpers.prepare_checksum_digest( 
    1474            self._checksum_object.digest() 
    1475        ) 
    1476        if local_checksum != remote_checksum: 
    1477            raise DataCorruption( 
    1478                response, 
    1479                _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format( 
    1480                    self._checksum_type.upper(), local_checksum, remote_checksum 
    1481                ), 
    1482            ) 
    1483 
    1484 
    1485def get_boundary(): 
    1486    """Get a random boundary for a multipart request. 
    1487 
    1488    Returns: 
    1489        bytes: The boundary used to separate parts of a multipart request. 
    1490    """ 
    1491    random_int = random.randrange(sys.maxsize) 
    1492    boundary = _BOUNDARY_FORMAT.format(random_int) 
    1493    # NOTE: Neither % formatting nor .format() are available for byte strings 
    1494    #       in Python 3.4, so we must use unicode strings as templates. 
    1495    return boundary.encode("utf-8") 
    1496 
    1497 
    1498def construct_multipart_request(data, metadata, content_type): 
    1499    """Construct a multipart request body. 
    1500 
    1501    Args: 
    1502        data (bytes): The resource content (UTF-8 encoded as bytes) 
    1503            to be uploaded. 
    1504        metadata (Mapping[str, str]): The resource metadata, such as an 
    1505            ACL list. 
    1506        content_type (str): The content type of the resource, e.g. a JPEG 
    1507            image has content type ``image/jpeg``. 
    1508 
    1509    Returns: 
    1510        Tuple[bytes, bytes]: The multipart request body and the boundary used 
    1511        between each part. 
    1512    """ 
    1513    multipart_boundary = get_boundary() 
    1514    json_bytes = json.dumps(metadata).encode("utf-8") 
    1515    content_type = content_type.encode("utf-8") 
    1516    # Combine the two parts into a multipart payload. 
    1517    # NOTE: We'd prefer a bytes template but are restricted by Python 3.4. 
    1518    boundary_sep = _MULTIPART_SEP + multipart_boundary 
    1519    content = ( 
    1520        boundary_sep 
    1521        + _MULTIPART_BEGIN 
    1522        + json_bytes 
    1523        + _CRLF 
    1524        + boundary_sep 
    1525        + _CRLF 
    1526        + b"content-type: " 
    1527        + content_type 
    1528        + _CRLF 
    1529        + _CRLF 
    1530        + data  # Empty line between headers and body. 
    1531        + _CRLF 
    1532        + boundary_sep 
    1533        + _MULTIPART_SEP 
    1534    ) 
    1535 
    1536    return content, multipart_boundary 
    1537 
    1538 
    1539def get_total_bytes(stream): 
    1540    """Determine the total number of bytes in a stream. 
    1541 
    1542    Args: 
    1543       stream (IO[bytes]): The stream (i.e. file-like object). 
    1544 
    1545    Returns: 
    1546        int: The number of bytes. 
    1547    """ 
    1548    current_position = stream.tell() 
    1549    # NOTE: ``.seek()`` **should** return the same value that ``.tell()`` 
    1550    #       returns, but in Python 2, ``file`` objects do not. 
    1551    stream.seek(0, os.SEEK_END) 
    1552    end_position = stream.tell() 
    1553    # Go back to the initial position. 
    1554    stream.seek(current_position) 
    1555 
    1556    return end_position 
    1557 
    1558 
    1559def get_next_chunk(stream, chunk_size, total_bytes): 
    1560    """Get a chunk from an I/O stream. 
    1561 
    1562    The ``stream`` may have fewer bytes remaining than ``chunk_size`` 
    1563    so it may not always be the case that 
    1564    ``end_byte == start_byte + chunk_size - 1``. 
    1565 
    1566    Args: 
    1567        stream (IO[bytes]): The stream (i.e. file-like object). 
    1568        chunk_size (int): The size of the chunk to be read from the ``stream``. 
    1569        total_bytes (Optional[int]): The (expected) total number of bytes 
    1570            in the ``stream``. 
    1571 
    1572    Returns: 
    1573        Tuple[int, bytes, str]: Triple of: 
    1574 
    1575          * the start byte index 
    1576          * the content in between the start and end bytes (inclusive) 
    1577          * content range header for the chunk (slice) that has been read 
    1578 
    1579    Raises: 
    1580        ValueError: If ``total_bytes == 0`` but ``stream.read()`` yields 
    1581            non-empty content. 
    1582        ValueError: If there is no data left to consume. This corresponds 
    1583            exactly to the case ``end_byte < start_byte``, which can only 
    1584            occur if ``end_byte == start_byte - 1``. 
    1585    """ 
    1586    start_byte = stream.tell() 
    1587    if total_bytes is not None and start_byte + chunk_size >= total_bytes > 0: 
    1588        payload = stream.read(total_bytes - start_byte) 
    1589    else: 
    1590        payload = stream.read(chunk_size) 
    1591    end_byte = stream.tell() - 1 
    1592 
    1593    num_bytes_read = len(payload) 
    1594    if total_bytes is None: 
    1595        if num_bytes_read < chunk_size: 
    1596            # We now **KNOW** the total number of bytes. 
    1597            total_bytes = end_byte + 1 
    1598    elif total_bytes == 0: 
    1599        # NOTE: We also expect ``start_byte == 0`` here but don't check 
    1600        #       because ``_prepare_initiate_request()`` requires the 
    1601        #       stream to be at the beginning. 
    1602        if num_bytes_read != 0: 
    1603            raise ValueError( 
    1604                "Stream specified as empty, but produced non-empty content." 
    1605            ) 
    1606    else: 
    1607        if num_bytes_read == 0: 
    1608            raise ValueError( 
    1609                "Stream is already exhausted. There is no content remaining." 
    1610            ) 
    1611 
    1612    content_range = get_content_range(start_byte, end_byte, total_bytes) 
    1613    return start_byte, payload, content_range 
    1614 
    1615 
    1616def get_content_range(start_byte, end_byte, total_bytes): 
    1617    """Convert start, end and total into content range header. 
    1618 
    1619    If ``total_bytes`` is not known, uses "bytes {start}-{end}/*". 
    1620    If we are dealing with an empty range (i.e. ``end_byte < start_byte``) 
    1621    then "bytes */{total}" is used. 
    1622 
    1623    This function **ASSUMES** that if the size is not known, the caller will 
    1624    not also pass an empty range. 
    1625 
    1626    Args: 
    1627        start_byte (int): The start (inclusive) of the byte range. 
    1628        end_byte (int): The end (inclusive) of the byte range. 
    1629        total_bytes (Optional[int]): The number of bytes in the byte 
    1630            range (if known). 
    1631 
    1632    Returns: 
    1633        str: The content range header. 
    1634    """ 
    1635    if total_bytes is None: 
    1636        return _RANGE_UNKNOWN_TEMPLATE.format(start_byte, end_byte) 
    1637    elif end_byte < start_byte: 
    1638        return _EMPTY_RANGE_TEMPLATE.format(total_bytes) 
    1639    else: 
    1640        return _CONTENT_RANGE_TEMPLATE.format(start_byte, end_byte, total_bytes)