1# Copyright 2014 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
"""Batch updates / deletes of storage buckets / blobs.

A batch request is a single standard HTTP request containing multiple Cloud Storage JSON API calls.
Within this main HTTP request, there are multiple parts, each of which contains a nested HTTP request.
The body of each part is itself a complete HTTP request, with its own verb, URL, headers, and body.

Note that Cloud Storage does not support batch operations for uploading or downloading.
Additionally, the current batch design does not support library methods whose return values
depend on the response payload. See more details in the
`Sending Batch Requests official guide <https://cloud.google.com/storage/docs/batch>`_.

Examples of situations when you might want to use the Batch module:

* ``blob.patch()``
* ``blob.update()``
* ``blob.delete()``
* ``bucket.delete_blob()``
* ``bucket.patch()``
* ``bucket.update()``
"""
32from email.encoders import encode_noop
33from email.generator import Generator
34from email.mime.application import MIMEApplication
35from email.mime.multipart import MIMEMultipart
36from email.parser import Parser
37import io
38import json
39
40import requests
41
42from google.cloud import _helpers
43from google.cloud import exceptions
44from google.cloud.storage._http import Connection
45from google.cloud.storage.constants import _DEFAULT_TIMEOUT
46
47
class MIMEApplicationHTTP(MIMEApplication):
    """MIME type for ``application/http``.

    Constructs the payload from headers and body: a request line
    (``METHOD URI HTTP/1.1``), the headers sorted by name, a blank line,
    and the body, all joined with CRLF.

    :type method: str
    :param method: HTTP method

    :type uri: str
    :param uri: URI for HTTP request

    :type headers: dict
    :param headers: HTTP headers. The mapping is copied; the caller's
                    dict is never modified.

    :type body: str, dict, or None
    :param body: (Optional) HTTP payload. A ``dict`` is serialized as JSON,
                 and ``Content-Type`` / ``Content-Length`` headers are added
                 for it. ``None`` is treated as an empty body.
    """

    def __init__(self, method, uri, headers, body):
        # Work on a copy so that adding the JSON headers below does not
        # mutate the caller's dict (e.g. the one stored with a deferred
        # batch request).
        headers = dict(headers)
        if isinstance(body, dict):
            body = json.dumps(body)
            headers["Content-Type"] = "application/json"
            # json.dumps escapes non-ASCII characters by default, so the
            # character count equals the encoded byte count.
            headers["Content-Length"] = len(body)
        if body is None:
            body = ""
        lines = [f"{method} {uri} HTTP/1.1"]
        lines.extend(f"{key}: {value}" for key, value in sorted(headers.items()))
        lines.append("")
        lines.append(body)
        payload = "\r\n".join(lines)
        # encode_noop: the payload is already text; skip base64/QP encoding.
        super().__init__(payload, "http", encode_noop)
80
81
class _FutureDict:
    """Placeholder for the properties of an object whose request is deferred.

    While a :class:`Batch` is collecting requests, the affected object's
    ``_properties`` is replaced by an instance of this class. Every
    dict-style access raises :class:`KeyError`, so code that tries to read
    a not-yet-available response fails loudly instead of seeing stale data.
    """

    @staticmethod
    def get(key, default=None):
        """Stand-in for ``dict.get``; never succeeds.

        :type key: object
        :param key: Hashable dictionary key.

        :type default: object
        :param default: Fallback value that ``dict.get`` would return.

        :raises: :class:`KeyError` unconditionally, because a future holds
                 no values yet.
        """
        message = f"Cannot get({key!r}, default={default!r}) on a future"
        raise KeyError(message)

    def __getitem__(self, key):
        """Stand-in for ``dict[key]``; never succeeds.

        :type key: object
        :param key: Hashable dictionary key.

        :raises: :class:`KeyError` unconditionally, because a future holds
                 no values yet.
        """
        message = f"Cannot get item {key!r} from a future"
        raise KeyError(message)

    def __setitem__(self, key, value):
        """Stand-in for ``dict[key] = value``; never succeeds.

        :type key: object
        :param key: Hashable dictionary key.

        :type value: object
        :param value: Dictionary value.

        :raises: :class:`KeyError` unconditionally, because a future cannot
                 be written to.
        """
        message = f"Cannot set {key!r} -> {value!r} on a future"
        raise KeyError(message)
