Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/api_core/operations_v1/transports/rest_asyncio.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

130 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2024 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16 

17import json 

18from typing import Any, Callable, Coroutine, Dict, Optional, Sequence, Tuple 

19 

20from google.auth import __version__ as auth_version 

21 

22try: 

23 from google.auth.aio.transport.sessions import AsyncAuthorizedSession # type: ignore 

24except ImportError as e: # pragma: NO COVER 

25 raise ImportError( 

26 "The `async_rest` extra of `google-api-core` is required to use long-running operations. Install it by running " 

27 "`pip install google-api-core[async_rest]`." 

28 ) from e 

29 

30from google.api_core import exceptions as core_exceptions # type: ignore 

31from google.api_core import gapic_v1 # type: ignore 

32from google.api_core import path_template # type: ignore 

33from google.api_core import rest_helpers # type: ignore 

34from google.api_core import retry_async as retries_async # type: ignore 

35from google.auth.aio import credentials as ga_credentials_async # type: ignore 

36from google.longrunning import operations_pb2 # type: ignore 

37from google.protobuf import empty_pb2 # type: ignore 

38from google.protobuf import json_format # type: ignore 

39 

40from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO, OperationsTransport 

41 

42DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( 

43 gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version, 

44 grpc_version=None, 

45 rest_version=f"google-auth@{auth_version}", 

46) 

47 

48 

49class AsyncOperationsRestTransport(OperationsTransport): 

50 """Asynchronous REST backend transport for Operations. 

51 

52 Manages async long-running operations with an API service. 

53 

54 When an API method normally takes long time to complete, it can be 

55 designed to return [Operation][google.api_core.operations_v1.Operation] to the 

56 client, and the client can use this interface to receive the real 

57 response asynchronously by polling the operation resource, or pass 

58 the operation resource to another API (such as Google Cloud Pub/Sub 

59 API) to receive the response. Any API service that returns 

60 long-running operations should implement the ``Operations`` 

61 interface so developers can have a consistent client experience. 

62 

63 This class defines the same methods as the primary client, so the 

64 primary client can load the underlying transport implementation 

65 and call it. 

66 

67 It sends JSON representations of protocol buffers over HTTP/1.1 

68 """ 

69 

70 def __init__( 

71 self, 

72 *, 

73 host: str = "longrunning.googleapis.com", 

74 credentials: Optional[ga_credentials_async.Credentials] = None, 

75 credentials_file: Optional[str] = None, 

76 scopes: Optional[Sequence[str]] = None, 

77 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 

78 quota_project_id: Optional[str] = None, 

79 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

80 always_use_jwt_access: Optional[bool] = False, 

81 url_scheme: str = "https", 

82 http_options: Optional[Dict] = None, 

83 path_prefix: str = "v1", 

84 # TODO(https://github.com/googleapis/python-api-core/issues/715): Add docstring for `credentials_file` to async REST transport. 

85 # TODO(https://github.com/googleapis/python-api-core/issues/716): Add docstring for `scopes` to async REST transport. 

86 # TODO(https://github.com/googleapis/python-api-core/issues/717): Add docstring for `quota_project_id` to async REST transport. 

87 # TODO(https://github.com/googleapis/python-api-core/issues/718): Add docstring for `client_cert_source` to async REST transport. 

88 ) -> None: 

89 """Instantiate the transport. 

90 

91 Args: 

92 host (Optional[str]): 

93 The hostname to connect to. 

94 credentials (Optional[google.auth.aio.credentials.Credentials]): The 

95 authorization credentials to attach to requests. These 

96 credentials identify the application to the service; if none 

97 are specified, the client will attempt to ascertain the 

98 credentials from the environment. 

99 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

100 The client info used to send a user-agent string along with 

101 API requests. If ``None``, then default info will be used. 

102 Generally, you only need to set this if you're developing 

103 your own client library. 

104 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 

105 be used for service account credentials. 

106 url_scheme: the protocol scheme for the API endpoint. Normally 

107 "https", but for testing or local servers, 

108 "http" can be specified. 

109 http_options: a dictionary of http_options for transcoding, to override 

110 the defaults from operations.proto. Each method has an entry 

111 with the corresponding http rules as value. 

112 path_prefix: path prefix (usually represents API version). Set to 

113 "v1" by default. 

114 

115 """ 

