1# Copyright 2017 Google Inc. 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Support for downloading media from Google APIs.""" 
    16 
import http
import http.client

import urllib3.response  # type: ignore

from google.cloud.storage._media import _download
from google.cloud.storage._media import _helpers
from google.cloud.storage._media.requests import _request_helpers
from google.cloud.storage.exceptions import DataCorruption
    24 
    25_CHECKSUM_MISMATCH = """\ 
    26Checksum mismatch while downloading: 
    27 
    28  {} 
    29 
    30The X-Goog-Hash header indicated an {checksum_type} checksum of: 
    31 
    32  {} 
    33 
    34but the actual {checksum_type} checksum of the downloaded contents was: 
    35 
    36  {} 
    37""" 
    38 
    39_STREAM_SEEK_ERROR = """\ 
    40Incomplete download for: 
    41{} 
    42Error writing to stream while handling a gzip-compressed file download. 
    43Please restart the download. 
    44""" 
    45 
    46_RESPONSE_HEADERS_INFO = """\ 
    47The X-Goog-Stored-Content-Length is {}. The X-Goog-Stored-Content-Encoding is {}. 
    48The download request read {} bytes of data. 
    49If the download was incomplete, please check the network connection and restart the download. 
    50""" 
    51 
    52 
class Download(_request_helpers.RequestsMixin, _download.Download):
    """Helper to manage downloading a resource from a Google API.

    "Slices" of the resource can be retrieved by specifying a range
    with ``start`` and / or ``end``. However, in typical usage, neither
    ``start`` nor ``end`` is expected to be provided.

    Args:
        media_url (str): The URL containing the media to be downloaded.
        stream (IO[bytes]): A write-able stream (i.e. file-like object) that
            the downloaded resource can be written to.
        start (int): The first byte in a range to be downloaded. If not
            provided, but ``end`` is provided, will download from the
            beginning to ``end`` of the media.
        end (int): The last byte in a range to be downloaded. If not
            provided, but ``start`` is provided, will download from the
            ``start`` to the end of the media.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum Optional([str]): The type of checksum to compute to verify
            the integrity of the object. The response headers must contain
            a checksum of the requested type. If the headers lack an
            appropriate checksum (for instance in the case of transcoded or
            ranged downloads where the remote service does not know the
            correct checksum) an INFO-level log will be emitted. Supported
            values are "md5", "crc32c", "auto" and None. The default is "auto",
            which will try to detect if the C extension for crc32c is installed
            and fall back to md5 otherwise.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and how
            to configure them.

    Attributes:
        media_url (str): The URL containing the media to be downloaded.
        start (Optional[int]): The first byte in a range to be downloaded.
        end (Optional[int]): The last byte in a range to be downloaded.
    """

    def _write_to_stream(self, response):
        """Write response body to a write-able stream.

        .. note:

            This method assumes that the ``_stream`` attribute is set on the
            current download.

        Args:
            response (~requests.Response): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.DataCorruption: If the download's
                checksum doesn't agree with server-computed checksum.
        """

        # Retrieve the expected checksum only once for the download request,
        # then compute and validate the checksum when the full download completes.
        # Retried requests are range requests, and there's no way to detect
        # data corruption for that byte range alone.
        if self._expected_checksum is None and self._checksum_object is None:
            # `_get_expected_checksum()` may return None even if a checksum was
            # requested, in which case it will emit an info log _MISSING_CHECKSUM.
            # If an invalid checksum type is specified, this will raise ValueError.
            expected_checksum, checksum_object = _helpers._get_expected_checksum(
                response, self._get_headers, self.media_url, checksum_type=self.checksum
            )
            self._expected_checksum = expected_checksum
            self._checksum_object = checksum_object
        else:
            # A retried (range) request: reuse the checksum state accumulated
            # by the earlier attempt(s) so the final digest covers all bytes.
            expected_checksum = self._expected_checksum
            checksum_object = self._checksum_object

        with response:
            # NOTE: In order to handle compressed streams gracefully, we try
            # to insert our checksum object into the decompression stream. If
            # the stream is indeed compressed, this will delegate the checksum
            # object to the decoder and return a _DoNothingHash here.
            local_checksum_object = _add_decoder(response.raw, checksum_object)

            # This is useful for smaller files, or when the user wants to
            # download the entire file in one go.
            if self.single_shot_download:
                content = response.raw.read(decode_content=True)
                self._stream.write(content)
                self._bytes_downloaded += len(content)
                local_checksum_object.update(content)
                # Mark the body as consumed so ``requests`` does not attempt
                # to read the raw stream again.
                # NOTE(review): ``_content_consumed`` is a private requests
                # attribute — verify on requests upgrades.
                response._content_consumed = True
            else:
                body_iter = response.iter_content(
                    chunk_size=_request_helpers._SINGLE_GET_CHUNK_SIZE,
                    decode_unicode=False,
                )
                for chunk in body_iter:
                    self._stream.write(chunk)
                    self._bytes_downloaded += len(chunk)
                    local_checksum_object.update(chunk)

        # Don't validate the checksum for partial responses.
        if (
            expected_checksum is not None
            and response.status_code != http.client.PARTIAL_CONTENT
        ):
            # Digest is taken from ``checksum_object`` (not the local alias):
            # if a decoder was inserted above, the decoder updated the real
            # checksum object and ``local_checksum_object`` was a no-op hash.
            actual_checksum = _helpers.prepare_checksum_digest(checksum_object.digest())
            if actual_checksum != expected_checksum:
                headers = self._get_headers(response)
                x_goog_encoding = headers.get("x-goog-stored-content-encoding")
                x_goog_length = headers.get("x-goog-stored-content-length")
                content_length_msg = _RESPONSE_HEADERS_INFO.format(
                    x_goog_length, x_goog_encoding, self._bytes_downloaded
                )
                if (
                    x_goog_length
                    and self._bytes_downloaded < int(x_goog_length)
                    and x_goog_encoding != "gzip"
                ):
                    # The library will attempt to trigger a retry by raising a ConnectionError, if
                    # (a) bytes_downloaded is less than response header x-goog-stored-content-length, and
                    # (b) the object is not gzip-compressed when stored in Cloud Storage.
                    raise ConnectionError(content_length_msg)
                else:
                    msg = _CHECKSUM_MISMATCH.format(
                        self.media_url,
                        expected_checksum,
                        actual_checksum,
                        checksum_type=self.checksum.upper(),
                    )
                    msg += content_length_msg
                    raise DataCorruption(response, msg)

    def consume(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Consume the resource to be downloaded.

        If a ``stream`` is attached to this download, then the downloaded
        resource will be written to the stream.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.

        Raises:
            ~google.cloud.storage.exceptions.DataCorruption: If the download's
                checksum doesn't agree with server-computed checksum.
            ValueError: If the current :class:`Download` has already
                finished.
        """
        method, _, payload, headers = self._prepare_request()
        # NOTE: We assume "payload is None" but pass it along anyway.
        request_kwargs = {
            "data": payload,
            "headers": headers,
            "timeout": timeout,
        }
        if self._stream is not None:
            # Stream the response so the body can be written to ``_stream``
            # incrementally instead of being buffered in memory.
            request_kwargs["stream"] = True

        # Assign object generation if generation is specified in the media url.
        if self._object_generation is None:
            self._object_generation = _helpers._get_generation_from_url(self.media_url)

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            url = self.media_url

            # To restart an interrupted download, read from the offset of last byte
            # received using a range request, and set object generation query param.
            if self._bytes_downloaded > 0:
                _download.add_bytes_range(
                    (self.start or 0) + self._bytes_downloaded, self.end, self._headers
                )
                request_kwargs["headers"] = self._headers

                # Set object generation query param to ensure the same object content is requested.
                if (
                    self._object_generation is not None
                    and _helpers._get_generation_from_url(self.media_url) is None
                ):
                    query_param = {"generation": self._object_generation}
                    url = _helpers.add_query_parameters(self.media_url, query_param)

            result = transport.request(method, url, **request_kwargs)

            # If a generation hasn't been specified, and this is the first response we get, let's record the
            # generation. In future requests we'll specify the generation query param to avoid data races.
            if self._object_generation is None:
                self._object_generation = _helpers._parse_generation_header(
                    result, self._get_headers
                )

            self._process_response(result)

            # With decompressive transcoding, GCS serves back the whole file regardless of the range request,
            # thus we reset the stream position to the start of the stream.
            # See: https://cloud.google.com/storage/docs/transcoding#range
            if self._stream is not None:
                if _helpers._is_decompressive_transcoding(result, self._get_headers):
                    try:
                        self._stream.seek(0)
                    except Exception as exc:
                        msg = _STREAM_SEEK_ERROR.format(url)
                        raise Exception(msg) from exc
                    self._bytes_downloaded = 0

                self._write_to_stream(result)

            return result

        return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy)
    281 
    282 
class RawDownload(_request_helpers.RawRequestsMixin, _download.Download):
    """Helper to manage downloading a raw resource from a Google API.

    "Slices" of the resource can be retrieved by specifying a range
    with ``start`` and / or ``end``. However, in typical usage, neither
    ``start`` nor ``end`` is expected to be provided.

    Args:
        media_url (str): The URL containing the media to be downloaded.
        stream (IO[bytes]): A write-able stream (i.e. file-like object) that
            the downloaded resource can be written to.
        start (int): The first byte in a range to be downloaded. If not
            provided, but ``end`` is provided, will download from the
            beginning to ``end`` of the media.
        end (int): The last byte in a range to be downloaded. If not
            provided, but ``start`` is provided, will download from the
            ``start`` to the end of the media.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum Optional([str]): The type of checksum to compute to verify
            the integrity of the object. The response headers must contain
            a checksum of the requested type. If the headers lack an
            appropriate checksum (for instance in the case of transcoded or
            ranged downloads where the remote service does not know the
            correct checksum) an INFO-level log will be emitted. Supported
            values are "md5", "crc32c", "auto" and None. The default is "auto",
            which will try to detect if the C extension for crc32c is installed
            and fall back to md5 otherwise.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and how
            to configure them.

    Attributes:
        media_url (str): The URL containing the media to be downloaded.
        start (Optional[int]): The first byte in a range to be downloaded.
        end (Optional[int]): The last byte in a range to be downloaded.
    """

    def _write_to_stream(self, response):
        """Write response body to a write-able stream.

        .. note:

            This method assumes that the ``_stream`` attribute is set on the
            current download.

        Args:
            response (~requests.Response): The HTTP response object.

        Raises:
            ~google.cloud.storage.exceptions.DataCorruption: If the download's
                checksum doesn't agree with server-computed checksum.
        """
        # Retrieve the expected checksum only once for the download request,
        # then compute and validate the checksum when the full download completes.
        # Retried requests are range requests, and there's no way to detect
        # data corruption for that byte range alone.
        if self._expected_checksum is None and self._checksum_object is None:
            # `_get_expected_checksum()` may return None even if a checksum was
            # requested, in which case it will emit an info log _MISSING_CHECKSUM.
            # If an invalid checksum type is specified, this will raise ValueError.
            expected_checksum, checksum_object = _helpers._get_expected_checksum(
                response, self._get_headers, self.media_url, checksum_type=self.checksum
            )
            self._expected_checksum = expected_checksum
            self._checksum_object = checksum_object
        else:
            # A retried (range) request: reuse the checksum state accumulated
            # by the earlier attempt(s) so the final digest covers all bytes.
            expected_checksum = self._expected_checksum
            checksum_object = self._checksum_object

        with response:
            # This is useful for smaller files, or when the user wants to
            # download the entire file in one go.
            if self.single_shot_download:
                content = response.raw.read()
                self._stream.write(content)
                self._bytes_downloaded += len(content)
                checksum_object.update(content)
            else:
                # Stream the raw bytes with content decoding disabled, so the
                # checksum is computed over the bytes as stored.
                body_iter = response.raw.stream(
                    _request_helpers._SINGLE_GET_CHUNK_SIZE, decode_content=False
                )
                for chunk in body_iter:
                    self._stream.write(chunk)
                    self._bytes_downloaded += len(chunk)
                    checksum_object.update(chunk)
            # Mark the body as consumed so ``requests`` does not attempt to
            # read the raw stream again.
            # NOTE(review): ``_content_consumed`` is a private requests
            # attribute — verify on requests upgrades.
            response._content_consumed = True

        # Don't validate the checksum for partial responses.
        if (
            expected_checksum is not None
            and response.status_code != http.client.PARTIAL_CONTENT
        ):
            actual_checksum = _helpers.prepare_checksum_digest(checksum_object.digest())

            if actual_checksum != expected_checksum:
                headers = self._get_headers(response)
                x_goog_encoding = headers.get("x-goog-stored-content-encoding")
                x_goog_length = headers.get("x-goog-stored-content-length")
                content_length_msg = _RESPONSE_HEADERS_INFO.format(
                    x_goog_length, x_goog_encoding, self._bytes_downloaded
                )
                if (
                    x_goog_length
                    and self._bytes_downloaded < int(x_goog_length)
                    and x_goog_encoding != "gzip"
                ):
                    # The library will attempt to trigger a retry by raising a ConnectionError, if
                    # (a) bytes_downloaded is less than response header x-goog-stored-content-length, and
                    # (b) the object is not gzip-compressed when stored in Cloud Storage.
                    raise ConnectionError(content_length_msg)
                else:
                    msg = _CHECKSUM_MISMATCH.format(
                        self.media_url,
                        expected_checksum,
                        actual_checksum,
                        checksum_type=self.checksum.upper(),
                    )
                    msg += content_length_msg
                    raise DataCorruption(response, msg)

    def consume(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Consume the resource to be downloaded.

        If a ``stream`` is attached to this download, then the downloaded
        resource will be written to the stream.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.

        Raises:
            ~google.cloud.storage.exceptions.DataCorruption: If the download's
                checksum doesn't agree with server-computed checksum.
            ValueError: If the current :class:`Download` has already
                finished.
        """
        method, _, payload, headers = self._prepare_request()
        # NOTE: We assume "payload is None" but pass it along anyway.
        request_kwargs = {
            "data": payload,
            "headers": headers,
            "timeout": timeout,
            # Raw downloads always stream so the body can be written to
            # ``_stream`` incrementally.
            "stream": True,
        }

        # Assign object generation if generation is specified in the media url.
        if self._object_generation is None:
            self._object_generation = _helpers._get_generation_from_url(self.media_url)

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            url = self.media_url

            # To restart an interrupted download, read from the offset of last byte
            # received using a range request, and set object generation query param.
            if self._bytes_downloaded > 0:
                _download.add_bytes_range(
                    (self.start or 0) + self._bytes_downloaded, self.end, self._headers
                )
                request_kwargs["headers"] = self._headers

                # Set object generation query param to ensure the same object content is requested.
                if (
                    self._object_generation is not None
                    and _helpers._get_generation_from_url(self.media_url) is None
                ):
                    query_param = {"generation": self._object_generation}
                    url = _helpers.add_query_parameters(self.media_url, query_param)

            result = transport.request(method, url, **request_kwargs)

            # If a generation hasn't been specified, and this is the first response we get, let's record the
            # generation. In future requests we'll specify the generation query param to avoid data races.
            if self._object_generation is None:
                self._object_generation = _helpers._parse_generation_header(
                    result, self._get_headers
                )

            self._process_response(result)

            # With decompressive transcoding, GCS serves back the whole file regardless of the range request,
            # thus we reset the stream position to the start of the stream.
            # See: https://cloud.google.com/storage/docs/transcoding#range
            if self._stream is not None:
                if _helpers._is_decompressive_transcoding(result, self._get_headers):
                    try:
                        self._stream.seek(0)
                    except Exception as exc:
                        msg = _STREAM_SEEK_ERROR.format(url)
                        raise Exception(msg) from exc
                    self._bytes_downloaded = 0

                self._write_to_stream(result)

            return result

        return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy)
    503 
    504 
    505class ChunkedDownload(_request_helpers.RequestsMixin, _download.ChunkedDownload): 
    506    """Download a resource in chunks from a Google API. 
    507 
    508    Args: 
    509        media_url (str): The URL containing the media to be downloaded. 
    510        chunk_size (int): The number of bytes to be retrieved in each 
    511            request. 
    512        stream (IO[bytes]): A write-able stream (i.e. file-like object) that 
    513            will be used to concatenate chunks of the resource as they are 
    514            downloaded. 
    515        start (int): The first byte in a range to be downloaded. If not 
    516            provided, defaults to ``0``. 
    517        end (int): The last byte in a range to be downloaded. If not 
    518            provided, will download to the end of the media. 
    519        headers (Optional[Mapping[str, str]]): Extra headers that should 
    520            be sent with each request, e.g. headers for data encryption 
    521            key headers. 
    522        retry (Optional[google.api_core.retry.Retry]): How to retry the 
    523            RPC. A None value will disable retries. A 
    524            google.api_core.retry.Retry value will enable retries, and the 
    525            object will configure backoff and timeout options. 
    526 
    527            See the retry.py source code and docstrings in this package 
    528            (google.cloud.storage.retry) for information on retry types and how 
    529            to configure them. 
    530 
    531    Attributes: 
    532        media_url (str): The URL containing the media to be downloaded. 
    533        start (Optional[int]): The first byte in a range to be downloaded. 
    534        end (Optional[int]): The last byte in a range to be downloaded. 
    535        chunk_size (int): The number of bytes to be retrieved in each request. 
    536 
    537    Raises: 
    538        ValueError: If ``start`` is negative. 
    539    """ 
    540 
    541    def consume_next_chunk( 
    542        self, 
    543        transport, 
    544        timeout=( 
    545            _request_helpers._DEFAULT_CONNECT_TIMEOUT, 
    546            _request_helpers._DEFAULT_READ_TIMEOUT, 
    547        ), 
    548    ): 
    549        """Consume the next chunk of the resource to be downloaded. 
    550 
    551        Args: 
    552            transport (~requests.Session): A ``requests`` object which can 
    553                make authenticated requests. 
    554            timeout (Optional[Union[float, Tuple[float, float]]]): 
    555                The number of seconds to wait for the server response. 
    556                Depending on the retry strategy, a request may be repeated 
    557                several times using the same timeout each time. 
    558 
    559                Can also be passed as a tuple (connect_timeout, read_timeout). 
    560                See :meth:`requests.Session.request` documentation for details. 
    561 
    562        Returns: 
    563            ~requests.Response: The HTTP response returned by ``transport``. 
    564 
    565        Raises: 
    566            ValueError: If the current download has finished. 
    567        """ 
    568        method, url, payload, headers = self._prepare_request() 
    569 
    570        # Wrap the request business logic in a function to be retried. 
    571        def retriable_request(): 
    572            # NOTE: We assume "payload is None" but pass it along anyway. 
    573            result = transport.request( 
    574                method, 
    575                url, 
    576                data=payload, 
    577                headers=headers, 
    578                timeout=timeout, 
    579            ) 
    580            self._process_response(result) 
    581            return result 
    582 
    583        return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) 
    584 
    585 
    586class RawChunkedDownload(_request_helpers.RawRequestsMixin, _download.ChunkedDownload): 
    587    """Download a raw resource in chunks from a Google API. 
    588 
    589    Args: 
    590        media_url (str): The URL containing the media to be downloaded. 
    591        chunk_size (int): The number of bytes to be retrieved in each 
    592            request. 
    593        stream (IO[bytes]): A write-able stream (i.e. file-like object) that 
    594            will be used to concatenate chunks of the resource as they are 
    595            downloaded. 
    596        start (int): The first byte in a range to be downloaded. If not 
    597            provided, defaults to ``0``. 
    598        end (int): The last byte in a range to be downloaded. If not 
    599            provided, will download to the end of the media. 
    600        headers (Optional[Mapping[str, str]]): Extra headers that should 
    601            be sent with each request, e.g. headers for data encryption 
    602            key headers. 
    603        retry (Optional[google.api_core.retry.Retry]): How to retry the 
    604            RPC. A None value will disable retries. A 
    605            google.api_core.retry.Retry value will enable retries, and the 
    606            object will configure backoff and timeout options. 
    607 
    608            See the retry.py source code and docstrings in this package 
    609            (google.cloud.storage.retry) for information on retry types and how 
    610            to configure them. 
    611 
    612    Attributes: 
    613        media_url (str): The URL containing the media to be downloaded. 
    614        start (Optional[int]): The first byte in a range to be downloaded. 
    615        end (Optional[int]): The last byte in a range to be downloaded. 
    616        chunk_size (int): The number of bytes to be retrieved in each request. 
    617 
    618    Raises: 
    619        ValueError: If ``start`` is negative. 
    620    """ 
    621 
    622    def consume_next_chunk( 
    623        self, 
    624        transport, 
    625        timeout=( 
    626            _request_helpers._DEFAULT_CONNECT_TIMEOUT, 
    627            _request_helpers._DEFAULT_READ_TIMEOUT, 
    628        ), 
    629    ): 
    630        """Consume the next chunk of the resource to be downloaded. 
    631 
    632        Args: 
    633            transport (~requests.Session): A ``requests`` object which can 
    634                make authenticated requests. 
    635            timeout (Optional[Union[float, Tuple[float, float]]]): 
    636                The number of seconds to wait for the server response. 
    637                Depending on the retry strategy, a request may be repeated 
    638                several times using the same timeout each time. 
    639 
    640                Can also be passed as a tuple (connect_timeout, read_timeout). 
    641                See :meth:`requests.Session.request` documentation for details. 
    642 
    643        Returns: 
    644            ~requests.Response: The HTTP response returned by ``transport``. 
    645 
    646        Raises: 
    647            ValueError: If the current download has finished. 
    648        """ 
    649        method, url, payload, headers = self._prepare_request() 
    650 
    651        # Wrap the request business logic in a function to be retried. 
    652        def retriable_request(): 
    653            # NOTE: We assume "payload is None" but pass it along anyway. 
    654            result = transport.request( 
    655                method, 
    656                url, 
    657                data=payload, 
    658                headers=headers, 
    659                stream=True, 
    660                timeout=timeout, 
    661            ) 
    662            self._process_response(result) 
    663            return result 
    664 
    665        return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) 
    666 
    667 
    668def _add_decoder(response_raw, checksum): 
    669    """Patch the ``_decoder`` on a ``urllib3`` response. 
    670 
    671    This is so that we can intercept the compressed bytes before they are 
    672    decoded. 
    673 
    674    Only patches if the content encoding is ``gzip`` or ``br``. 
    675 
    676    Args: 
    677        response_raw (urllib3.response.HTTPResponse): The raw response for 
    678            an HTTP request. 
    679        checksum (object): 
    680            A checksum which will be updated with compressed bytes. 
    681 
    682    Returns: 
    683        object: Either the original ``checksum`` if ``_decoder`` is not 
    684        patched, or a ``_DoNothingHash`` if the decoder is patched, since the 
    685        caller will no longer need to hash to decoded bytes. 
    686    """ 
    687    encoding = response_raw.headers.get("content-encoding", "").lower() 
    688    if encoding == "gzip": 
    689        response_raw._decoder = _GzipDecoder(checksum) 
    690        return _helpers._DoNothingHash() 
    691    # Only activate if brotli is installed 
    692    elif encoding == "br" and _BrotliDecoder:  # type: ignore 
    693        response_raw._decoder = _BrotliDecoder(checksum) 
    694        return _helpers._DoNothingHash() 
    695    else: 
    696        return checksum 
    697 
    698 
    699class _GzipDecoder(urllib3.response.GzipDecoder): 
    700    """Custom subclass of ``urllib3`` decoder for ``gzip``-ed bytes. 
    701 
    702    Allows a checksum function to see the compressed bytes before they are 
    703    decoded. This way the checksum of the compressed value can be computed. 
    704 
    705    Args: 
    706        checksum (object): 
    707            A checksum which will be updated with compressed bytes. 
    708    """ 
    709 
    710    def __init__(self, checksum): 
    711        super().__init__() 
    712        self._checksum = checksum 
    713 
    714    def decompress(self, data): 
    715        """Decompress the bytes. 
    716 
    717        Args: 
    718            data (bytes): The compressed bytes to be decompressed. 
    719 
    720        Returns: 
    721            bytes: The decompressed bytes from ``data``. 
    722        """ 
    723        self._checksum.update(data) 
    724        return super().decompress(data) 
    725 
    726 
    727# urllib3.response.BrotliDecoder might not exist depending on whether brotli is 
    728# installed. 
    729if hasattr(urllib3.response, "BrotliDecoder"): 
    730 
    731    class _BrotliDecoder: 
    732        """Handler for ``brotli`` encoded bytes. 
    733 
    734        Allows a checksum function to see the compressed bytes before they are 
    735        decoded. This way the checksum of the compressed value can be computed. 
    736 
    737        Because BrotliDecoder's decompress method is dynamically created in 
    738        urllib3, a subclass is not practical. Instead, this class creates a 
    739        captive urllib3.requests.BrotliDecoder instance and acts as a proxy. 
    740 
    741        Args: 
    742            checksum (object): 
    743                A checksum which will be updated with compressed bytes. 
    744        """ 
    745 
    746        def __init__(self, checksum): 
    747            self._decoder = urllib3.response.BrotliDecoder() 
    748            self._checksum = checksum 
    749 
    750        def decompress(self, data): 
    751            """Decompress the bytes. 
    752 
    753            Args: 
    754                data (bytes): The compressed bytes to be decompressed. 
    755 
    756            Returns: 
    757                bytes: The decompressed bytes from ``data``. 
    758            """ 
    759            self._checksum.update(data) 
    760            return self._decoder.decompress(data) 
    761 
    762        def flush(self): 
    763            return self._decoder.flush() 
    764 
    765else:  # pragma: NO COVER 
    766    _BrotliDecoder = None  # type: ignore # pragma: NO COVER