Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/api_core/operations_v1/transports/rest.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

121 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2020 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16 

17from typing import Callable, Dict, Optional, Sequence, Tuple, Union 

18import warnings 

19 

20from google.auth import credentials as ga_credentials # type: ignore 

21from google.auth.transport.requests import AuthorizedSession # type: ignore 

22from google.longrunning import operations_pb2 # type: ignore 

23import google.protobuf 

24from google.protobuf import empty_pb2 # type: ignore 

25from google.protobuf import json_format # type: ignore 

26import grpc 

27from requests import __version__ as requests_version 

28 

29from google.api_core import exceptions as core_exceptions # type: ignore 

30from google.api_core import gapic_v1 # type: ignore 

31from google.api_core import general_helpers 

32from google.api_core import path_template # type: ignore 

33from google.api_core import rest_helpers # type: ignore 

34from google.api_core import retry as retries # type: ignore 

35 

36from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO 

37from .base import OperationsTransport 

38 

# Version string reported by the installed ``google.protobuf`` package.
PROTOBUF_VERSION = google.protobuf.__version__

# Type accepted by the ``retry`` keyword of the transport methods below:
# either a ``Retry`` instance or an arbitrary object (the methods default it
# to ``gapic_v1.method.DEFAULT``).
OptionalRetry = Union[retries.Retry, object]

# Client info for the REST transport: same gapic version as the base
# transport's default, no gRPC version, and the installed ``requests``
# version advertised as the REST version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    rest_version="requests@{}".format(requests_version),
    grpc_version=None,
    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
)

48 

49 

