1# Copyright 2014 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14"""Batch updates / deletes of storage buckets / blobs. 
    15 
    16A batch request is a single standard HTTP request containing multiple Cloud Storage JSON API calls. 
    17Within this main HTTP request, there are multiple parts which each contain a nested HTTP request. 
    18The body of each part is itself a complete HTTP request, with its own verb, URL, headers, and body. 
    19 
    20Note that Cloud Storage does not support batch operations for uploading or downloading. 
    21Additionally, the current batch design does not support library methods whose return values 
    22depend on the response payload. See more details in the [Sending Batch Requests official guide](https://cloud.google.com/storage/docs/batch). 
    23 
    24Examples of situations when you might want to use the Batch module: 
    25``blob.patch()`` 
    26``blob.update()`` 
    27``blob.delete()`` 
    28``bucket.delete_blob()`` 
    29``bucket.patch()`` 
    30``bucket.update()`` 
    31""" 
    32from email.encoders import encode_noop 
    33from email.generator import Generator 
    34from email.mime.application import MIMEApplication 
    35from email.mime.multipart import MIMEMultipart 
    36from email.parser import Parser 
    37import io 
    38import json 
    39 
    40import requests 
    41 
    42from google.cloud import _helpers 
    43from google.cloud import exceptions 
    44from google.cloud.storage._http import Connection 
    45from google.cloud.storage.constants import _DEFAULT_TIMEOUT 
    46 
    47 
    48class MIMEApplicationHTTP(MIMEApplication): 
    49    """MIME type for ``application/http``. 
    50 
    51    Constructs payload from headers and body 
    52 
    53    :type method: str 
    54    :param method: HTTP method 
    55 
    56    :type uri: str 
    57    :param uri: URI for HTTP request 
    58 
    59    :type headers:  dict 
    60    :param headers: HTTP headers 
    61 
    62    :type body: str 
    63    :param body: (Optional) HTTP payload 
    64 
    65    """ 
    66 
    67    def __init__(self, method, uri, headers, body): 
    68        if isinstance(body, dict): 
    69            body = json.dumps(body) 
    70            headers["Content-Type"] = "application/json" 
    71            headers["Content-Length"] = len(body) 
    72        if body is None: 
    73            body = "" 
    74        lines = [f"{method} {uri} HTTP/1.1"] 
    75        lines.extend([f"{key}: {value}" for key, value in sorted(headers.items())]) 
    76        lines.append("") 
    77        lines.append(body) 
    78        payload = "\r\n".join(lines) 
    79        super().__init__(payload, "http", encode_noop) 
    80 
    81 
    82class _FutureDict(object): 
    83    """Class to hold a future value for a deferred request. 
    84 
    85    Used by for requests that get sent in a :class:`Batch`. 
    86    """ 
    87 
    88    @staticmethod 
    89    def get(key, default=None): 
    90        """Stand-in for dict.get. 
    91 
    92        :type key: object 
    93        :param key: Hashable dictionary key. 
    94 
    95        :type default: object 
    96        :param default: Fallback value to dict.get. 
    97 
    98        :raises: :class:`KeyError` always since the future is intended to fail 
    99                 as a dictionary. 
    100        """ 
    101        raise KeyError(f"Cannot get({key!r}, default={default!r}) on a future") 
    102 
    103    def __getitem__(self, key): 
    104        """Stand-in for dict[key]. 
    105 
    106        :type key: object 
    107        :param key: Hashable dictionary key. 
    108 
    109        :raises: :class:`KeyError` always since the future is intended to fail 
    110                 as a dictionary. 
    111        """ 
    112        raise KeyError(f"Cannot get item {key!r} from a future") 
    113 
    114    def __setitem__(self, key, value): 
    115        """Stand-in for dict[key] = value. 
    116 
    117        :type key: object 
    118        :param key: Hashable dictionary key. 
    119 
    120        :type value: object 
    121        :param value: Dictionary value. 
    122 
    123        :raises: :class:`KeyError` always since the future is intended to fail 
    124                 as a dictionary. 
    125        """ 
    126        raise KeyError(f"Cannot set {key!r} -> {value!r} on a future") 
    127 
    128 
    129class _FutureResponse(requests.Response): 
    130    """Reponse that returns a placeholder dictionary for a batched requests.""" 
    131 
    132    def __init__(self, future_dict): 
    133        super(_FutureResponse, self).__init__() 
    134        self._future_dict = future_dict 
    135        self.status_code = 204 
    136 
    137    def json(self): 
    138        return self._future_dict 
    139 
    140    @property 
    141    def content(self): 
    142        return self._future_dict 
    143 
    144 
    145class Batch(Connection): 
    146    """Proxy an underlying connection, batching up change operations. 
    147 
    148    .. warning:: 
    149 
    150        Cloud Storage does not support batch operations for uploading or downloading. 
    151        Additionally, the current batch design does not support library methods whose 
    152        return values depend on the response payload. 
    153 
    154    :type client: :class:`google.cloud.storage.client.Client` 
    155    :param client: The client to use for making connections. 
    156 
    157    :type raise_exception: bool 
    158    :param raise_exception: 
    159        (Optional) Defaults to True. If True, instead of adding exceptions 
    160        to the list of return responses, the final exception will be raised. 
    161        Note that exceptions are unwrapped after all operations are complete 
    162        in success or failure, and only the last exception is raised. 
    163    """ 
    164 
    165    _MAX_BATCH_SIZE = 1000 
    166 
    167    def __init__(self, client, raise_exception=True): 
    168        api_endpoint = client._connection.API_BASE_URL 
    169        client_info = client._connection._client_info 
    170        super(Batch, self).__init__( 
    171            client, client_info=client_info, api_endpoint=api_endpoint 
    172        ) 
    173        self._requests = [] 
    174        self._target_objects = [] 
    175        self._responses = [] 
    176        self._raise_exception = raise_exception 
    177 
    178    def _do_request( 
    179        self, method, url, headers, data, target_object, timeout=_DEFAULT_TIMEOUT 
    180    ): 
    181        """Override Connection:  defer actual HTTP request. 
    182 
    183        Only allow up to ``_MAX_BATCH_SIZE`` requests to be deferred. 
    184 
    185        :type method: str 
    186        :param method: The HTTP method to use in the request. 
    187 
    188        :type url: str 
    189        :param url: The URL to send the request to. 
    190 
    191        :type headers: dict 
    192        :param headers: A dictionary of HTTP headers to send with the request. 
    193 
    194        :type data: str 
    195        :param data: The data to send as the body of the request. 
    196 
    197        :type target_object: object 
    198        :param target_object: 
    199            (Optional) This allows us to enable custom behavior in our batch 
    200            connection. Here we defer an HTTP request and complete 
    201            initialization of the object at a later time. 
    202 
    203        :type timeout: float or tuple 
    204        :param timeout: 
    205            (Optional) The amount of time, in seconds, to wait 
    206            for the server response.  See: :ref:`configuring_timeouts` 
    207 
    208        :rtype: tuple of ``response`` (a dictionary of sorts) 
    209                and ``content`` (a string). 
    210        :returns: The HTTP response object and the content of the response. 
    211        """ 
    212        if len(self._requests) >= self._MAX_BATCH_SIZE: 
    213            raise ValueError( 
    214                "Too many deferred requests (max %d)" % self._MAX_BATCH_SIZE 
    215            ) 
    216        self._requests.append((method, url, headers, data, timeout)) 
    217        result = _FutureDict() 
    218        self._target_objects.append(target_object) 
    219        if target_object is not None: 
    220            target_object._properties = result 
    221        return _FutureResponse(result) 
    222 
    223    def _prepare_batch_request(self): 
    224        """Prepares headers and body for a batch request. 
    225 
    226        :rtype: tuple (dict, str) 
    227        :returns: The pair of headers and body of the batch request to be sent. 
    228        :raises: :class:`ValueError` if no requests have been deferred. 
    229        """ 
    230        if len(self._requests) == 0: 
    231            raise ValueError("No deferred requests") 
    232 
    233        multi = MIMEMultipart() 
    234 
    235        # Use timeout of last request, default to _DEFAULT_TIMEOUT 
    236        timeout = _DEFAULT_TIMEOUT 
    237        for method, uri, headers, body, _timeout in self._requests: 
    238            subrequest = MIMEApplicationHTTP(method, uri, headers, body) 
    239            multi.attach(subrequest) 
    240            timeout = _timeout 
    241 
    242        buf = io.StringIO() 
    243        generator = Generator(buf, False, 0) 
    244        generator.flatten(multi) 
    245        payload = buf.getvalue() 
    246 
    247        # Strip off redundant header text 
    248        _, body = payload.split("\n\n", 1) 
    249        return dict(multi._headers), body, timeout 
    250 
    251    def _finish_futures(self, responses, raise_exception=True): 
    252        """Apply all the batch responses to the futures created. 
    253 
    254        :type responses: list of (headers, payload) tuples. 
    255        :param responses: List of headers and payloads from each response in 
    256                          the batch. 
    257 
    258        :type raise_exception: bool 
    259        :param raise_exception: 
    260            (Optional) Defaults to True. If True, instead of adding exceptions 
    261            to the list of return responses, the final exception will be raised. 
    262            Note that exceptions are unwrapped after all operations are complete 
    263            in success or failure, and only the last exception is raised. 
    264 
    265        :raises: :class:`ValueError` if no requests have been deferred. 
    266        """ 
    267        # If a bad status occurs, we track it, but don't raise an exception 
    268        # until all futures have been populated. 
    269        # If raise_exception=False, we add exceptions to the list of responses. 
    270        exception_args = None 
    271 
    272        if len(self._target_objects) != len(responses):  # pragma: NO COVER 
    273            raise ValueError("Expected a response for every request.") 
    274 
    275        for target_object, subresponse in zip(self._target_objects, responses): 
    276            # For backwards compatibility, only the final exception will be raised. 
    277            # Set raise_exception=False to include all exceptions to the list of return responses. 
    278            if not 200 <= subresponse.status_code < 300 and raise_exception: 
    279                exception_args = exception_args or subresponse 
    280            elif target_object is not None: 
    281                try: 
    282                    target_object._properties = subresponse.json() 
    283                except ValueError: 
    284                    target_object._properties = subresponse.content 
    285 
    286        if exception_args is not None: 
    287            raise exceptions.from_http_response(exception_args) 
    288 
    289    def finish(self, raise_exception=True): 
    290        """Submit a single `multipart/mixed` request with deferred requests. 
    291 
    292        :type raise_exception: bool 
    293        :param raise_exception: 
    294            (Optional) Defaults to True. If True, instead of adding exceptions 
    295            to the list of return responses, the final exception will be raised. 
    296            Note that exceptions are unwrapped after all operations are complete 
    297            in success or failure, and only the last exception is raised. 
    298 
    299        :rtype: list of tuples 
    300        :returns: one ``(headers, payload)`` tuple per deferred request. 
    301        """ 
    302        headers, body, timeout = self._prepare_batch_request() 
    303 
    304        url = f"{self.API_BASE_URL}/batch/storage/v1" 
    305 
    306        # Use the private ``_base_connection`` rather than the property 
    307        # ``_connection``, since the property may be this 
    308        # current batch. 
    309        response = self._client._base_connection._make_request( 
    310            "POST", url, data=body, headers=headers, timeout=timeout 
    311        ) 
    312 
    313        # Raise exception if the top-level batch request fails 
    314        if not 200 <= response.status_code < 300: 
    315            raise exceptions.from_http_response(response) 
    316 
    317        responses = list(_unpack_batch_response(response)) 
    318        self._finish_futures(responses, raise_exception=raise_exception) 
    319        self._responses = responses 
    320        return responses 
    321 
    322    def current(self): 
    323        """Return the topmost batch, or None.""" 
    324        return self._client.current_batch 
    325 
    326    def __enter__(self): 
    327        self._client._push_batch(self) 
    328        return self 
    329 
    330    def __exit__(self, exc_type, exc_val, exc_tb): 
    331        try: 
    332            if exc_type is None: 
    333                self.finish(raise_exception=self._raise_exception) 
    334        finally: 
    335            self._client._pop_batch() 
    336 
    337 
    338def _generate_faux_mime_message(parser, response): 
    339    """Convert response, content -> (multipart) email.message. 
    340 
    341    Helper for _unpack_batch_response. 
    342    """ 
    343    # We coerce to bytes to get consistent concat across 
    344    # Py2 and Py3. Percent formatting is insufficient since 
    345    # it includes the b in Py3. 
    346    content_type = _helpers._to_bytes(response.headers.get("content-type", "")) 
    347 
    348    faux_message = b"".join( 
    349        [b"Content-Type: ", content_type, b"\nMIME-Version: 1.0\n\n", response.content] 
    350    ) 
    351 
    352    return parser.parsestr(faux_message.decode("utf-8")) 
    353 
    354 
    355def _unpack_batch_response(response): 
    356    """Convert requests.Response -> [(headers, payload)]. 
    357 
    358    Creates a generator of tuples of emulating the responses to 
    359    :meth:`requests.Session.request`. 
    360 
    361    :type response: :class:`requests.Response` 
    362    :param response: HTTP response / headers from a request. 
    363    """ 
    364    parser = Parser() 
    365    message = _generate_faux_mime_message(parser, response) 
    366 
    367    if not isinstance(message._payload, list):  # pragma: NO COVER 
    368        raise ValueError("Bad response:  not multi-part") 
    369 
    370    for subrequest in message._payload: 
    371        status_line, rest = subrequest._payload.split("\n", 1) 
    372        _, status, _ = status_line.split(" ", 2) 
    373        sub_message = parser.parsestr(rest) 
    374        payload = sub_message._payload 
    375        msg_headers = dict(sub_message._headers) 
    376        content_id = msg_headers.get("Content-ID") 
    377 
    378        subresponse = requests.Response() 
    379        subresponse.request = requests.Request( 
    380            method="BATCH", url=f"contentid://{content_id}" 
    381        ).prepare() 
    382        subresponse.status_code = int(status) 
    383        subresponse.headers.update(msg_headers) 
    384        subresponse._content = payload.encode("utf-8") 
    385 
    386        yield subresponse