116 unsupported_params = { 

117 # TODO(https://github.com/googleapis/python-api-core/issues/715): Add support for `credentials_file` to async REST transport. 

118 "google.api_core.client_options.ClientOptions.credentials_file": credentials_file, 

119 # TODO(https://github.com/googleapis/python-api-core/issues/716): Add support for `scopes` to async REST transport. 

120 "google.api_core.client_options.ClientOptions.scopes": scopes, 

121 # TODO(https://github.com/googleapis/python-api-core/issues/717): Add support for `quota_project_id` to async REST transport. 

122 "google.api_core.client_options.ClientOptions.quota_project_id": quota_project_id, 

123 # TODO(https://github.com/googleapis/python-api-core/issues/718): Add support for `client_cert_source` to async REST transport. 

124 "google.api_core.client_options.ClientOptions.client_cert_source": client_cert_source_for_mtls, 

125 # TODO(https://github.com/googleapis/python-api-core/issues/718): Add support for `client_cert_source` to async REST transport. 

126 "google.api_core.client_options.ClientOptions.client_cert_source": client_cert_source_for_mtls, 

127 } 

128 provided_unsupported_params = [ 

129 name for name, value in unsupported_params.items() if value is not None 

130 ] 

131 if provided_unsupported_params: 

132 raise core_exceptions.AsyncRestUnsupportedParameterError( 

133 f"The following provided parameters are not supported for `transport=rest_asyncio`: {', '.join(provided_unsupported_params)}" 

134 ) 

135 

136 super().__init__( 

137 host=host, 

138 # TODO(https://github.com/googleapis/python-api-core/issues/709): Remove `type: ignore` when the linked issue is resolved. 

139 credentials=credentials, # type: ignore 

140 client_info=client_info, 

141 # TODO(https://github.com/googleapis/python-api-core/issues/725): Set always_use_jwt_access token when supported. 

142 always_use_jwt_access=False, 

143 ) 

144 # TODO(https://github.com/googleapis/python-api-core/issues/708): add support for 

145 # `default_host` in AsyncAuthorizedSession for feature parity with the synchronous 

146 # code. 

147 # TODO(https://github.com/googleapis/python-api-core/issues/709): Remove `type: ignore` when the linked issue is resolved. 

148 self._session = AsyncAuthorizedSession(self._credentials) # type: ignore 

149 # TODO(https://github.com/googleapis/python-api-core/issues/720): Add wrap logic directly to the property methods for callables. 

150 self._prep_wrapped_messages(client_info) 

151 self._http_options = http_options or {} 

152 self._path_prefix = path_prefix 

153 

154 def _prep_wrapped_messages(self, client_info): 

155 # Precompute the wrapped methods. 

156 self._wrapped_methods = { 

157 self.list_operations: gapic_v1.method_async.wrap_method( 

158 self.list_operations, 

159 default_retry=retries_async.AsyncRetry( 

160 initial=0.5, 

161 maximum=10.0, 

162 multiplier=2.0, 

163 predicate=retries_async.if_exception_type( 

164 core_exceptions.ServiceUnavailable, 

165 ), 

166 deadline=10.0, 

167 ), 

168 default_timeout=10.0, 

169 client_info=client_info, 

170 kind="rest_asyncio", 

171 ), 

172 self.get_operation: gapic_v1.method_async.wrap_method( 

173 self.get_operation, 

174 default_retry=retries_async.AsyncRetry( 

175 initial=0.5, 

176 maximum=10.0, 

177 multiplier=2.0, 

178 predicate=retries_async.if_exception_type( 

179 core_exceptions.ServiceUnavailable, 

180 ), 

181 deadline=10.0, 

182 ), 

183 default_timeout=10.0, 

184 client_info=client_info, 

185 kind="rest_asyncio", 

186 ), 

187 self.delete_operation: gapic_v1.method_async.wrap_method( 

188 self.delete_operation, 

189 default_retry=retries_async.AsyncRetry( 

190 initial=0.5, 

191 maximum=10.0, 

192 multiplier=2.0, 

193 predicate=retries_async.if_exception_type( 

194 core_exceptions.ServiceUnavailable, 

195 ), 

196 deadline=10.0, 

197 ), 

198 default_timeout=10.0, 

199 client_info=client_info, 

200 kind="rest_asyncio", 

201 ), 

202 self.cancel_operation: gapic_v1.method_async.wrap_method( 

203 self.cancel_operation, 

204 default_retry=retries_async.AsyncRetry( 

205 initial=0.5, 

206 maximum=10.0, 

207 multiplier=2.0, 

208 predicate=retries_async.if_exception_type( 

209 core_exceptions.ServiceUnavailable, 

210 ), 

211 deadline=10.0, 

212 ), 

213 default_timeout=10.0, 

214 client_info=client_info, 

215 kind="rest_asyncio", 

216 ), 

217 } 

