Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/api_core/operations_v1/transports/rest_asyncio.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

134 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2024 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16 

17import json 

18from typing import Any, Callable, Coroutine, Dict, Optional, Sequence, Tuple 

19import warnings 

20 

21from google.auth import __version__ as auth_version 

22 

23try: 

24 from google.auth.aio.transport.sessions import AsyncAuthorizedSession # type: ignore 

25except ImportError as e: # pragma: NO COVER 

26 raise ImportError( 

27 "The `async_rest` extra of `google-api-core` is required to use long-running operations. Install it by running " 

28 "`pip install google-api-core[async_rest]`." 

29 ) from e 

30 

31from google.api_core import exceptions as core_exceptions # type: ignore 

32from google.api_core import gapic_v1 # type: ignore 

33from google.api_core import general_helpers 

34from google.api_core import path_template # type: ignore 

35from google.api_core import rest_helpers # type: ignore 

36from google.api_core import retry_async as retries_async # type: ignore 

37from google.auth.aio import credentials as ga_credentials_async # type: ignore 

38from google.longrunning import operations_pb2 # type: ignore 

39from google.protobuf import empty_pb2 # type: ignore 

40from google.protobuf import json_format # type: ignore 

41 

42from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO, OperationsTransport 

43 

44DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( 

45 gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version, 

46 grpc_version=None, 

47 rest_version=f"google-auth@{auth_version}", 

48) 

49 

50 

51class AsyncOperationsRestTransport(OperationsTransport): 

52 """Asynchronous REST backend transport for Operations. 

53 

54 Manages async long-running operations with an API service. 

55 

56 When an API method normally takes long time to complete, it can be 

57 designed to return [Operation][google.api_core.operations_v1.Operation] to the 

58 client, and the client can use this interface to receive the real 

59 response asynchronously by polling the operation resource, or pass 

60 the operation resource to another API (such as Google Cloud Pub/Sub 

61 API) to receive the response. Any API service that returns 

62 long-running operations should implement the ``Operations`` 

63 interface so developers can have a consistent client experience. 

64 

65 This class defines the same methods as the primary client, so the 

66 primary client can load the underlying transport implementation 

67 and call it. 

68 

69 It sends JSON representations of protocol buffers over HTTP/1.1 

70 """ 

71 

72 def __init__( 

73 self, 

74 *, 

75 host: str = "longrunning.googleapis.com", 

76 credentials: Optional[ga_credentials_async.Credentials] = None, 

77 credentials_file: Optional[str] = None, 

78 scopes: Optional[Sequence[str]] = None, 

79 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 

80 quota_project_id: Optional[str] = None, 

81 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

82 always_use_jwt_access: Optional[bool] = False, 

83 url_scheme: str = "https", 

84 http_options: Optional[Dict] = None, 

85 path_prefix: str = "v1", 

86 # TODO(https://github.com/googleapis/python-api-core/issues/715): Add docstring for `credentials_file` to async REST transport. 

87 # TODO(https://github.com/googleapis/python-api-core/issues/716): Add docstring for `scopes` to async REST transport. 

88 # TODO(https://github.com/googleapis/python-api-core/issues/717): Add docstring for `quota_project_id` to async REST transport. 

89 # TODO(https://github.com/googleapis/python-api-core/issues/718): Add docstring for `client_cert_source` to async REST transport. 

90 ) -> None: 

91 """Instantiate the transport. 

92 

93 Args: 

94 host (Optional[str]): 

95 The hostname to connect to. 

96 credentials (Optional[google.auth.aio.credentials.Credentials]): The 

97 authorization credentials to attach to requests. These 

98 credentials identify the application to the service; if none 

99 are specified, the client will attempt to ascertain the 

100 credentials from the environment. 

101 credentials_file (Optional[str]): Deprecated. A file with credentials that can 

102 be loaded with :func:`google.auth.load_credentials_from_file`. 

103 This argument is ignored if ``channel`` is provided. This argument will be 

104 removed in the next major version of `google-api-core`. 

105 

106 .. warning:: 

107 Important: If you accept a credential configuration (credential JSON/File/Stream) 

108 from an external source for authentication to Google Cloud Platform, you must 

109 validate it before providing it to any Google API or client library. Providing an 

110 unvalidated credential configuration to Google APIs or libraries can compromise 

111 the security of your systems and data. For more information, refer to 

112 `Validate credential configurations from external sources`_. 

113 

114 .. _Validate credential configurations from external sources: 

115 

116 https://cloud.google.com/docs/authentication/external/externally-sourced-credentials 

117 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

118 The client info used to send a user-agent string along with 

119 API requests. If ``None``, then default info will be used. 

120 Generally, you only need to set this if you're developing 

121 your own client library. 

122 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 

123 be used for service account credentials. 

124 url_scheme: the protocol scheme for the API endpoint. Normally 

125 "https", but for testing or local servers, 

126 "http" can be specified. 

127 http_options: a dictionary of http_options for transcoding, to override 

128 the defaults from operations.proto. Each method has an entry 

129 with the corresponding http rules as value. 

130 path_prefix: path prefix (usually represents API version). Set to 

131 "v1" by default. 

132 

133 """ 

