Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/storage/batch.py: 28%

131 statements  

coverage.py v7.3.1, created at 2023-09-25 06:17 +0000

# Copyright 2014 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Batch updates / deletes of storage buckets / blobs.

A batch request is a single standard HTTP request containing multiple Cloud Storage JSON API calls.
Within this main HTTP request, there are multiple parts which each contain a nested HTTP request.
The body of each part is itself a complete HTTP request, with its own verb, URL, headers, and body.

Note that Cloud Storage does not support batch operations for uploading or downloading.
Additionally, the current batch design does not support library methods whose return values
depend on the response payload. See more details in the [Sending Batch Requests official guide](https://cloud.google.com/storage/docs/batch).

Examples of situations when you might want to use the Batch module:
``blob.patch()``
``blob.update()``
``blob.delete()``
``bucket.delete_blob()``
``bucket.patch()``
``bucket.update()``
"""
from email.encoders import encode_noop
from email.generator import Generator
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.parser import Parser
import io
import json

import requests

from google.cloud import _helpers
from google.cloud import exceptions
from google.cloud.storage._http import Connection
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
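
# Illustrative usage sketch: the typical way to batch change operations is the
# client's batch context manager, which defers each call and sends them as one
# multipart request when the block exits. The bucket and blob names below are
# hypothetical, not taken from this module.
def _example_batch_deletes():
    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket("my-bucket")
    # Each delete_blob() call inside the context is deferred; the single
    # batch request is sent when the ``with`` block exits.
    with client.batch():
        for blob_name in ("file-1.txt", "file-2.txt", "file-3.txt"):
            bucket.delete_blob(blob_name)
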

class MIMEApplicationHTTP(MIMEApplication):
    """MIME type for ``application/http``.

    Constructs payload from headers and body

    :type method: str
    :param method: HTTP method

    :type uri: str
    :param uri: URI for HTTP request

    :type headers: dict
    :param headers: HTTP headers

    :type body: str
    :param body: (Optional) HTTP payload

    """

    def __init__(self, method, uri, headers, body):
        if isinstance(body, dict):
            body = json.dumps(body)
            headers["Content-Type"] = "application/json"
            headers["Content-Length"] = len(body)
        if body is None:
            body = ""
        lines = [f"{method} {uri} HTTP/1.1"]
        lines.extend([f"{key}: {value}" for key, value in sorted(headers.items())])
        lines.append("")
        lines.append(body)
        payload = "\r\n".join(lines)
        super().__init__(payload, "http", encode_noop)
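
# Illustrative sketch: serializing a single sub-request with
# MIMEApplicationHTTP. The URI and metadata payload are made up; printing the
# payload shows the nested HTTP request (request line, sorted headers, blank
# line, JSON body) that gets attached to the multipart batch body.
def _example_subrequest_payload():
    part = MIMEApplicationHTTP(
        "PATCH",
        "/storage/v1/b/my-bucket/o/my-object",
        {},
        {"metadata": {"color": "blue"}},
    )
    print(part.get_payload())
    # Prints (joined with CRLF):
    # PATCH /storage/v1/b/my-bucket/o/my-object HTTP/1.1
    # Content-Length: 31
    # Content-Type: application/json
    #
    # {"metadata": {"color": "blue"}}
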

class _FutureDict(object):
    """Class to hold a future value for a deferred request.

    Used for requests that get sent in a :class:`Batch`.
    """

    @staticmethod
    def get(key, default=None):
        """Stand-in for dict.get.

        :type key: object
        :param key: Hashable dictionary key.

        :type default: object
        :param default: Fallback value to dict.get.

        :raises: :class:`KeyError` always since the future is intended to fail
                 as a dictionary.
        """
        raise KeyError(f"Cannot get({key!r}, default={default!r}) on a future")

    def __getitem__(self, key):
        """Stand-in for dict[key].

        :type key: object
        :param key: Hashable dictionary key.

        :raises: :class:`KeyError` always since the future is intended to fail
                 as a dictionary.
        """
        raise KeyError(f"Cannot get item {key!r} from a future")

    def __setitem__(self, key, value):
        """Stand-in for dict[key] = value.

        :type key: object
        :param key: Hashable dictionary key.

        :type value: object
        :param value: Dictionary value.

        :raises: :class:`KeyError` always since the future is intended to fail
                 as a dictionary.
        """
        raise KeyError(f"Cannot set {key!r} -> {value!r} on a future")

class _FutureResponse(requests.Response):
    """Response that returns a placeholder dictionary for a batched request."""

    def __init__(self, future_dict):
        super(_FutureResponse, self).__init__()
        self._future_dict = future_dict
        self.status_code = 204

    def json(self):
        return self._future_dict

    @property
    def content(self):
        return self._future_dict
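
# Illustrative sketch of the placeholder machinery above: a deferred call gets
# back a _FutureResponse whose payload is a _FutureDict, and that placeholder
# refuses to be read until the batch finishes and swaps in the real response.
def _example_future_placeholder():
    placeholder = _FutureDict()
    response = _FutureResponse(placeholder)
    assert response.status_code == 204
    assert response.json() is placeholder
    try:
        placeholder["name"]  # reading before Batch.finish() always fails
    except KeyError as exc:
        print(exc)  # "Cannot get item 'name' from a future"
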

class Batch(Connection):
    """Proxy an underlying connection, batching up change operations.

    .. warning::

        Cloud Storage does not support batch operations for uploading or downloading.
        Additionally, the current batch design does not support library methods whose
        return values depend on the response payload.

    :type client: :class:`google.cloud.storage.client.Client`
    :param client: The client to use for making connections.

    :type raise_exception: bool
    :param raise_exception:
        (Optional) Defaults to True. If True, instead of adding exceptions
        to the list of return responses, the final exception will be raised.
        Note that exceptions are unwrapped after all operations are complete
        in success or failure, and only the last exception is raised.
    """

    _MAX_BATCH_SIZE = 1000

    def __init__(self, client, raise_exception=True):
        api_endpoint = client._connection.API_BASE_URL
        client_info = client._connection._client_info
        super(Batch, self).__init__(
            client, client_info=client_info, api_endpoint=api_endpoint
        )
        self._requests = []
        self._target_objects = []
        self._responses = []
        self._raise_exception = raise_exception

    def _do_request(
        self, method, url, headers, data, target_object, timeout=_DEFAULT_TIMEOUT
    ):
        """Override Connection: defer actual HTTP request.

        Only allow up to ``_MAX_BATCH_SIZE`` requests to be deferred.

        :type method: str
        :param method: The HTTP method to use in the request.

        :type url: str
        :param url: The URL to send the request to.

        :type headers: dict
        :param headers: A dictionary of HTTP headers to send with the request.

        :type data: str
        :param data: The data to send as the body of the request.

        :type target_object: object
        :param target_object:
            (Optional) This allows us to enable custom behavior in our batch
            connection. Here we defer an HTTP request and complete
            initialization of the object at a later time.

        :type timeout: float or tuple
        :param timeout:
            (Optional) The amount of time, in seconds, to wait
            for the server response. See: :ref:`configuring_timeouts`

        :rtype: tuple of ``response`` (a dictionary of sorts)
                and ``content`` (a string).
        :returns: The HTTP response object and the content of the response.
        """
        if len(self._requests) >= self._MAX_BATCH_SIZE:
            raise ValueError(
                "Too many deferred requests (max %d)" % self._MAX_BATCH_SIZE
            )
        self._requests.append((method, url, headers, data, timeout))
        result = _FutureDict()
        self._target_objects.append(target_object)
        if target_object is not None:
            target_object._properties = result
        return _FutureResponse(result)

    def _prepare_batch_request(self):
        """Prepares headers and body for a batch request.

        :rtype: tuple (dict, str)
        :returns: The pair of headers and body of the batch request to be sent.
        :raises: :class:`ValueError` if no requests have been deferred.
        """
        if len(self._requests) == 0:
            raise ValueError("No deferred requests")

        multi = MIMEMultipart()

        # Use timeout of last request, default to _DEFAULT_TIMEOUT
        timeout = _DEFAULT_TIMEOUT
        for method, uri, headers, body, _timeout in self._requests:
            subrequest = MIMEApplicationHTTP(method, uri, headers, body)
            multi.attach(subrequest)
            timeout = _timeout

        buf = io.StringIO()
        generator = Generator(buf, False, 0)
        generator.flatten(multi)
        payload = buf.getvalue()

        # Strip off redundant header text
        _, body = payload.split("\n\n", 1)
        return dict(multi._headers), body, timeout

    def _finish_futures(self, responses, raise_exception=True):
        """Apply all the batch responses to the futures created.

        :type responses: list of (headers, payload) tuples.
        :param responses: List of headers and payloads from each response in
                          the batch.

        :type raise_exception: bool
        :param raise_exception:
            (Optional) Defaults to True. If True, instead of adding exceptions
            to the list of return responses, the final exception will be raised.
            Note that exceptions are unwrapped after all operations are complete
            in success or failure, and only the last exception is raised.

        :raises: :class:`ValueError` if no requests have been deferred.
        """
        # If a bad status occurs, we track it, but don't raise an exception
        # until all futures have been populated.
        # If raise_exception=False, we add exceptions to the list of responses.
        exception_args = None

        if len(self._target_objects) != len(responses):  # pragma: NO COVER
            raise ValueError("Expected a response for every request.")

        for target_object, subresponse in zip(self._target_objects, responses):
            # For backwards compatibility, only the final exception will be raised.
            # Set raise_exception=False to include all exceptions to the list of return responses.
            if not 200 <= subresponse.status_code < 300 and raise_exception:
                exception_args = exception_args or subresponse
            elif target_object is not None:
                try:
                    target_object._properties = subresponse.json()
                except ValueError:
                    target_object._properties = subresponse.content

        if exception_args is not None:
            raise exceptions.from_http_response(exception_args)

    def finish(self, raise_exception=True):
        """Submit a single `multipart/mixed` request with deferred requests.

        :type raise_exception: bool
        :param raise_exception:
            (Optional) Defaults to True. If True, instead of adding exceptions
            to the list of return responses, the final exception will be raised.
            Note that exceptions are unwrapped after all operations are complete
            in success or failure, and only the last exception is raised.

        :rtype: list of tuples
        :returns: one ``(headers, payload)`` tuple per deferred request.
        """
        headers, body, timeout = self._prepare_batch_request()

        url = f"{self.API_BASE_URL}/batch/storage/v1"

        # Use the private ``_base_connection`` rather than the property
        # ``_connection``, since the property may be this
        # current batch.
        response = self._client._base_connection._make_request(
            "POST", url, data=body, headers=headers, timeout=timeout
        )

        # Raise exception if the top-level batch request fails
        if not 200 <= response.status_code < 300:
            raise exceptions.from_http_response(response)

        responses = list(_unpack_batch_response(response))
        self._finish_futures(responses, raise_exception=raise_exception)
        self._responses = responses
        return responses

    def current(self):
        """Return the topmost batch, or None."""
        return self._client.current_batch

    def __enter__(self):
        self._client._push_batch(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type is None:
                self.finish(raise_exception=self._raise_exception)
        finally:
            self._client._pop_batch()
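
# Illustrative sketch (hypothetical bucket/blob names): with the default
# raise_exception=True, a failing sub-response is tracked and then raised as a
# google.cloud.exceptions error only after all sub-responses have been
# processed.
def _example_batch_error_handling():
    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket("my-bucket")
    try:
        with client.batch():
            bucket.delete_blob("does-not-exist.txt")  # 404 inside the batch
    except exceptions.NotFound as exc:
        print("A batched sub-request failed:", exc)
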

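
# Illustrative sketch: a Batch can also be driven explicitly. With
# raise_exception=False the failing sub-responses are not raised, and the
# per-call responses collected by finish() stay available on the batch.
# Bucket and blob names are hypothetical.
def _example_batch_responses():
    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket("my-bucket")
    batch = Batch(client, raise_exception=False)
    with batch:  # pushes the batch so the calls below are deferred to it
        bucket.delete_blob("file-1.txt")
        bucket.delete_blob("file-2.txt")
    for subresponse in batch._responses:  # one requests.Response per deferred call
        print(subresponse.status_code)
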

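
# Illustrative sketch mirroring _prepare_batch_request(): two DELETE
# sub-requests rendered into an outer multipart/mixed container, then flattened
# to the headers and body that would be POSTed to the batch endpoint. The
# object paths are made up.
def _example_multipart_wire_format():
    multi = MIMEMultipart()
    for name in ("file-1.txt", "file-2.txt"):
        multi.attach(
            MIMEApplicationHTTP("DELETE", f"/storage/v1/b/my-bucket/o/{name}", {}, None)
        )
    buf = io.StringIO()
    Generator(buf, False, 0).flatten(multi)
    _, body = buf.getvalue().split("\n\n", 1)
    print(dict(multi._headers))  # includes the multipart/mixed boundary
    print(body)  # the two nested DELETE requests, separated by the boundary
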

def _generate_faux_mime_message(parser, response):
    """Convert response, content -> (multipart) email.message.

    Helper for _unpack_batch_response.
    """
    # We coerce to bytes to get consistent concat across
    # Py2 and Py3. Percent formatting is insufficient since
    # it includes the b in Py3.
    content_type = _helpers._to_bytes(response.headers.get("content-type", ""))

    faux_message = b"".join(
        [b"Content-Type: ", content_type, b"\nMIME-Version: 1.0\n\n", response.content]
    )

    return parser.parsestr(faux_message.decode("utf-8"))


def _unpack_batch_response(response):
    """Convert requests.Response -> [(headers, payload)].

    Creates a generator of tuples emulating the responses to
    :meth:`requests.Session.request`.

    :type response: :class:`requests.Response`
    :param response: HTTP response / headers from a request.
    """
    parser = Parser()
    message = _generate_faux_mime_message(parser, response)

    if not isinstance(message._payload, list):  # pragma: NO COVER
        raise ValueError("Bad response: not multi-part")

    for subrequest in message._payload:
        status_line, rest = subrequest._payload.split("\n", 1)
        _, status, _ = status_line.split(" ", 2)
        sub_message = parser.parsestr(rest)
        payload = sub_message._payload
        msg_headers = dict(sub_message._headers)
        content_id = msg_headers.get("Content-ID")

        subresponse = requests.Response()
        subresponse.request = requests.Request(
            method="BATCH", url=f"contentid://{content_id}"
        ).prepare()
        subresponse.status_code = int(status)
        subresponse.headers.update(msg_headers)
        subresponse._content = payload.encode("utf-8")

        yield subresponse
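
# Illustrative sketch: unpacking a hand-built multipart/mixed batch response.
# The boundary, Content-IDs, and payloads are fabricated to show how
# _unpack_batch_response() yields one requests.Response per nested part.
def _example_unpack_batch_response():
    boundary = "==example-boundary=="
    parts = (
        f"--{boundary}\n"
        "Content-Type: application/http\n"
        "Content-ID: <response-1>\n"
        "\n"
        "HTTP/1.1 200 OK\n"
        "Content-Type: application/json\n"
        "\n"
        '{"name": "file-1.txt"}\n'
        f"--{boundary}\n"
        "Content-Type: application/http\n"
        "Content-ID: <response-2>\n"
        "\n"
        "HTTP/1.1 204 No Content\n"
        "\n"
        f"--{boundary}--\n"
    )
    fake = requests.Response()
    fake.status_code = 200
    fake.headers["content-type"] = f'multipart/mixed; boundary="{boundary}"'
    fake._content = parts.encode("utf-8")

    for subresponse in _unpack_batch_response(fake):
        print(subresponse.status_code, subresponse.content)
    # 200 b'{"name": "file-1.txt"}'
    # 204 b''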