218 

219 async def _list_operations( 

220 self, 

221 request: operations_pb2.ListOperationsRequest, 

222 *, 

223 # TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry` 

224 # to allow configuring retryable error codes. 

225 retry=gapic_v1.method_async.DEFAULT, 

226 timeout: Optional[float] = None, 

227 metadata: Sequence[Tuple[str, str]] = (), 

228 ) -> operations_pb2.ListOperationsResponse: 

229 r"""Asynchronously call the list operations method over HTTP. 

230 

231 Args: 

232 request (~.operations_pb2.ListOperationsRequest): 

233 The request object. The request message for 

234 [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations]. 

235 timeout (float): The timeout for this request. 

236 metadata (Sequence[Tuple[str, str]]): Strings which should be 

237 sent along with the request as metadata. 

238 

239 Returns: 

240 ~.operations_pb2.ListOperationsResponse: 

241 The response message for 

242 [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations]. 

243 

244 """ 

245 

246 http_options = [ 

247 { 

248 "method": "get", 

249 "uri": "/{}/{{name=**}}/operations".format(self._path_prefix), 

250 }, 

251 ] 

252 if "google.longrunning.Operations.ListOperations" in self._http_options: 

253 http_options = self._http_options[ 

254 "google.longrunning.Operations.ListOperations" 

255 ] 

256 

257 request_kwargs = self._convert_protobuf_message_to_dict(request) 

258 transcoded_request = path_template.transcode(http_options, **request_kwargs) 

259 

260 uri = transcoded_request["uri"] 

261 method = transcoded_request["method"] 

262 

263 # Jsonify the query params 

264 query_params_request = operations_pb2.ListOperationsRequest() 

265 json_format.ParseDict(transcoded_request["query_params"], query_params_request) 

266 query_params = json_format.MessageToDict( 

267 query_params_request, 

268 preserving_proto_field_name=False, 

269 use_integers_for_enums=False, 

270 ) 

271 

272 # Send the request 

273 headers = dict(metadata) 

274 headers["Content-Type"] = "application/json" 

275 # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name. 

276 response = await getattr(self._session, method)( 

277 "{host}{uri}".format(host=self._host, uri=uri), 

278 timeout=timeout, 

279 headers=headers, 

280 params=rest_helpers.flatten_query_params(query_params), 

281 ) 

282 content = await response.read() 

283 

284 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 

285 # subclass. 

286 if response.status_code >= 400: 

287 payload = json.loads(content.decode("utf-8")) 

288 request_url = "{host}{uri}".format(host=self._host, uri=uri) 

289 raise core_exceptions.format_http_response_error(response, method, request_url, payload) # type: ignore 

290 

291 # Return the response 

292 api_response = operations_pb2.ListOperationsResponse() 

293 json_format.Parse(content, api_response, ignore_unknown_fields=False) 

294 return api_response 

295 

296 async def _get_operation( 

297 self, 

298 request: operations_pb2.GetOperationRequest, 

299 *, 

300 # TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry` 

301 # to allow configuring retryable error codes. 

302 retry=gapic_v1.method_async.DEFAULT, 

303 timeout: Optional[float] = None, 

304 metadata: Sequence[Tuple[str, str]] = (), 

305 ) -> operations_pb2.Operation: 

306 r"""Asynchronously call the get operation method over HTTP. 

307 

308 Args: 

309 request (~.operations_pb2.GetOperationRequest): 

310 The request object. The request message for 

311 [Operations.GetOperation][google.api_core.operations_v1.Operations.GetOperation]. 

312 timeout (float): The timeout for this request. 

313 metadata (Sequence[Tuple[str, str]]): Strings which should be 

314 sent along with the request as metadata. 

315 

316 Returns: 

317 ~.operations_pb2.Operation: 

318 This resource represents a long- 

319 running operation that is the result of a 

320 network API call. 

321 

322 """ 

323 