134 if credentials_file is not None: 

135 warnings.warn(general_helpers._CREDENTIALS_FILE_WARNING, DeprecationWarning) 

136 

137 unsupported_params = { 

138 # TODO(https://github.com/googleapis/python-api-core/issues/715): Add support for `credentials_file` to async REST transport. 

139 "google.api_core.client_options.ClientOptions.credentials_file": credentials_file, 

140 # TODO(https://github.com/googleapis/python-api-core/issues/716): Add support for `scopes` to async REST transport. 

141 "google.api_core.client_options.ClientOptions.scopes": scopes, 

142 # TODO(https://github.com/googleapis/python-api-core/issues/717): Add support for `quota_project_id` to async REST transport. 

143 "google.api_core.client_options.ClientOptions.quota_project_id": quota_project_id, 

144 # TODO(https://github.com/googleapis/python-api-core/issues/718): Add support for `client_cert_source` to async REST transport. 

145 "google.api_core.client_options.ClientOptions.client_cert_source": client_cert_source_for_mtls, 

146 # TODO(https://github.com/googleapis/python-api-core/issues/718): Add support for `client_cert_source` to async REST transport. 

147 "google.api_core.client_options.ClientOptions.client_cert_source": client_cert_source_for_mtls, 

148 } 

149 provided_unsupported_params = [ 

150 name for name, value in unsupported_params.items() if value is not None 

151 ] 

152 if provided_unsupported_params: 

153 raise core_exceptions.AsyncRestUnsupportedParameterError( 

154 f"The following provided parameters are not supported for `transport=rest_asyncio`: {', '.join(provided_unsupported_params)}" 

155 ) 

156 

157 super().__init__( 

158 host=host, 

159 # TODO(https://github.com/googleapis/python-api-core/issues/709): Remove `type: ignore` when the linked issue is resolved. 

160 credentials=credentials, # type: ignore 

161 client_info=client_info, 

162 # TODO(https://github.com/googleapis/python-api-core/issues/725): Set always_use_jwt_access token when supported. 

163 always_use_jwt_access=False, 

164 ) 

165 # TODO(https://github.com/googleapis/python-api-core/issues/708): add support for 

166 # `default_host` in AsyncAuthorizedSession for feature parity with the synchronous 

167 # code. 

168 # TODO(https://github.com/googleapis/python-api-core/issues/709): Remove `type: ignore` when the linked issue is resolved. 

169 self._session = AsyncAuthorizedSession(self._credentials) # type: ignore 

170 # TODO(https://github.com/googleapis/python-api-core/issues/720): Add wrap logic directly to the property methods for callables. 

171 self._prep_wrapped_messages(client_info) 

172 self._http_options = http_options or {} 

173 self._path_prefix = path_prefix 

174 

175 def _prep_wrapped_messages(self, client_info): 

176 # Precompute the wrapped methods. 

177 self._wrapped_methods = { 

178 self.list_operations: gapic_v1.method_async.wrap_method( 

179 self.list_operations, 

180 default_retry=retries_async.AsyncRetry( 

181 initial=0.5, 

182 maximum=10.0, 

183 multiplier=2.0, 

184 predicate=retries_async.if_exception_type( 

185 core_exceptions.ServiceUnavailable, 

186 ), 

187 deadline=10.0, 

188 ), 

189 default_timeout=10.0, 

190 client_info=client_info, 

191 kind="rest_asyncio", 

192 ), 

193 self.get_operation: gapic_v1.method_async.wrap_method( 

194 self.get_operation, 

195 default_retry=retries_async.AsyncRetry( 

196 initial=0.5, 

197 maximum=10.0, 

198 multiplier=2.0, 

199 predicate=retries_async.if_exception_type( 

200 core_exceptions.ServiceUnavailable, 

201 ), 

202 deadline=10.0, 

203 ), 

204 default_timeout=10.0, 

205 client_info=client_info, 

206 kind="rest_asyncio", 

207 ), 

208 self.delete_operation: gapic_v1.method_async.wrap_method( 

209 self.delete_operation, 

210 default_retry=retries_async.AsyncRetry( 

211 initial=0.5, 

212 maximum=10.0, 

213 multiplier=2.0, 

214 predicate=retries_async.if_exception_type( 

215 core_exceptions.ServiceUnavailable, 

216 ), 

217 deadline=10.0, 

218 ), 

219 default_timeout=10.0, 

220 client_info=client_info, 

221 kind="rest_asyncio", 

222 ), 

223 self.cancel_operation: gapic_v1.method_async.wrap_method( 

224 self.cancel_operation, 

225 default_retry=retries_async.AsyncRetry( 

226 initial=0.5, 

227 maximum=10.0, 

228 multiplier=2.0, 

229 predicate=retries_async.if_exception_type( 

230 core_exceptions.ServiceUnavailable, 

231 ), 

232 deadline=10.0, 

233 ), 

234 default_timeout=10.0, 

235 client_info=client_info, 

236 kind="rest_asyncio", 

237 ), 

238 } 

