1# Copyright 2017 Google Inc. 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
"""Virtual base classes for downloading media from Google APIs."""
    16 
    17 
    18import http.client 
    19import re 
    20 
    21from google.cloud.storage._media import _helpers 
    22from google.cloud.storage.exceptions import InvalidResponse 
    23from google.cloud.storage.retry import DEFAULT_RETRY 
    24 
    25 
    26_CONTENT_RANGE_RE = re.compile( 
    27    r"bytes (?P<start_byte>\d+)-(?P<end_byte>\d+)/(?P<total_bytes>\d+)", 
    28    flags=re.IGNORECASE, 
    29) 
    30_ACCEPTABLE_STATUS_CODES = (http.client.OK, http.client.PARTIAL_CONTENT) 
    31_GET = "GET" 
    32_ZERO_CONTENT_RANGE_HEADER = "bytes */0" 
    33 
    34 
    35class DownloadBase(object): 
    36    """Base class for download helpers. 
    37 
    38    Defines core shared behavior across different download types. 
    39 
    40    Args: 
    41        media_url (str): The URL containing the media to be downloaded. 
    42        stream (IO[bytes]): A write-able stream (i.e. file-like object) that 
    43            the downloaded resource can be written to. 
    44        start (int): The first byte in a range to be downloaded. 
    45        end (int): The last byte in a range to be downloaded. 
    46        headers (Optional[Mapping[str, str]]): Extra headers that should 
    47            be sent with the request, e.g. headers for encrypted data. 
    48        retry (Optional[google.api_core.retry.Retry]): How to retry the RPC. 
    49            A None value will disable retries. A google.api_core.retry.Retry 
    50            value will enable retries, and the object will configure backoff and 
    51            timeout options. 
    52 
    53            See the retry.py source code and docstrings in this package 
    54            (google.cloud.storage.retry) for information on retry types and how 
    55            to configure them. 
    56 
    57    Attributes: 
    58        media_url (str): The URL containing the media to be downloaded. 
    59        start (Optional[int]): The first byte in a range to be downloaded. 
    60        end (Optional[int]): The last byte in a range to be downloaded. 
    61    """ 
    62 
    63    def __init__( 
    64        self, 
    65        media_url, 
    66        stream=None, 
    67        start=None, 
    68        end=None, 
    69        headers=None, 
    70        retry=DEFAULT_RETRY, 
    71    ): 
    72        self.media_url = media_url 
    73        self._stream = stream 
    74        self.start = start 
    75        self.end = end 
    76        if headers is None: 
    77            headers = {} 
    78        self._headers = headers 
    79        self._finished = False 
    80        self._retry_strategy = retry 
    81 
    82    @property 
    83    def finished(self): 
    84        """bool: Flag indicating if the download has completed.""" 
    85        return self._finished 
    86 
    87    @staticmethod 
    88    def _get_status_code(response): 
    89        """Access the status code from an HTTP response. 
    90 
    91        Args: 
    92            response (object): The HTTP response object. 
    93 
    94        Raises: 
    95            NotImplementedError: Always, since virtual. 
    96        """ 
    97        raise NotImplementedError("This implementation is virtual.") 
    98 
    99    @staticmethod 
    100    def _get_headers(response): 
    101        """Access the headers from an HTTP response. 
    102 
    103        Args: 
    104            response (object): The HTTP response object. 
    105 
    106        Raises: 
    107            NotImplementedError: Always, since virtual. 
    108        """ 
    109        raise NotImplementedError("This implementation is virtual.") 
    110 
    111    @staticmethod 
    112    def _get_body(response): 
    113        """Access the response body from an HTTP response. 
    114 
    115        Args: 
    116            response (object): The HTTP response object. 
    117 
    118        Raises: 
    119            NotImplementedError: Always, since virtual. 
    120        """ 
    121        raise NotImplementedError("This implementation is virtual.") 
    122 
    123 
    124class Download(DownloadBase): 
    125    """Helper to manage downloading a resource from a Google API. 
    126 
    127    "Slices" of the resource can be retrieved by specifying a range 
    128    with ``start`` and / or ``end``. However, in typical usage, neither 
    129    ``start`` nor ``end`` is expected to be provided. 
    130 
    131    Args: 
    132        media_url (str): The URL containing the media to be downloaded. 
    133        stream (IO[bytes]): A write-able stream (i.e. file-like object) that 
    134            the downloaded resource can be written to. 
    135        start (int): The first byte in a range to be downloaded. If not 
    136            provided, but ``end`` is provided, will download from the 
    137            beginning to ``end`` of the media. 
    138        end (int): The last byte in a range to be downloaded. If not 
    139            provided, but ``start`` is provided, will download from the 
    140            ``start`` to the end of the media. 
    141        headers (Optional[Mapping[str, str]]): Extra headers that should 
    142            be sent with the request, e.g. headers for encrypted data. 
    143        checksum (Optional[str]): The type of checksum to compute to verify 
    144            the integrity of the object. The response headers must contain 
    145            a checksum of the requested type. If the headers lack an 
    146            appropriate checksum (for instance in the case of transcoded or 
    147            ranged downloads where the remote service does not know the 
    148            correct checksum) an INFO-level log will be emitted. Supported 
    149            values are "md5", "crc32c", "auto" and None. The default is "auto", 
    150            which will try to detect if the C extension for crc32c is installed 
    151            and fall back to md5 otherwise. 
    152        retry (Optional[google.api_core.retry.Retry]): How to retry the 
    153            RPC. A None value will disable retries. A 
    154            google.api_core.retry.Retry value will enable retries, and the 
    155            object will configure backoff and timeout options. 
    156 
    157            See the retry.py source code and docstrings in this package 
    158            (google.cloud.storage.retry) for information on retry types and how 
    159            to configure them. 
    160        single_shot_download (Optional[bool]): If true, download the object in a single request. 
    161            Caution: Enabling this will increase the memory overload for your application. 
    162            Please enable this as per your use case. 
    163 
    164    """ 
    165 
    166    def __init__( 
    167        self, 
    168        media_url, 
    169        stream=None, 
    170        start=None, 
    171        end=None, 
    172        headers=None, 
    173        checksum="auto", 
    174        retry=DEFAULT_RETRY, 
    175        single_shot_download=False, 
    176    ): 
    177        super(Download, self).__init__( 
    178            media_url, stream=stream, start=start, end=end, headers=headers, retry=retry 
    179        ) 
    180        self.checksum = checksum 
    181        if self.checksum == "auto": 
    182            self.checksum = ( 
    183                "crc32c" if _helpers._is_crc32c_available_and_fast() else "md5" 
    184            ) 
    185        self.single_shot_download = single_shot_download 
    186        self._bytes_downloaded = 0 
    187        self._expected_checksum = None 
    188        self._checksum_object = None 
    189        self._object_generation = None 
    190 
    191    def _prepare_request(self): 
    192        """Prepare the contents of an HTTP request. 
    193 
    194        This is everything that must be done before a request that doesn't 
    195        require network I/O (or other I/O). This is based on the `sans-I/O`_ 
    196        philosophy. 
    197 
    198        Returns: 
    199            Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple 
    200 
    201              * HTTP verb for the request (always GET) 
    202              * the URL for the request 
    203              * the body of the request (always :data:`None`) 
    204              * headers for the request 
    205 
    206        Raises: 
    207            ValueError: If the current :class:`Download` has already 
    208                finished. 
    209 
    210        .. _sans-I/O: https://sans-io.readthedocs.io/ 
    211        """ 
    212        if self.finished: 
    213            raise ValueError("A download can only be used once.") 
    214 
    215        add_bytes_range(self.start, self.end, self._headers) 
    216        return _GET, self.media_url, None, self._headers 
    217 
    218    def _process_response(self, response): 
    219        """Process the response from an HTTP request. 
    220 
    221        This is everything that must be done after a request that doesn't 
    222        require network I/O (or other I/O). This is based on the `sans-I/O`_ 
    223        philosophy. 
    224 
    225        Args: 
    226            response (object): The HTTP response object. 
    227 
    228        .. _sans-I/O: https://sans-io.readthedocs.io/ 
    229        """ 
    230        # Tombstone the current Download so it cannot be used again. 
    231        self._finished = True 
    232        _helpers.require_status_code( 
    233            response, _ACCEPTABLE_STATUS_CODES, self._get_status_code 
    234        ) 
    235 
    236    def consume(self, transport, timeout=None): 
    237        """Consume the resource to be downloaded. 
    238 
    239        If a ``stream`` is attached to this download, then the downloaded 
    240        resource will be written to the stream. 
    241 
    242        Args: 
    243            transport (object): An object which can make authenticated 
    244                requests. 
    245            timeout (Optional[Union[float, Tuple[float, float]]]): 
    246                The number of seconds to wait for the server response. 
    247                Depending on the retry strategy, a request may be repeated 
    248                several times using the same timeout each time. 
    249 
    250                Can also be passed as a tuple (connect_timeout, read_timeout). 
    251                See :meth:`requests.Session.request` documentation for details. 
    252 
    253        Raises: 
    254            NotImplementedError: Always, since virtual. 
    255        """ 
    256        raise NotImplementedError("This implementation is virtual.") 
    257 
    258 
    259class ChunkedDownload(DownloadBase): 
    260    """Download a resource in chunks from a Google API. 
    261 
    262    Args: 
    263        media_url (str): The URL containing the media to be downloaded. 
    264        chunk_size (int): The number of bytes to be retrieved in each 
    265            request. 
    266        stream (IO[bytes]): A write-able stream (i.e. file-like object) that 
    267            will be used to concatenate chunks of the resource as they are 
    268            downloaded. 
    269        start (int): The first byte in a range to be downloaded. If not 
    270            provided, defaults to ``0``. 
    271        end (int): The last byte in a range to be downloaded. If not 
    272            provided, will download to the end of the media. 
    273        headers (Optional[Mapping[str, str]]): Extra headers that should 
    274            be sent with each request, e.g. headers for data encryption 
    275            key headers. 
    276        retry (Optional[google.api_core.retry.Retry]): How to retry the 
    277            RPC. A None value will disable retries. A 
    278            google.api_core.retry.Retry value will enable retries, and the 
    279            object will configure backoff and timeout options. 
    280 
    281            See the retry.py source code and docstrings in this package 
    282            (google.cloud.storage.retry) for information on retry types and how 
    283            to configure them. 
    284 
    285    Attributes: 
    286        media_url (str): The URL containing the media to be downloaded. 
    287        start (Optional[int]): The first byte in a range to be downloaded. 
    288        end (Optional[int]): The last byte in a range to be downloaded. 
    289        chunk_size (int): The number of bytes to be retrieved in each request. 
    290 
    291    Raises: 
    292        ValueError: If ``start`` is negative. 
    293    """ 
    294 
    295    def __init__( 
    296        self, 
    297        media_url, 
    298        chunk_size, 
    299        stream, 
    300        start=0, 
    301        end=None, 
    302        headers=None, 
    303        retry=DEFAULT_RETRY, 
    304    ): 
    305        if start < 0: 
    306            raise ValueError( 
    307                "On a chunked download the starting " "value cannot be negative." 
    308            ) 
    309        super(ChunkedDownload, self).__init__( 
    310            media_url, 
    311            stream=stream, 
    312            start=start, 
    313            end=end, 
    314            headers=headers, 
    315            retry=retry, 
    316        ) 
    317        self.chunk_size = chunk_size 
    318        self._bytes_downloaded = 0 
    319        self._total_bytes = None 
    320        self._invalid = False 
    321 
    322    @property 
    323    def bytes_downloaded(self): 
    324        """int: Number of bytes that have been downloaded.""" 
    325        return self._bytes_downloaded 
    326 
    327    @property 
    328    def total_bytes(self): 
    329        """Optional[int]: The total number of bytes to be downloaded.""" 
    330        return self._total_bytes 
    331 
    332    @property 
    333    def invalid(self): 
    334        """bool: Indicates if the download is in an invalid state. 
    335 
    336        This will occur if a call to :meth:`consume_next_chunk` fails. 
    337        """ 
    338        return self._invalid 
    339 
    340    def _get_byte_range(self): 
    341        """Determines the byte range for the next request. 
    342 
    343        Returns: 
    344            Tuple[int, int]: The pair of begin and end byte for the next 
    345            chunked request. 
    346        """ 
    347        curr_start = self.start + self.bytes_downloaded 
    348        curr_end = curr_start + self.chunk_size - 1 
    349        # Make sure ``curr_end`` does not exceed ``end``. 
    350        if self.end is not None: 
    351            curr_end = min(curr_end, self.end) 
    352        # Make sure ``curr_end`` does not exceed ``total_bytes - 1``. 
    353        if self.total_bytes is not None: 
    354            curr_end = min(curr_end, self.total_bytes - 1) 
    355        return curr_start, curr_end 
    356 
    357    def _prepare_request(self): 
    358        """Prepare the contents of an HTTP request. 
    359 
    360        This is everything that must be done before a request that doesn't 
    361        require network I/O (or other I/O). This is based on the `sans-I/O`_ 
    362        philosophy. 
    363 
    364        .. note: 
    365 
    366            This method will be used multiple times, so ``headers`` will 
    367            be mutated in between requests. However, we don't make a copy 
    368            since the same keys are being updated. 
    369 
    370        Returns: 
    371            Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple 
    372 
    373              * HTTP verb for the request (always GET) 
    374              * the URL for the request 
    375              * the body of the request (always :data:`None`) 
    376              * headers for the request 
    377 
    378        Raises: 
    379            ValueError: If the current download has finished. 
    380            ValueError: If the current download is invalid. 
    381 
    382        .. _sans-I/O: https://sans-io.readthedocs.io/ 
    383        """ 
    384        if self.finished: 
    385            raise ValueError("Download has finished.") 
    386        if self.invalid: 
    387            raise ValueError("Download is invalid and cannot be re-used.") 
    388 
    389        curr_start, curr_end = self._get_byte_range() 
    390        add_bytes_range(curr_start, curr_end, self._headers) 
    391        return _GET, self.media_url, None, self._headers 
    392 
    393    def _make_invalid(self): 
    394        """Simple setter for ``invalid``. 
    395 
    396        This is intended to be passed along as a callback to helpers that 
    397        raise an exception so they can mark this instance as invalid before 
    398        raising. 
    399        """ 
    400        self._invalid = True 
    401 
    402    def _process_response(self, response): 
    403        """Process the response from an HTTP request. 
    404 
    405        This is everything that must be done after a request that doesn't 
    406        require network I/O. This is based on the `sans-I/O`_ philosophy. 
    407 
    408        For the time being, this **does require** some form of I/O to write 
    409        a chunk to ``stream``. However, this will (almost) certainly not be 
    410        network I/O. 
    411 
    412        Updates the current state after consuming a chunk. First, 
    413        increments ``bytes_downloaded`` by the number of bytes in the 
    414        ``content-length`` header. 
    415 
    416        If ``total_bytes`` is already set, this assumes (but does not check) 
    417        that we already have the correct value and doesn't bother to check 
    418        that it agrees with the headers. 
    419 
    420        We expect the **total** length to be in the ``content-range`` header, 
    421        but this header is only present on requests which sent the ``range`` 
    422        header. This response header should be of the form 
    423        ``bytes {start}-{end}/{total}`` and ``{end} - {start} + 1`` 
    424        should be the same as the ``Content-Length``. 
    425 
    426        Args: 
    427            response (object): The HTTP response object (need headers). 
    428 
    429        Raises: 
    430            ~google.cloud.storage.exceptions.InvalidResponse: If the number 
    431                of bytes in the body doesn't match the content length header. 
    432 
    433        .. _sans-I/O: https://sans-io.readthedocs.io/ 
    434        """ 
    435        # Verify the response before updating the current instance. 
    436        if _check_for_zero_content_range( 
    437            response, self._get_status_code, self._get_headers 
    438        ): 
    439            self._finished = True 
    440            return 
    441 
    442        _helpers.require_status_code( 
    443            response, 
    444            _ACCEPTABLE_STATUS_CODES, 
    445            self._get_status_code, 
    446            callback=self._make_invalid, 
    447        ) 
    448        headers = self._get_headers(response) 
    449        response_body = self._get_body(response) 
    450 
    451        start_byte, end_byte, total_bytes = get_range_info( 
    452            response, self._get_headers, callback=self._make_invalid 
    453        ) 
    454 
    455        transfer_encoding = headers.get("transfer-encoding") 
    456 
    457        if transfer_encoding is None: 
    458            content_length = _helpers.header_required( 
    459                response, 
    460                "content-length", 
    461                self._get_headers, 
    462                callback=self._make_invalid, 
    463            ) 
    464            num_bytes = int(content_length) 
    465            if len(response_body) != num_bytes: 
    466                self._make_invalid() 
    467                raise InvalidResponse( 
    468                    response, 
    469                    "Response is different size than content-length", 
    470                    "Expected", 
    471                    num_bytes, 
    472                    "Received", 
    473                    len(response_body), 
    474                ) 
    475        else: 
    476            # 'content-length' header not allowed with chunked encoding. 
    477            num_bytes = end_byte - start_byte + 1 
    478 
    479        # First update ``bytes_downloaded``. 
    480        self._bytes_downloaded += num_bytes 
    481        # If the end byte is past ``end`` or ``total_bytes - 1`` we are done. 
    482        if self.end is not None and end_byte >= self.end: 
    483            self._finished = True 
    484        elif end_byte >= total_bytes - 1: 
    485            self._finished = True 
    486        # NOTE: We only use ``total_bytes`` if not already known. 
    487        if self.total_bytes is None: 
    488            self._total_bytes = total_bytes 
    489        # Write the response body to the stream. 
    490        self._stream.write(response_body) 
    491 
    492    def consume_next_chunk(self, transport, timeout=None): 
    493        """Consume the next chunk of the resource to be downloaded. 
    494 
    495        Args: 
    496            transport (object): An object which can make authenticated 
    497                requests. 
    498            timeout (Optional[Union[float, Tuple[float, float]]]): 
    499                The number of seconds to wait for the server response. 
    500                Depending on the retry strategy, a request may be repeated 
    501                several times using the same timeout each time. 
    502 
    503                Can also be passed as a tuple (connect_timeout, read_timeout). 
    504                See :meth:`requests.Session.request` documentation for details. 
    505 
    506        Raises: 
    507            NotImplementedError: Always, since virtual. 
    508        """ 
    509        raise NotImplementedError("This implementation is virtual.") 
    510 
    511 
    512def add_bytes_range(start, end, headers): 
    513    """Add a bytes range to a header dictionary. 
    514 
    515    Some possible inputs and the corresponding bytes ranges:: 
    516 
    517       >>> headers = {} 
    518       >>> add_bytes_range(None, None, headers) 
    519       >>> headers 
    520       {} 
    521       >>> add_bytes_range(500, 999, headers) 
    522       >>> headers['range'] 
    523       'bytes=500-999' 
    524       >>> add_bytes_range(None, 499, headers) 
    525       >>> headers['range'] 
    526       'bytes=0-499' 
    527       >>> add_bytes_range(-500, None, headers) 
    528       >>> headers['range'] 
    529       'bytes=-500' 
    530       >>> add_bytes_range(9500, None, headers) 
    531       >>> headers['range'] 
    532       'bytes=9500-' 
    533 
    534    Args: 
    535        start (Optional[int]): The first byte in a range. Can be zero, 
    536            positive, negative or :data:`None`. 
    537        end (Optional[int]): The last byte in a range. Assumed to be 
    538            positive. 
    539        headers (Mapping[str, str]): A headers mapping which can have the 
    540            bytes range added if at least one of ``start`` or ``end`` 
    541            is not :data:`None`. 
    542    """ 
    543    if start is None: 
    544        if end is None: 
    545            # No range to add. 
    546            return 
    547        else: 
    548            # NOTE: This assumes ``end`` is non-negative. 
    549            bytes_range = "0-{:d}".format(end) 
    550    else: 
    551        if end is None: 
    552            if start < 0: 
    553                bytes_range = "{:d}".format(start) 
    554            else: 
    555                bytes_range = "{:d}-".format(start) 
    556        else: 
    557            # NOTE: This is invalid if ``start < 0``. 
    558            bytes_range = "{:d}-{:d}".format(start, end) 
    559 
    560    headers[_helpers.RANGE_HEADER] = "bytes=" + bytes_range 
    561 
    562 
    563def get_range_info(response, get_headers, callback=_helpers.do_nothing): 
    564    """Get the start, end and total bytes from a content range header. 
    565 
    566    Args: 
    567        response (object): An HTTP response object. 
    568        get_headers (Callable[Any, Mapping[str, str]]): Helper to get headers 
    569            from an HTTP response. 
    570        callback (Optional[Callable]): A callback that takes no arguments, 
    571            to be executed when an exception is being raised. 
    572 
    573    Returns: 
    574        Tuple[int, int, int]: The start byte, end byte and total bytes. 
    575 
    576    Raises: 
    577        ~google.cloud.storage.exceptions.InvalidResponse: If the 
    578            ``Content-Range`` header is not of the form 
    579            ``bytes {start}-{end}/{total}``. 
    580    """ 
    581    content_range = _helpers.header_required( 
    582        response, _helpers.CONTENT_RANGE_HEADER, get_headers, callback=callback 
    583    ) 
    584    match = _CONTENT_RANGE_RE.match(content_range) 
    585    if match is None: 
    586        callback() 
    587        raise InvalidResponse( 
    588            response, 
    589            "Unexpected content-range header", 
    590            content_range, 
    591            'Expected to be of the form "bytes {start}-{end}/{total}"', 
    592        ) 
    593 
    594    return ( 
    595        int(match.group("start_byte")), 
    596        int(match.group("end_byte")), 
    597        int(match.group("total_bytes")), 
    598    ) 
    599 
    600 
    601def _check_for_zero_content_range(response, get_status_code, get_headers): 
    602    """Validate if response status code is 416 and content range is zero. 
    603 
    604    This is the special case for handling zero bytes files. 
    605 
    606    Args: 
    607        response (object): An HTTP response object. 
    608        get_status_code (Callable[Any, int]): Helper to get a status code 
    609            from a response. 
    610        get_headers (Callable[Any, Mapping[str, str]]): Helper to get headers 
    611            from an HTTP response. 
    612 
    613    Returns: 
    614        bool: True if content range total bytes is zero, false otherwise. 
    615    """ 
    616    if get_status_code(response) == http.client.REQUESTED_RANGE_NOT_SATISFIABLE: 
    617        content_range = _helpers.header_required( 
    618            response, 
    619            _helpers.CONTENT_RANGE_HEADER, 
    620            get_headers, 
    621            callback=_helpers.do_nothing, 
    622        ) 
    623        if content_range == _ZERO_CONTENT_RANGE_HEADER: 
    624            return True 
    625    return False