324 http_options = [ 

325 { 

326 "method": "get", 

327 "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix), 

328 }, 

329 ] 

330 if "google.longrunning.Operations.GetOperation" in self._http_options: 

331 http_options = self._http_options[ 

332 "google.longrunning.Operations.GetOperation" 

333 ] 

334 

335 request_kwargs = self._convert_protobuf_message_to_dict(request) 

336 transcoded_request = path_template.transcode(http_options, **request_kwargs) 

337 

338 uri = transcoded_request["uri"] 

339 method = transcoded_request["method"] 

340 

341 # Jsonify the query params 

342 query_params_request = operations_pb2.GetOperationRequest() 

343 json_format.ParseDict(transcoded_request["query_params"], query_params_request) 

344 query_params = json_format.MessageToDict( 

345 query_params_request, 

346 preserving_proto_field_name=False, 

347 use_integers_for_enums=False, 

348 ) 

349 

350 # Send the request 

351 headers = dict(metadata) 

352 headers["Content-Type"] = "application/json" 

353 # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name. 

354 response = await getattr(self._session, method)( 

355 "{host}{uri}".format(host=self._host, uri=uri), 

356 timeout=timeout, 

357 headers=headers, 

358 params=rest_helpers.flatten_query_params(query_params), 

359 ) 

360 content = await response.read() 

361 

362 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 

363 # subclass. 

364 if response.status_code >= 400: 

365 payload = json.loads(content.decode("utf-8")) 

366 request_url = "{host}{uri}".format(host=self._host, uri=uri) 

367 raise core_exceptions.format_http_response_error(response, method, request_url, payload) # type: ignore 

368 

369 # Return the response 

370 api_response = operations_pb2.Operation() 

371 json_format.Parse(content, api_response, ignore_unknown_fields=False) 

372 return api_response 

373 

374 async def _delete_operation( 

375 self, 

376 request: operations_pb2.DeleteOperationRequest, 

377 *, 

378 # TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry` 

379 # to allow configuring retryable error codes. 

380 retry=gapic_v1.method_async.DEFAULT, 

381 timeout: Optional[float] = None, 

382 metadata: Sequence[Tuple[str, str]] = (), 

383 ) -> empty_pb2.Empty: 

384 r"""Asynchronously call the delete operation method over HTTP. 

385 

386 Args: 

387 request (~.operations_pb2.DeleteOperationRequest): 

388 The request object. The request message for 

389 [Operations.DeleteOperation][google.api_core.operations_v1.Operations.DeleteOperation]. 

390 

391 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

392 should be retried. 

393 timeout (float): The timeout for this request. 

394 metadata (Sequence[Tuple[str, str]]): Strings which should be 

395 sent along with the request as metadata. 

396 """ 

397 

398 http_options = [ 

399 { 

400 "method": "delete", 

401 "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix), 

402 }, 

403 ] 

404 if "google.longrunning.Operations.DeleteOperation" in self._http_options: 

405 http_options = self._http_options[ 

406 "google.longrunning.Operations.DeleteOperation" 

407 ] 

408 

409 request_kwargs = self._convert_protobuf_message_to_dict(request) 

410 transcoded_request = path_template.transcode(http_options, **request_kwargs) 

411 

412 uri = transcoded_request["uri"] 

413 method = transcoded_request["method"] 

414 

415 # Jsonify the query params 

416 query_params_request = operations_pb2.DeleteOperationRequest() 

417 json_format.ParseDict(transcoded_request["query_params"], query_params_request) 

418 query_params = json_format.MessageToDict( 

419 query_params_request, 

420 preserving_proto_field_name=False, 

421 use_integers_for_enums=False, 

422 ) 

423 

424 # Send the request 

425 headers = dict(metadata) 

426 headers["Content-Type"] = "application/json" 

427 # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name. 

428 response = await getattr(self._session, method)( 

429 "{host}{uri}".format(host=self._host, uri=uri), 

430 timeout=timeout, 

431 headers=headers, 

432 params=rest_helpers.flatten_query_params(query_params), 

433 ) 

434 

435 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 

436 # subclass. 

437 if response.status_code >= 400: 

438 content = await response.read() 

439 payload = json.loads(content.decode("utf-8")) 

440 request_url = "{host}{uri}".format(host=self._host, uri=uri) 

441 raise core_exceptions.format_http_response_error(response, method, request_url, payload) # type: ignore 