239 

240 async def _list_operations( 

241 self, 

242 request: operations_pb2.ListOperationsRequest, 

243 *, 

244 # TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry` 

245 # to allow configuring retryable error codes. 

246 retry=gapic_v1.method_async.DEFAULT, 

247 timeout: Optional[float] = None, 

248 metadata: Sequence[Tuple[str, str]] = (), 

249 ) -> operations_pb2.ListOperationsResponse: 

250 r"""Asynchronously call the list operations method over HTTP. 

251 

252 Args: 

253 request (~.operations_pb2.ListOperationsRequest): 

254 The request object. The request message for 

255 [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations]. 

256 timeout (float): The timeout for this request. 

257 metadata (Sequence[Tuple[str, str]]): Strings which should be 

258 sent along with the request as metadata. 

259 

260 Returns: 

261 ~.operations_pb2.ListOperationsResponse: 

262 The response message for 

263 [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations]. 

264 

265 """ 

266 

267 http_options = [ 

268 { 

269 "method": "get", 

270 "uri": "/{}/{{name=**}}/operations".format(self._path_prefix), 

271 }, 

272 ] 

273 if "google.longrunning.Operations.ListOperations" in self._http_options: 

274 http_options = self._http_options[ 

275 "google.longrunning.Operations.ListOperations" 

276 ] 

277 

278 request_kwargs = self._convert_protobuf_message_to_dict(request) 

279 transcoded_request = path_template.transcode(http_options, **request_kwargs) 

280 

281 uri = transcoded_request["uri"] 

282 method = transcoded_request["method"] 

283 

284 # Jsonify the query params 

285 query_params_request = operations_pb2.ListOperationsRequest() 

286 json_format.ParseDict(transcoded_request["query_params"], query_params_request) 

287 query_params = json_format.MessageToDict( 

288 query_params_request, 

289 preserving_proto_field_name=False, 

290 use_integers_for_enums=False, 

291 ) 

292 

293 # Send the request 

294 headers = dict(metadata) 

295 headers["Content-Type"] = "application/json" 

296 # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name. 

297 response = await getattr(self._session, method)( 

298 "{host}{uri}".format(host=self._host, uri=uri), 

299 timeout=timeout, 

300 headers=headers, 

301 params=rest_helpers.flatten_query_params(query_params), 

302 ) 

303 content = await response.read() 

304 

305 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 

306 # subclass. 

307 if response.status_code >= 400: 

308 payload = json.loads(content.decode("utf-8")) 

309 request_url = "{host}{uri}".format(host=self._host, uri=uri) 

310 raise core_exceptions.format_http_response_error(response, method, request_url, payload) # type: ignore 

311 

312 # Return the response 

313 api_response = operations_pb2.ListOperationsResponse() 

314 json_format.Parse(content, api_response, ignore_unknown_fields=False) 

315 return api_response 

316 

317 async def _get_operation( 

318 self, 

319 request: operations_pb2.GetOperationRequest, 

320 *, 

321 # TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry` 

322 # to allow configuring retryable error codes. 

323 retry=gapic_v1.method_async.DEFAULT, 

324 timeout: Optional[float] = None, 

325 metadata: Sequence[Tuple[str, str]] = (), 

326 ) -> operations_pb2.Operation: 