class OperationsRestTransport(OperationsTransport):
    """REST backend transport for Operations.

    Manages long-running operations with an API service.

    When an API method normally takes a long time to complete, it can be
    designed to return [Operation][google.api_core.operations_v1.Operation] to the
    client, and the client can use this interface to receive the real
    response asynchronously by polling the operation resource, or pass
    the operation resource to another API (such as Google Cloud Pub/Sub
    API) to receive the response. Any API service that returns
    long-running operations should implement the ``Operations``
    interface so developers can have a consistent client experience.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends JSON representations of protocol buffers over HTTP/1.1.

    Every RPC method follows the same pipeline, implemented by the private
    helpers ``_transcode_for_rpc``, ``_jsonify_fields`` and
    ``_send_http_request``: pick the HTTP rule, transcode the request,
    normalise the query parameters, send, and raise on an error status.
    """

    def __init__(
        self,
        *,
        host: str = "longrunning.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        url_scheme: str = "https",
        http_options: Optional[Dict] = None,
        path_prefix: str = "v1",
    ) -> None:
        """Instantiate the transport.

        Args:
            host (str):
                The hostname to connect to. Forwarded to the base constructor.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.

            credentials_file (Optional[str]): Deprecated. A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This constructor does not load the file: a non-``None`` value only
                triggers a ``DeprecationWarning``. This argument will be
                removed in the next major version of `google-api-core`.

                .. warning::
                    Important: If you accept a credential configuration (credential JSON/File/Stream)
                    from an external source for authentication to Google Cloud Platform, you must
                    validate it before providing it to any Google API or client library. Providing an
                    unvalidated credential configuration to Google APIs or libraries can compromise
                    the security of your systems and data. For more information, refer to
                    `Validate credential configuration from external sources`_.

                .. _Validate credential configuration from external sources:

                https://cloud.google.com/docs/authentication/external/externally-sourced-credentials
            scopes (Optional[Sequence[str]]): A list of scopes. Not referenced
                by this constructor (see the TODO in the body).
            client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
                certificate source used to configure mutual TLS on the HTTP
                session. When falsy, no mTLS configuration is performed.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota. Not referenced by this constructor (see the TODO
                in the body).
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials. Forwarded to the
                base constructor.
            url_scheme: the protocol scheme for the API endpoint. Normally
                "https", but for testing or local servers,
                "http" can be specified.
            http_options: a dictionary of http_options for transcoding, to override
                the defaults from operations.proto. Each method has an entry
                with the corresponding http rules as value. Keys are the
                fully-qualified RPC names, e.g.
                ``"google.longrunning.Operations.GetOperation"``.
            path_prefix: path prefix (usually represents API version). Set to
                "v1" by default.

        """
        if credentials_file is not None:
            warnings.warn(general_helpers._CREDENTIALS_FILE_WARNING, DeprecationWarning)

        # Run the base constructor
        # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
        # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
        # credentials object
        # NOTE(review): `url_scheme` is not referenced anywhere in this
        # constructor, so a non-default value has no effect here unless the
        # base class handles it some other way -- confirm against the base.
        super().__init__(
            host=host,
            credentials=credentials,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
        )
        self._session = AuthorizedSession(
            self._credentials, default_host=self.DEFAULT_HOST
        )
        if client_cert_source_for_mtls:
            self._session.configure_mtls_channel(client_cert_source_for_mtls)
        # TODO(https://github.com/googleapis/python-api-core/issues/720): Add wrap logic directly to the property methods for callables.
        self._prep_wrapped_messages(client_info)
        # A falsy `http_options` (None or empty) means "no overrides".
        self._http_options = http_options or {}
        self._path_prefix = path_prefix

    def _transcode_for_rpc(self, rpc_name: str, default_http_options, request) -> Dict:
        """Transcode ``request`` using the HTTP rules that apply to ``rpc_name``.

        Args:
            rpc_name (str): Fully-qualified RPC name used as the lookup key
                in the user-supplied ``http_options`` overrides.
            default_http_options: HTTP rules to use when no override is
                registered for ``rpc_name``.
            request: The protobuf request message to transcode.

        Returns:
            The result of ``path_template.transcode``; the callers read its
            ``"uri"``, ``"method"``, ``"query_params"`` and (for rules with a
            body) ``"body"`` entries.
        """
        # An override, when present, replaces the defaults entirely.
        if rpc_name in self._http_options:
            http_options = self._http_options[rpc_name]
        else:
            http_options = default_http_options

        request_kwargs = self._convert_protobuf_message_to_dict(request)
        return path_template.transcode(http_options, **request_kwargs)

    @staticmethod
    def _jsonify_fields(fields, message_cls) -> Dict:
        """Round-trip ``fields`` through a ``message_cls`` instance.

        The dict is parsed into a fresh message and serialised back with
        ``MessageToDict`` (``preserving_proto_field_name=False``,
        ``use_integers_for_enums=False``).

        Args:
            fields: Dict of field values, as produced by transcoding.
            message_cls: Protobuf message class to instantiate for parsing.

        Returns:
            Dict: The dict produced by ``json_format.MessageToDict``.
        """
        message = message_cls()
        json_format.ParseDict(fields, message)
        return json_format.MessageToDict(
            message,
            preserving_proto_field_name=False,
            use_integers_for_enums=False,
        )

    def _send_http_request(
        self,
        transcoded_request: Dict,
        query_params: Dict,
        timeout: Optional[float],
        metadata: Sequence[Tuple[str, str]],
        **session_kwargs,
    ):
        """Send the transcoded request and raise on an HTTP error status.

        Args:
            transcoded_request (Dict): Transcoding result; its ``"uri"`` and
                ``"method"`` entries select the URL path and the session
                method to call.
            query_params (Dict): Query parameters; flattened with
                ``rest_helpers.flatten_query_params`` before sending.
            timeout (Optional[float]): Passed through as the request timeout.
            metadata (Sequence[Tuple[str, str]]): Converted to the request
                headers; ``Content-Type`` is then set to
                ``application/json``.
            **session_kwargs: Extra keyword arguments forwarded verbatim to
                the session call (e.g. ``data``).

        Returns:
            The response object returned by the session call.

        Raises:
            core_exceptions.GoogleAPICallError: The subclass built by
                ``core_exceptions.from_http_response`` when the response
                status code is 400 or greater.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]

        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri` variable name.
        response = getattr(self._session, method)(
            "{host}{uri}".format(host=self._host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params),
            **session_kwargs,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)
        return response

    def _list_operations(
        self,
        request: operations_pb2.ListOperationsRequest,
        *,
        # TODO(https://github.com/googleapis/python-api-core/issues/723): Leverage `retry`
        # to allow configuring retryable error codes.
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operations_pb2.ListOperationsResponse:
        r"""Call the list operations method over HTTP.

        Args:
            request (~.operations_pb2.ListOperationsRequest):
                The request object. The request message for
                [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].

            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Currently not referenced by this method
                (see the TODO on the parameter).
            timeout (float): The timeout for this request.
            compression (grpc.Compression): Accepted in the signature but
                not referenced by this method.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.operations_pb2.ListOperationsResponse:
                The response message for
                [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP response status
                code is 400 or greater.
        """
        default_http_options = [
            {
                "method": "get",
                "uri": "/{}/{{name=**}}/operations".format(self._path_prefix),
            },
        ]
        transcoded_request = self._transcode_for_rpc(
            "google.longrunning.Operations.ListOperations",
            default_http_options,
            request,
        )

        # Jsonify the query params
        query_params = self._jsonify_fields(
            transcoded_request["query_params"],
            operations_pb2.ListOperationsRequest,
        )

        # Send the request
        response = self._send_http_request(
            transcoded_request, query_params, timeout, metadata
        )

        # Return the response; unknown fields in the payload are an error.
        api_response = operations_pb2.ListOperationsResponse()
        json_format.Parse(response.content, api_response, ignore_unknown_fields=False)
        return api_response

    def _get_operation(
        self,
        request: operations_pb2.GetOperationRequest,
        *,
        # TODO(https://github.com/googleapis/python-api-core/issues/723): Leverage `retry`
        # to allow configuring retryable error codes.
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operations_pb2.Operation:
        r"""Call the get operation method over HTTP.

        Args:
            request (~.operations_pb2.GetOperationRequest):
                The request object. The request message for
                [Operations.GetOperation][google.api_core.operations_v1.Operations.GetOperation].

            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Currently not referenced by this method
                (see the TODO on the parameter).
            timeout (float): The timeout for this request.
            compression (grpc.Compression): Accepted in the signature but
                not referenced by this method.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.operations_pb2.Operation:
                This resource represents a long-
                running operation that is the result of a
                network API call.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP response status
                code is 400 or greater.
        """
        default_http_options = [
            {
                "method": "get",
                "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix),
            },
        ]
        transcoded_request = self._transcode_for_rpc(
            "google.longrunning.Operations.GetOperation",
            default_http_options,
            request,
        )

        # Jsonify the query params
        query_params = self._jsonify_fields(
            transcoded_request["query_params"],
            operations_pb2.GetOperationRequest,
        )

        # Send the request
        response = self._send_http_request(
            transcoded_request, query_params, timeout, metadata
        )

        # Return the response; unknown fields in the payload are an error.
        api_response = operations_pb2.Operation()
        json_format.Parse(response.content, api_response, ignore_unknown_fields=False)
        return api_response

    def _delete_operation(
        self,
        request: operations_pb2.DeleteOperationRequest,
        *,
        # TODO(https://github.com/googleapis/python-api-core/issues/723): Leverage `retry`
        # to allow configuring retryable error codes.
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> empty_pb2.Empty:
        r"""Call the delete operation method over HTTP.

        Args:
            request (~.operations_pb2.DeleteOperationRequest):
                The request object. The request message for
                [Operations.DeleteOperation][google.api_core.operations_v1.Operations.DeleteOperation].

            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Currently not referenced by this method
                (see the TODO on the parameter).
            timeout (float): The timeout for this request.
            compression (grpc.Compression): Accepted in the signature but
                not referenced by this method.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.empty_pb2.Empty:
                A newly constructed empty message; the HTTP response body
                is not parsed.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP response status
                code is 400 or greater.
        """
        default_http_options = [
            {
                "method": "delete",
                "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix),
            },
        ]
        transcoded_request = self._transcode_for_rpc(
            "google.longrunning.Operations.DeleteOperation",
            default_http_options,
            request,
        )

        # Jsonify the query params
        query_params = self._jsonify_fields(
            transcoded_request["query_params"],
            operations_pb2.DeleteOperationRequest,
        )

        # Send the request; only the status check matters for this RPC.
        self._send_http_request(transcoded_request, query_params, timeout, metadata)

        return empty_pb2.Empty()

    def _cancel_operation(
        self,
        request: operations_pb2.CancelOperationRequest,
        *,
        # TODO(https://github.com/googleapis/python-api-core/issues/723): Leverage `retry`
        # to allow configuring retryable error codes.
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> empty_pb2.Empty:
        r"""Call the cancel operation method over HTTP.

        Args:
            request (~.operations_pb2.CancelOperationRequest):
                The request object. The request message for
                [Operations.CancelOperation][google.api_core.operations_v1.Operations.CancelOperation].

            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Currently not referenced by this method
                (see the TODO on the parameter).
            timeout (float): The timeout for this request.
            compression (grpc.Compression): Accepted in the signature but
                not referenced by this method.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.empty_pb2.Empty:
                A newly constructed empty message; the HTTP response body
                is not parsed.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP response status
                code is 400 or greater.
        """
        default_http_options = [
            {
                "method": "post",
                "uri": "/{}/{{name=**/operations/*}}:cancel".format(self._path_prefix),
                "body": "*",
            },
        ]
        transcoded_request = self._transcode_for_rpc(
            "google.longrunning.Operations.CancelOperation",
            default_http_options,
            request,
        )

        # Jsonify the request body
        body = self._jsonify_fields(
            transcoded_request["body"],
            operations_pb2.CancelOperationRequest,
        )

        # Jsonify the query params
        query_params = self._jsonify_fields(
            transcoded_request["query_params"],
            operations_pb2.CancelOperationRequest,
        )

        # Send the request
        # NOTE(review): `body` is a dict and is handed to the session as
        # `data=` while the Content-Type header announces JSON -- confirm
        # this is the intended wire encoding for non-empty bodies.
        self._send_http_request(
            transcoded_request, query_params, timeout, metadata, data=body
        )

        return empty_pb2.Empty()

    @property
    def list_operations(
        self,
    ) -> Callable[
        [operations_pb2.ListOperationsRequest], operations_pb2.ListOperationsResponse
    ]:
        """Callable that performs the ListOperations RPC over REST."""
        return self._list_operations

    @property
    def get_operation(
        self,
    ) -> Callable[[operations_pb2.GetOperationRequest], operations_pb2.Operation]:
        """Callable that performs the GetOperation RPC over REST."""
        return self._get_operation

    @property
    def delete_operation(
        self,
    ) -> Callable[[operations_pb2.DeleteOperationRequest], empty_pb2.Empty]:
        """Callable that performs the DeleteOperation RPC over REST."""
        return self._delete_operation

    @property
    def cancel_operation(
        self,
    ) -> Callable[[operations_pb2.CancelOperationRequest], empty_pb2.Empty]:
        """Callable that performs the CancelOperation RPC over REST."""
        return self._cancel_operation

490 

491 

# Public API of this module: only the REST transport class is exported.
__all__ = ("OperationsRestTransport",)