127
128
class _FutureResponse(requests.Response):
    """Response that returns a placeholder dictionary for a batched request.

    Handed back by :meth:`Batch._do_request` instead of a real HTTP
    response. Both :meth:`json` and :attr:`content` yield the wrapped
    placeholder.
    """

    def __init__(self, future_dict):
        super().__init__()
        self._future_dict = future_dict
        # 204 No Content: the real response only arrives when the batch
        # is finished.
        self.status_code = 204

    def json(self):
        """Return the placeholder instead of decoding a body."""
        return self._future_dict

    @property
    def content(self):
        """The placeholder, exposed where raw bytes would normally be."""
        return self._future_dict
143
144
class Batch(Connection):
    """Proxy an underlying connection, batching up change operations.

    While a batch is active, requests routed through it are not sent
    immediately. :meth:`_do_request` records them (up to
    ``_MAX_BATCH_SIZE``) and hands back placeholder responses.
    :meth:`finish` then sends all of them in a single ``multipart/mixed``
    request and applies the sub-responses to the affected objects. When used
    as a context manager, the batch is finished automatically if the
    ``with`` block exits without an exception.

    .. warning::

        Cloud Storage does not support batch operations for uploading or downloading.
        Additionally, the current batch design does not support library methods whose
        return values depend on the response payload.

    :type client: :class:`google.cloud.storage.client.Client`
    :param client: The client to use for making connections.

    :type raise_exception: bool
    :param raise_exception:
        (Optional) Defaults to True. If True, a failed sub-request causes an
        exception to be raised once every sub-response has been processed.
        When several sub-requests fail, only the last failure is raised.
        If False, failed sub-responses are not raised. They are returned in
        the list of responses and applied to their target objects like
        successful ones.
    """

    # Maximum number of requests that may be deferred in one batch.
    _MAX_BATCH_SIZE = 1000

    def __init__(self, client, raise_exception=True):
        # Reuse the endpoint and client info of the client's current
        # connection.
        api_endpoint = client._connection.API_BASE_URL
        client_info = client._connection._client_info
        super(Batch, self).__init__(
            client, client_info=client_info, api_endpoint=api_endpoint
        )
        # Parallel lists: _requests[i] is the deferred request whose
        # sub-response is applied to _target_objects[i] (which may be None).
        self._requests = []
        self._target_objects = []
        self._responses = []
        self._raise_exception = raise_exception

    def _do_request(
        self, method, url, headers, data, target_object, timeout=_DEFAULT_TIMEOUT
    ):
        """Override Connection: defer actual HTTP request.

        Only allow up to ``_MAX_BATCH_SIZE`` requests to be deferred.

        :type method: str
        :param method: The HTTP method to use in the request.

        :type url: str
        :param url: The URL to send the request to.

        :type headers: dict
        :param headers: A dictionary of HTTP headers to send with the request.

        :type data: str
        :param data: The data to send as the body of the request.

        :type target_object: object
        :param target_object:
            (Optional) This allows us to enable custom behavior in our batch
            connection. Here we defer an HTTP request and complete
            initialization of the object at a later time.

        :type timeout: float or tuple
        :param timeout:
            (Optional) The amount of time, in seconds, to wait
            for the server response. See: :ref:`configuring_timeouts`

        :rtype: :class:`_FutureResponse`
        :returns: A placeholder response whose ``json()`` and ``content``
                  return a :class:`_FutureDict`. If ``target_object`` is
                  given, the same placeholder becomes its ``_properties``
                  until :meth:`finish` applies the real sub-response.
        :raises: :class:`ValueError` if ``_MAX_BATCH_SIZE`` requests have
                 already been deferred.
        """
        if len(self._requests) >= self._MAX_BATCH_SIZE:
            raise ValueError(
                "Too many deferred requests (max %d)" % self._MAX_BATCH_SIZE
            )
        self._requests.append((method, url, headers, data, timeout))
        result = _FutureDict()
        self._target_objects.append(target_object)
        if target_object is not None:
            target_object._properties = result
        return _FutureResponse(result)

    def _prepare_batch_request(self):
        """Prepares headers, body and timeout for a batch request.

        :rtype: tuple (dict, str, float or tuple)
        :returns: The headers and body of the batch request to be sent, plus
                  the timeout to use for it. That timeout is the timeout of
                  the last deferred request.
        :raises: :class:`ValueError` if no requests have been deferred.
        """
        if not self._requests:
            raise ValueError("No deferred requests")

        multi = MIMEMultipart()
        for method, uri, headers, body, _ in self._requests:
            multi.attach(MIMEApplicationHTTP(method, uri, headers, body))

        # The batch as a whole uses the timeout of the last deferred request.
        timeout = self._requests[-1][4]

        buf = io.StringIO()
        # mangle_from_=False, maxheaderlen=0: don't rewrite "From " lines
        # and don't wrap long header lines.
        generator = Generator(buf, False, 0)
        generator.flatten(multi)
        payload = buf.getvalue()

        # The flattened text begins with the multipart's own headers, which
        # are returned separately as a dict; keep only what follows the
        # first blank line.
        _, body = payload.split("\n\n", 1)
        return dict(multi._headers), body, timeout

    def _finish_futures(self, responses, raise_exception=True):
        """Apply all the batch responses to the futures created.

        :type responses: list of :class:`requests.Response`
        :param responses: One sub-response per deferred request, in the order
                          the requests were deferred.

        :type raise_exception: bool
        :param raise_exception:
            (Optional) Defaults to True. If True, failed sub-responses are
            not applied to their target objects. After every sub-response has
            been processed, an exception is raised for the last one that
            failed. If False, every sub-response (failed or not) is applied.

        :raises: :class:`ValueError` if the number of responses does not
                 match the number of deferred requests. Otherwise, when
                 ``raise_exception`` is True and a sub-request failed, raises
                 the exception built by
                 :func:`google.cloud.exceptions.from_http_response`.
        """
        if len(self._target_objects) != len(responses):  # pragma: NO COVER
            raise ValueError("Expected a response for every request.")

        # Track the failing sub-response rather than raising immediately, so
        # every other future is still populated first. Assign explicitly
        # instead of using ``x or y``: a requests.Response is falsy for
        # status >= 400, so an ``or`` would make the choice of which failure
        # to raise depend on the status codes involved.
        last_failure = None
        for target_object, subresponse in zip(self._target_objects, responses):
            failed = not 200 <= subresponse.status_code < 300
            if failed and raise_exception:
                last_failure = subresponse
            elif target_object is not None:
                try:
                    target_object._properties = subresponse.json()
                except ValueError:
                    target_object._properties = subresponse.content

        if last_failure is not None:
            raise exceptions.from_http_response(last_failure)

    def finish(self, raise_exception=True):
        """Submit a single `multipart/mixed` request with deferred requests.

        :type raise_exception: bool
        :param raise_exception:
            (Optional) Defaults to True. If True, after all sub-responses
            have been applied, an exception is raised for the last failed
            sub-request (if any). If False, failed sub-responses are only
            returned in the list of responses.

        :rtype: list of :class:`requests.Response`
        :returns: One sub-response per deferred request, in request order.
        :raises: :class:`ValueError` if no requests have been deferred. Also
                 raises the exception built by
                 :func:`google.cloud.exceptions.from_http_response` if the
                 batch request itself fails, or if a sub-request fails and
                 ``raise_exception`` is True.
        """
        headers, body, timeout = self._prepare_batch_request()

        url = f"{self.API_BASE_URL}/batch/storage/v1"

        # Use the private ``_base_connection`` rather than the property
        # ``_connection``, since the property may be this
        # current batch.
        response = self._client._base_connection._make_request(
            "POST", url, data=body, headers=headers, timeout=timeout
        )

        # Raise exception if the top-level batch request fails
        if not 200 <= response.status_code < 300:
            raise exceptions.from_http_response(response)

        responses = list(_unpack_batch_response(response))
        # Record the responses before applying them, so they remain
        # available on the batch even if applying them raises for a failed
        # sub-request.
        self._responses = responses
        self._finish_futures(responses, raise_exception=raise_exception)
        return responses

    def current(self):
        """Return the topmost batch, or None."""
        return self._client.current_batch

    def __enter__(self):
        """Register this batch with the client (via ``_push_batch``)."""
        self._client._push_batch(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finish the batch if no exception occurred, then unregister it.

        If the ``with`` body raised, the deferred requests are not sent and
        the exception propagates. ``_pop_batch`` is always called, even if
        :meth:`finish` raises.
        """
        try:
            if exc_type is None:
                self.finish(raise_exception=self._raise_exception)
        finally:
            self._client._pop_batch()
336
337
def _generate_faux_mime_message(parser, response):
    """Wrap a batch response body in a minimal MIME envelope and parse it.

    Helper for _unpack_batch_response. The HTTP ``Content-Type`` header is
    not part of ``response.content`` (for a multipart response it carries
    the ``boundary`` parameter), so it is re-attached as a MIME header
    before parsing.

    :type parser: :class:`email.parser.Parser`
    :param parser: Parser used to parse the assembled message.

    :type response: :class:`requests.Response`
    :param response: The top-level batch response.

    :rtype: :class:`email.message.Message`
    :returns: The parsed message.
    """
    # Assemble as bytes because ``response.content`` is bytes; the whole
    # message is decoded as UTF-8 once, at the end.
    header_value = _helpers._to_bytes(response.headers.get("content-type", ""))
    envelope = b"Content-Type: " + header_value + b"\nMIME-Version: 1.0\n\n"
    return parser.parsestr((envelope + response.content).decode("utf-8"))
353
354
def _unpack_batch_response(response):
    """Split a ``multipart/mixed`` batch response into sub-responses.

    Creates a generator of :class:`requests.Response` objects, one per part
    of the batch response, emulating the responses to
    :meth:`requests.Session.request`. Each sub-response carries the status
    code, headers and body of the HTTP response embedded in its part. It
    also carries a synthetic ``BATCH`` request whose URL is
    ``contentid://<Content-ID>``.

    :type response: :class:`requests.Response`
    :param response: HTTP response / headers from a request.

    :raises: :class:`ValueError` if the response is not multipart.
    """
    parser = Parser()
    message = _generate_faux_mime_message(parser, response)

    if not isinstance(message._payload, list):  # pragma: NO COVER
        raise ValueError("Bad response: not multi-part")

    for subrequest in message._payload:
        # Each part holds a raw HTTP response: a status line followed by
        # headers and body, which the MIME parser can handle on its own.
        status_line, rest = subrequest._payload.split("\n", 1)
        # Split on any whitespace so that a missing reason phrase
        # ("HTTP/1.1 204") and a trailing "\r" from CRLF line endings are
        # both tolerated.
        status = status_line.split()[1]
        sub_message = parser.parsestr(rest)
        payload = sub_message._payload
        msg_headers = dict(sub_message._headers)
        # Content-ID is normally a header of the MIME part itself, not of
        # the embedded HTTP response, so fall back to the part's headers.
        content_id = msg_headers.get("Content-ID") or subrequest.get("Content-ID")

        subresponse = requests.Response()
        subresponse.request = requests.Request(
            method="BATCH", url=f"contentid://{content_id}"
        ).prepare()
        subresponse.status_code = int(status)
        subresponse.headers.update(msg_headers)
        subresponse._content = payload.encode("utf-8")

        yield subresponse