327 r"""Asynchronously call the get operation method over HTTP. 

328 

329 Args: 

330 request (~.operations_pb2.GetOperationRequest): 

331 The request object. The request message for 

332 [Operations.GetOperation][google.api_core.operations_v1.Operations.GetOperation]. 

333 timeout (float): The timeout for this request. 

334 metadata (Sequence[Tuple[str, str]]): Strings which should be 

335 sent along with the request as metadata. 

336 

337 Returns: 

338 ~.operations_pb2.Operation: 

339 This resource represents a long- 

340 running operation that is the result of a 

341 network API call. 

342 

343 """ 

344 

345 http_options = [ 

346 { 

347 "method": "get", 

348 "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix), 

349 }, 

350 ] 

351 if "google.longrunning.Operations.GetOperation" in self._http_options: 

352 http_options = self._http_options[ 

353 "google.longrunning.Operations.GetOperation" 

354 ] 

355 

356 request_kwargs = self._convert_protobuf_message_to_dict(request) 

357 transcoded_request = path_template.transcode(http_options, **request_kwargs) 

358 

359 uri = transcoded_request["uri"] 

360 method = transcoded_request["method"] 

361 

362 # Jsonify the query params 

363 query_params_request = operations_pb2.GetOperationRequest() 

364 json_format.ParseDict(transcoded_request["query_params"], query_params_request) 

365 query_params = json_format.MessageToDict( 

366 query_params_request, 

367 preserving_proto_field_name=False, 

368 use_integers_for_enums=False, 

369 ) 

370 

371 # Send the request 

372 headers = dict(metadata) 

373 headers["Content-Type"] = "application/json" 

374 # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name. 

375 response = await getattr(self._session, method)( 

376 "{host}{uri}".format(host=self._host, uri=uri), 

377 timeout=timeout, 

378 headers=headers, 

379 params=rest_helpers.flatten_query_params(query_params), 

380 ) 

381 content = await response.read() 

382 

383 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 

384 # subclass. 

385 if response.status_code >= 400: 

386 payload = json.loads(content.decode("utf-8")) 

387 request_url = "{host}{uri}".format(host=self._host, uri=uri) 

388 raise core_exceptions.format_http_response_error(response, method, request_url, payload) # type: ignore 

389 

390 # Return the response 

391 api_response = operations_pb2.Operation() 

392 json_format.Parse(content, api_response, ignore_unknown_fields=False) 

393 return api_response 

394 

395 async def _delete_operation( 

396 self, 

397 request: operations_pb2.DeleteOperationRequest, 

398 *, 

399 # TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry` 

400 # to allow configuring retryable error codes. 

401 retry=gapic_v1.method_async.DEFAULT, 

402 timeout: Optional[float] = None, 

403 metadata: Sequence[Tuple[str, str]] = (), 

404 ) -> empty_pb2.Empty: 

405 r"""Asynchronously call the delete operation method over HTTP. 

406 

407 Args: 

408 request (~.operations_pb2.DeleteOperationRequest): 

409 The request object. The request message for 

410 [Operations.DeleteOperation][google.api_core.operations_v1.Operations.DeleteOperation]. 

411 

412 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

413 should be retried. 

414 timeout (float): The timeout for this request. 

415 metadata (Sequence[Tuple[str, str]]): Strings which should be 

416 sent along with the request as metadata. 

417 """ 

418 

419 http_options = [ 

420 { 

421 "method": "delete", 

422 "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix), 

423 }, 

424 ] 

425 if "google.longrunning.Operations.DeleteOperation" in self._http_options: 

426 http_options = self._http_options[ 

427 "google.longrunning.Operations.DeleteOperation" 

428 ] 

429 

430 request_kwargs = self._convert_protobuf_message_to_dict(request) 

431 transcoded_request = path_template.transcode(http_options, **request_kwargs) 

432 

433 uri = transcoded_request["uri"] 

434 method = transcoded_request["method"] 

435 

436 # Jsonify the query params 

437 query_params_request = operations_pb2.DeleteOperationRequest() 

438 json_format.ParseDict(transcoded_request["query_params"], query_params_request) 

439 query_params = json_format.MessageToDict( 

440 query_params_request, 

441 preserving_proto_field_name=False, 

442 use_integers_for_enums=False, 

443 ) 

444 

445 # Send the request 

446 headers = dict(metadata) 

447 headers["Content-Type"] = "application/json" 

448 # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name. 

449 response = await getattr(self._session, method)( 

450 "{host}{uri}".format(host=self._host, uri=uri), 

451 timeout=timeout, 

452 headers=headers, 

453 params=rest_helpers.flatten_query_params(query_params), 

454 ) 

455 

456 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 

457 # subclass. 

458 if response.status_code >= 400: 

459 content = await response.read() 

