Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/storage/batch.py: 28%
131 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:17 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:17 +0000
# Copyright 2014 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Batch updates / deletes of storage buckets / blobs.

A batch request is a single standard HTTP request containing multiple Cloud Storage JSON API calls.
Within this main HTTP request, there are multiple parts which each contain a nested HTTP request.
The body of each part is itself a complete HTTP request, with its own verb, URL, headers, and body.

Note that Cloud Storage does not support batch operations for uploading or downloading.
Additionally, the current batch design does not support library methods whose return values
depend on the response payload. See more details in the [Sending Batch Requests official guide](https://cloud.google.com/storage/docs/batch).

Examples of situations when you might want to use the Batch module:
``blob.patch()``
``blob.update()``
``blob.delete()``
``bucket.delete_blob()``
``bucket.patch()``
``bucket.update()``
"""
32from email.encoders import encode_noop
33from email.generator import Generator
34from email.mime.application import MIMEApplication
35from email.mime.multipart import MIMEMultipart
36from email.parser import Parser
37import io
38import json
40import requests
42from google.cloud import _helpers
43from google.cloud import exceptions
44from google.cloud.storage._http import Connection
45from google.cloud.storage.constants import _DEFAULT_TIMEOUT
class MIMEApplicationHTTP(MIMEApplication):
    """MIME type for ``application/http``.

    Constructs the payload (request line, sorted headers, blank line, body)
    from the given method, URI, headers and body.

    :type method: str
    :param method: HTTP method

    :type uri: str
    :param uri: URI for HTTP request

    :type headers: dict
    :param headers: HTTP headers. The mapping is copied before use, so the
                    caller's dictionary is never modified.

    :type body: str or dict
    :param body: (Optional) HTTP payload. A ``dict`` is serialized to JSON
                 and ``Content-Type`` / ``Content-Length`` headers are added
                 for it; ``None`` is sent as an empty body.
    """

    def __init__(self, method, uri, headers, body):
        # Work on a private copy: the Content-* headers added below must not
        # leak into the dictionary owned by the caller.
        headers = dict(headers or {})
        if isinstance(body, dict):
            body = json.dumps(body)
            headers["Content-Type"] = "application/json"
            headers["Content-Length"] = len(body)
        if body is None:
            body = ""
        lines = [f"{method} {uri} HTTP/1.1"]
        # Headers are emitted in sorted order so the payload is deterministic.
        lines.extend(f"{key}: {value}" for key, value in sorted(headers.items()))
        # An empty line separates the header section from the body.
        lines.append("")
        lines.append(body)
        payload = "\r\n".join(lines)
        # ``encode_noop`` keeps the payload as-is (no base64 transfer encoding).
        super().__init__(payload, "http", encode_noop)
82class _FutureDict(object):
83 """Class to hold a future value for a deferred request.
85 Used by for requests that get sent in a :class:`Batch`.
86 """
88 @staticmethod
89 def get(key, default=None):
90 """Stand-in for dict.get.
92 :type key: object
93 :param key: Hashable dictionary key.
95 :type default: object
96 :param default: Fallback value to dict.get.
98 :raises: :class:`KeyError` always since the future is intended to fail
99 as a dictionary.
100 """
101 raise KeyError(f"Cannot get({key!r}, default={default!r}) on a future")
103 def __getitem__(self, key):
104 """Stand-in for dict[key].
106 :type key: object
107 :param key: Hashable dictionary key.
109 :raises: :class:`KeyError` always since the future is intended to fail
110 as a dictionary.
111 """
112 raise KeyError(f"Cannot get item {key!r} from a future")
114 def __setitem__(self, key, value):
115 """Stand-in for dict[key] = value.
117 :type key: object
118 :param key: Hashable dictionary key.
120 :type value: object
121 :param value: Dictionary value.
123 :raises: :class:`KeyError` always since the future is intended to fail
124 as a dictionary.
125 """
126 raise KeyError(f"Cannot set {key!r} -> {value!r} on a future")
class _FutureResponse(requests.Response):
    """Response that hands back a placeholder dictionary for a batched request.

    The status code is fixed at 204 and both ``json()`` and ``content``
    return the placeholder passed to the constructor.
    """

    def __init__(self, future_dict):
        super().__init__()
        self.status_code = 204
        self._future_dict = future_dict

    @property
    def content(self):
        # Overrides the parent's ``content`` to expose the placeholder.
        return self._future_dict

    def json(self):
        # No payload exists yet; return the placeholder instead of parsing.
        return self._future_dict
class Batch(Connection):
    """Proxy an underlying connection, batching up change operations.

    .. warning::

        Cloud Storage does not support batch operations for uploading or downloading.
        Additionally, the current batch design does not support library methods whose
        return values depend on the response payload.

    :type client: :class:`google.cloud.storage.client.Client`
    :param client: The client to use for making connections.

    :type raise_exception: bool
    :param raise_exception:
        (Optional) Defaults to True. If True, instead of adding exceptions
        to the list of return responses, the final exception will be raised.
        Note that exceptions are unwrapped after all operations are complete
        in success or failure, and only the last exception is raised.
    """

    _MAX_BATCH_SIZE = 1000

    def __init__(self, client, raise_exception=True):
        # Reuse the endpoint and client info of the client's real connection.
        api_endpoint = client._connection.API_BASE_URL
        client_info = client._connection._client_info
        super().__init__(client, client_info=client_info, api_endpoint=api_endpoint)
        # Deferred ``(method, url, headers, data, timeout)`` tuples.
        self._requests = []
        # Object (or None) whose ``_properties`` each response will populate;
        # kept index-aligned with ``self._requests``.
        self._target_objects = []
        # Sub-responses from the last successful ``finish()`` call.
        self._responses = []
        self._raise_exception = raise_exception

    def _do_request(
        self, method, url, headers, data, target_object, timeout=_DEFAULT_TIMEOUT
    ):
        """Override Connection: defer actual HTTP request.

        Only allow up to ``_MAX_BATCH_SIZE`` requests to be deferred.

        :type method: str
        :param method: The HTTP method to use in the request.

        :type url: str
        :param url: The URL to send the request to.

        :type headers: dict
        :param headers: A dictionary of HTTP headers to send with the request.

        :type data: str
        :param data: The data to send as the body of the request.

        :type target_object: object
        :param target_object:
            (Optional) This allows us to enable custom behavior in our batch
            connection. Here we defer an HTTP request and complete
            initialization of the object at a later time.

        :type timeout: float or tuple
        :param timeout:
            (Optional) The amount of time, in seconds, to wait
            for the server response.  See: :ref:`configuring_timeouts`

        :rtype: :class:`_FutureResponse`
        :returns: A placeholder response (status 204) whose ``json()`` and
                  ``content`` return a :class:`_FutureDict`.
        :raises: :class:`ValueError` if ``_MAX_BATCH_SIZE`` requests have
                 already been deferred.
        """
        if len(self._requests) >= self._MAX_BATCH_SIZE:
            raise ValueError(
                "Too many deferred requests (max %d)" % self._MAX_BATCH_SIZE
            )
        self._requests.append((method, url, headers, data, timeout))
        result = _FutureDict()
        self._target_objects.append(target_object)
        if target_object is not None:
            # Until the batch finishes, the target's properties are a
            # placeholder that raises on any dictionary access.
            target_object._properties = result
        return _FutureResponse(result)

    def _prepare_batch_request(self):
        """Prepares headers and body for a batch request.

        :rtype: tuple (dict, str, float or tuple)
        :returns: The headers and body of the batch request to be sent,
                  followed by the timeout to use (that of the last deferred
                  request).
        :raises: :class:`ValueError` if no requests have been deferred.
        """
        if not self._requests:
            raise ValueError("No deferred requests")

        multi = MIMEMultipart()

        # Use timeout of last request, default to _DEFAULT_TIMEOUT
        timeout = _DEFAULT_TIMEOUT
        for method, uri, headers, body, _timeout in self._requests:
            subrequest = MIMEApplicationHTTP(method, uri, headers, body)
            multi.attach(subrequest)
            timeout = _timeout

        buf = io.StringIO()
        generator = Generator(buf, mangle_from_=False, maxheaderlen=0)
        generator.flatten(multi)
        payload = buf.getvalue()

        # Strip off redundant header text: the multipart headers are returned
        # separately via ``multi._headers``.
        _, body = payload.split("\n\n", 1)
        return dict(multi._headers), body, timeout

    def _finish_futures(self, responses, raise_exception=True):
        """Apply all the batch responses to the futures created.

        :type responses: list of :class:`requests.Response`
        :param responses: One sub-response per deferred request, in the
                          order the requests were deferred.

        :type raise_exception: bool
        :param raise_exception:
            (Optional) Defaults to True. If True, instead of adding exceptions
            to the list of return responses, the final exception will be raised.
            Note that exceptions are unwrapped after all operations are complete
            in success or failure, and only the last exception is raised.

        :raises: :class:`ValueError` if the number of responses does not
                 match the number of deferred requests.
        :raises: the exception built by ``exceptions.from_http_response`` for
                 the last non-2xx sub-response, when ``raise_exception`` is
                 True.
        """
        # If a bad status occurs, we track it, but don't raise an exception
        # until all futures have been populated.
        # If raise_exception=False, we add exceptions to the list of responses.
        failed_response = None

        if len(self._target_objects) != len(responses):  # pragma: NO COVER
            raise ValueError("Expected a response for every request.")

        for target_object, subresponse in zip(self._target_objects, responses):
            succeeded = 200 <= subresponse.status_code < 300
            # For backwards compatibility, only the final exception will be raised.
            # Set raise_exception=False to include all exceptions to the list of return responses.
            if raise_exception and not succeeded:
                # Assign unconditionally: ``previous or subresponse`` would
                # depend on the truthiness of the earlier response and could
                # keep an earlier failure instead of the last one.
                failed_response = subresponse
            elif target_object is not None:
                try:
                    target_object._properties = subresponse.json()
                except ValueError:
                    # Payload is not valid JSON; keep the raw content.
                    target_object._properties = subresponse.content

        if failed_response is not None:
            raise exceptions.from_http_response(failed_response)

    def finish(self, raise_exception=True):
        """Submit a single `multipart/mixed` request with deferred requests.

        :type raise_exception: bool
        :param raise_exception:
            (Optional) Defaults to True. If True, instead of adding exceptions
            to the list of return responses, the final exception will be raised.
            Note that exceptions are unwrapped after all operations are complete
            in success or failure, and only the last exception is raised.

        :rtype: list of :class:`requests.Response`
        :returns: one sub-response per deferred request.
        :raises: :class:`ValueError` if no requests have been deferred.
        """
        headers, body, timeout = self._prepare_batch_request()

        url = f"{self.API_BASE_URL}/batch/storage/v1"

        # Use the private ``_base_connection`` rather than the property
        # ``_connection``, since the property may be this
        # current batch.
        response = self._client._base_connection._make_request(
            "POST", url, data=body, headers=headers, timeout=timeout
        )

        # Raise exception if the top-level batch request fails
        if not 200 <= response.status_code < 300:
            raise exceptions.from_http_response(response)

        responses = list(_unpack_batch_response(response))
        self._finish_futures(responses, raise_exception=raise_exception)
        self._responses = responses
        return responses

    def current(self):
        """Return the topmost batch, or None."""
        return self._client.current_batch

    def __enter__(self):
        """Push this batch onto the client and return it."""
        self._client._push_batch(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Send the batch if the ``with`` body succeeded; always pop it."""
        try:
            # Nothing is sent when the body of the ``with`` block raised.
            if exc_type is None:
                self.finish(raise_exception=self._raise_exception)
        finally:
            self._client._pop_batch()
def _generate_faux_mime_message(parser, response):
    """Convert response, content -> (multipart) email.message.

    Helper for _unpack_batch_response: prefixes the response body with
    minimal ``Content-Type`` / ``MIME-Version`` headers and feeds the result
    to ``parser``.
    """
    # Everything is assembled as bytes first so the header text and the raw
    # response content can be concatenated directly, then decoded once.
    raw_content_type = response.headers.get("content-type", "")
    preamble = (
        b"Content-Type: "
        + _helpers._to_bytes(raw_content_type)
        + b"\nMIME-Version: 1.0\n\n"
    )
    document = preamble + response.content
    return parser.parsestr(document.decode("utf-8"))
def _unpack_batch_response(response):
    """Convert requests.Response -> generator of sub-responses.

    Yields one :class:`requests.Response` per part of the multipart batch
    response, emulating the responses to :meth:`requests.Session.request`.

    :type response: :class:`requests.Response`
    :param response: HTTP response / headers from a request.

    :raises: :class:`ValueError` if the parsed response is not multi-part.
    """
    parser = Parser()
    envelope = _generate_faux_mime_message(parser, response)
    parts = envelope._payload

    if not isinstance(parts, list):  # pragma: NO COVER
        raise ValueError("Bad response: not multi-part")

    for part in parts:
        # The first line of each part is the nested HTTP status line; the
        # remainder (headers + body) is parsed as a message of its own.
        status_line, remainder = part._payload.split("\n", 1)
        _protocol, status_text, _reason = status_line.split(" ", 2)
        inner = parser.parsestr(remainder)
        inner_headers = dict(inner._headers)
        content_id = inner_headers.get("Content-ID")

        # The attached request is synthetic: it records the content ID as a
        # URL with a "BATCH" method.
        placeholder_request = requests.Request(
            method="BATCH", url=f"contentid://{content_id}"
        )

        subresponse = requests.Response()
        subresponse.request = placeholder_request.prepare()
        subresponse.status_code = int(status_text)
        subresponse.headers.update(inner_headers)
        subresponse._content = inner._payload.encode("utf-8")

        yield subresponse