442 

443 return empty_pb2.Empty() 

444 

445 async def _cancel_operation( 

446 self, 

447 request: operations_pb2.CancelOperationRequest, 

448 *, 

449 # TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry` 

450 # to allow configuring retryable error codes. 

451 retry=gapic_v1.method_async.DEFAULT, 

452 timeout: Optional[float] = None, 

453 metadata: Sequence[Tuple[str, str]] = (), 

454 # TODO(https://github.com/googleapis/python-api-core/issues/722): Add `retry` parameter 

455 # to allow configuring retryable error codes. 

456 ) -> empty_pb2.Empty: 

457 r"""Asynchronously call the cancel operation method over HTTP. 

458 

459 Args: 

460 request (~.operations_pb2.CancelOperationRequest): 

461 The request object. The request message for 

462 [Operations.CancelOperation][google.api_core.operations_v1.Operations.CancelOperation]. 

463 timeout (float): The timeout for this request. 

464 metadata (Sequence[Tuple[str, str]]): Strings which should be 

465 sent along with the request as metadata. 

466 """ 

467 

468 http_options = [ 

469 { 

470 "method": "post", 

471 "uri": "/{}/{{name=**/operations/*}}:cancel".format(self._path_prefix), 

472 "body": "*", 

473 }, 

474 ] 

475 if "google.longrunning.Operations.CancelOperation" in self._http_options: 

476 http_options = self._http_options[ 

477 "google.longrunning.Operations.CancelOperation" 

478 ] 

479 

480 request_kwargs = self._convert_protobuf_message_to_dict(request) 

481 transcoded_request = path_template.transcode(http_options, **request_kwargs) 

482 

483 # Jsonify the request body 

484 body_request = operations_pb2.CancelOperationRequest() 

485 json_format.ParseDict(transcoded_request["body"], body_request) 

486 body = json_format.MessageToDict( 

487 body_request, 

488 preserving_proto_field_name=False, 

489 use_integers_for_enums=False, 

490 ) 

491 uri = transcoded_request["uri"] 

492 method = transcoded_request["method"] 

493 

494 # Jsonify the query params 

495 query_params_request = operations_pb2.CancelOperationRequest() 

496 json_format.ParseDict(transcoded_request["query_params"], query_params_request) 

497 query_params = json_format.MessageToDict( 

498 query_params_request, 

499 preserving_proto_field_name=False, 

500 use_integers_for_enums=False, 

501 ) 

502 

503 # Send the request 

504 headers = dict(metadata) 

505 headers["Content-Type"] = "application/json" 

506 # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name. 

507 response = await getattr(self._session, method)( 

508 "{host}{uri}".format(host=self._host, uri=uri), 

509 timeout=timeout, 

510 headers=headers, 

511 params=rest_helpers.flatten_query_params(query_params), 

512 data=body, 

513 ) 

514 

515 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 

516 # subclass. 

517 if response.status_code >= 400: 

518 content = await response.read() 

519 payload = json.loads(content.decode("utf-8")) 

520 request_url = "{host}{uri}".format(host=self._host, uri=uri) 

521 raise core_exceptions.format_http_response_error(response, method, request_url, payload) # type: ignore 

522 

523 return empty_pb2.Empty() 

524 

525 @property 

526 def list_operations( 

527 self, 

528 ) -> Callable[ 

529 [operations_pb2.ListOperationsRequest], 

530 Coroutine[Any, Any, operations_pb2.ListOperationsResponse], 

531 ]: 

532 return self._list_operations 

533 

534 @property 

535 def get_operation( 

536 self, 

537 ) -> Callable[ 

538 [operations_pb2.GetOperationRequest], 

539 Coroutine[Any, Any, operations_pb2.Operation], 

540 ]: 

541 return self._get_operation 

542 

543 @property 

544 def delete_operation( 

545 self, 

546 ) -> Callable[ 

547 [operations_pb2.DeleteOperationRequest], Coroutine[Any, Any, empty_pb2.Empty] 

548 ]: 

549 return self._delete_operation 

550 

551 @property 

552 def cancel_operation( 

553 self, 

554 ) -> Callable[ 

555 [operations_pb2.CancelOperationRequest], Coroutine[Any, Any, empty_pb2.Empty] 

556 ]: 

557 return self._cancel_operation 

558 

559 

560__all__ = ("AsyncOperationsRestTransport",)