460 payload = json.loads(content.decode("utf-8")) 

461 request_url = "{host}{uri}".format(host=self._host, uri=uri) 

462 raise core_exceptions.format_http_response_error(response, method, request_url, payload) # type: ignore 

463 

464 return empty_pb2.Empty() 

465 

466 async def _cancel_operation( 

467 self, 

468 request: operations_pb2.CancelOperationRequest, 

469 *, 

470 # TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry` 

471 # to allow configuring retryable error codes. 

472 retry=gapic_v1.method_async.DEFAULT, 

473 timeout: Optional[float] = None, 

474 metadata: Sequence[Tuple[str, str]] = (), 

475 # TODO(https://github.com/googleapis/python-api-core/issues/722): Add `retry` parameter 

476 # to allow configuring retryable error codes. 

477 ) -> empty_pb2.Empty: 

478 r"""Asynchronously call the cancel operation method over HTTP. 

479 

480 Args: 

481 request (~.operations_pb2.CancelOperationRequest): 

482 The request object. The request message for 

483 [Operations.CancelOperation][google.api_core.operations_v1.Operations.CancelOperation]. 

484 timeout (float): The timeout for this request. 

485 metadata (Sequence[Tuple[str, str]]): Strings which should be 

486 sent along with the request as metadata. 

487 """ 

488 

489 http_options = [ 

490 { 

491 "method": "post", 

492 "uri": "/{}/{{name=**/operations/*}}:cancel".format(self._path_prefix), 

493 "body": "*", 

494 }, 

495 ] 

496 if "google.longrunning.Operations.CancelOperation" in self._http_options: 

497 http_options = self._http_options[ 

498 "google.longrunning.Operations.CancelOperation" 

499 ] 

500 

501 request_kwargs = self._convert_protobuf_message_to_dict(request) 

502 transcoded_request = path_template.transcode(http_options, **request_kwargs) 

503 

504 # Jsonify the request body 

505 body_request = operations_pb2.CancelOperationRequest() 

506 json_format.ParseDict(transcoded_request["body"], body_request) 

507 body = json_format.MessageToDict( 

508 body_request, 

509 preserving_proto_field_name=False, 

510 use_integers_for_enums=False, 

511 ) 

512 uri = transcoded_request["uri"] 

513 method = transcoded_request["method"] 

514 

515 # Jsonify the query params 

516 query_params_request = operations_pb2.CancelOperationRequest() 

517 json_format.ParseDict(transcoded_request["query_params"], query_params_request) 

518 query_params = json_format.MessageToDict( 

519 query_params_request, 

520 preserving_proto_field_name=False, 

521 use_integers_for_enums=False, 

522 ) 

523 

524 # Send the request 

525 headers = dict(metadata) 

526 headers["Content-Type"] = "application/json" 

527 # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name. 

528 response = await getattr(self._session, method)( 

529 "{host}{uri}".format(host=self._host, uri=uri), 

530 timeout=timeout, 

531 headers=headers, 

532 params=rest_helpers.flatten_query_params(query_params), 

533 data=body, 

534 ) 

535 

536 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 

537 # subclass. 

538 if response.status_code >= 400: 

539 content = await response.read() 

540 payload = json.loads(content.decode("utf-8")) 

541 request_url = "{host}{uri}".format(host=self._host, uri=uri) 

542 raise core_exceptions.format_http_response_error(response, method, request_url, payload) # type: ignore 

543 

544 return empty_pb2.Empty() 

545 

546 @property 

547 def list_operations( 

548 self, 

549 ) -> Callable[ 

550 [operations_pb2.ListOperationsRequest], 

551 Coroutine[Any, Any, operations_pb2.ListOperationsResponse], 

552 ]: 

553 return self._list_operations 

554 

555 @property 

556 def get_operation( 

557 self, 

558 ) -> Callable[ 

559 [operations_pb2.GetOperationRequest], 

560 Coroutine[Any, Any, operations_pb2.Operation], 

561 ]: 

562 return self._get_operation 

563 

564 @property 

565 def delete_operation( 

566 self, 

567 ) -> Callable[ 

568 [operations_pb2.DeleteOperationRequest], Coroutine[Any, Any, empty_pb2.Empty] 

569 ]: 

570 return self._delete_operation 

571 

572 @property 

573 def cancel_operation( 

574 self, 

575 ) -> Callable[ 

576 [operations_pb2.CancelOperationRequest], Coroutine[Any, Any, empty_pb2.Empty] 

577 ]: 

578 return self._cancel_operation 

579 

580 

581__all__ = ("AsyncOperationsRestTransport",)