Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/operations_v1/transports/rest.py: 27%

118 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:45 +0000

1# -*- coding: utf-8 -*- 

2# Copyright 2020 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16 

17import re 

18from typing import Callable, Dict, Optional, Sequence, Tuple, Union 

19 

20from requests import __version__ as requests_version 

21 

22from google.api_core import exceptions as core_exceptions # type: ignore 

23from google.api_core import gapic_v1 # type: ignore 

24from google.api_core import path_template # type: ignore 

25from google.api_core import rest_helpers # type: ignore 

26from google.api_core import retry as retries # type: ignore 

27from google.auth import credentials as ga_credentials # type: ignore 

28from google.auth.transport.requests import AuthorizedSession # type: ignore 

29from google.longrunning import operations_pb2 # type: ignore 

30from google.protobuf import empty_pb2 # type: ignore 

31from google.protobuf import json_format # type: ignore 

32import grpc 

33from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO, OperationsTransport 

34 

# Type used for the ``retry`` parameters in this module: either a
# ``retries.Retry`` instance or an arbitrary sentinel object (the methods
# below default it to ``gapic_v1.method.DEFAULT``).
OptionalRetry = Union[retries.Retry, object]

# Client info advertised by the REST transport. It reuses the GAPIC version
# from the base transport's default, reports the installed ``requests``
# version as the REST version, and explicitly carries no gRPC version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    rest_version=requests_version,
    grpc_version=None,
    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
)

42 

43 

class OperationsRestTransport(OperationsTransport):
    """REST backend transport for Operations.

    Manages long-running operations with an API service.

    When an API method normally takes a long time to complete, it can be
    designed to return [Operation][google.api_core.operations_v1.Operation] to the
    client, and the client can use this interface to receive the real
    response asynchronously by polling the operation resource, or pass
    the operation resource to another API (such as Google Cloud Pub/Sub
    API) to receive the response. Any API service that returns
    long-running operations should implement the ``Operations``
    interface so developers can have a consistent client experience.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends JSON representations of protocol buffers over HTTP/1.1
    """

    def __init__(
        self,
        *,
        host: str = "longrunning.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        url_scheme: str = "https",
        http_options: Optional[Dict] = None,
        path_prefix: str = "v1",
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to. If it does not already start
                with ``http://`` or ``https://``, ``url_scheme`` is
                prepended before it is handed to the base constructor.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                Accepted for signature compatibility; this constructor does
                not read it yet (see the TODO in the body).
            scopes (Optional(Sequence[str])): A list of scopes. Accepted for
                signature compatibility; this constructor does not read it
                yet (see the TODO in the body).
            client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
                certificate source used to configure a mutual TLS channel on
                the HTTP session. Skipped when falsy.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota. Accepted for signature compatibility; this
                constructor does not read it yet (see the TODO in the body).
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. Generally, you only need to set this if you're
                developing your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            url_scheme: the protocol scheme for the API endpoint.  Normally
                "https", but for testing or local servers,
                "http" can be specified.
            http_options: a dictionary of http_options for transcoding, to override
                the defaults from operations.proto.  Each method has an entry
                with the corresponding http rules as value.
            path_prefix: path prefix (usually represents API version). Set to
                "v1" by default.

        Raises:
            ValueError: If ``host`` does not match the expected
                ``[scheme://]host`` structure.
        """
        # Run the base constructor
        # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
        # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
        # credentials object
        maybe_url_match = re.match("^(?P<scheme>http(?:s)?://)?(?P<host>.*)$", host)
        if maybe_url_match is None:
            raise ValueError(
                f"Unexpected hostname structure: {host}"
            )  # pragma: NO COVER

        # Only prepend the scheme when the caller did not supply one.
        if not maybe_url_match.group("scheme"):
            host = f"{url_scheme}://{host}"

        super().__init__(
            host=host,
            credentials=credentials,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
        )
        self._session = AuthorizedSession(
            self._credentials, default_host=self.DEFAULT_HOST
        )
        if client_cert_source_for_mtls:
            self._session.configure_mtls_channel(client_cert_source_for_mtls)
        self._prep_wrapped_messages(client_info)
        self._http_options = http_options or {}
        self._path_prefix = path_prefix

    def _rest_transcode(self, rpc_name: str, default_rule: Dict, request) -> Dict:
        """Transcode ``request`` using the HTTP rules registered for ``rpc_name``.

        Args:
            rpc_name: Fully qualified RPC name used as the key into the
                ``http_options`` overrides given to the constructor.
            default_rule: The single HTTP rule to use when no override is
                registered for ``rpc_name``.
            request: The protobuf request message to transcode.

        Returns:
            The dictionary produced by ``path_template.transcode``.
        """
        http_options = self._http_options.get(rpc_name, [default_rule])
        request_kwargs = json_format.MessageToDict(
            request,
            preserving_proto_field_name=True,
            including_default_value_fields=True,
        )
        return path_template.transcode(http_options, **request_kwargs)

    @staticmethod
    def _rest_query_params(request_type, transcoded_request: Dict) -> Dict:
        """Round-trip the transcoded query params through ``request_type``.

        The values are parsed into a fresh ``request_type`` message and dumped
        back to a dict without default-valued fields and without preserving
        the proto field names.
        """
        query_params_request = request_type()
        json_format.ParseDict(transcoded_request["query_params"], query_params_request)
        return json_format.MessageToDict(
            query_params_request,
            including_default_value_fields=False,
            preserving_proto_field_name=False,
            use_integers_for_enums=False,
        )

    def _rest_send(
        self,
        transcoded_request: Dict,
        request_type,
        *,
        timeout: Optional[float],
        metadata: Sequence[Tuple[str, str]],
        **request_kwargs,
    ):
        """Send a transcoded request through the authorized session.

        Args:
            transcoded_request: Output of ``_rest_transcode``; its ``uri``,
                ``method`` and ``query_params`` entries are used.
            request_type: Protobuf message class used to normalise the query
                parameters.
            timeout: Timeout forwarded to the session call.
            metadata: Key/value pairs sent as HTTP headers.
            **request_kwargs: Extra keyword arguments forwarded verbatim to
                the session call (e.g. ``data``).

        Returns:
            The HTTP response object returned by the session.

        Raises:
            core_exceptions.GoogleAPICallError: If the response status code
                is 400 or greater.
        """
        query_params = self._rest_query_params(request_type, transcoded_request)

        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        # The transcoded method name ("get", "post", ...) selects the
        # session method to call.
        send = getattr(self._session, transcoded_request["method"])
        response = send(
            "{host}{uri}".format(host=self._host, uri=transcoded_request["uri"]),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params),
            **request_kwargs,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)
        return response

    def _list_operations(
        self,
        request: operations_pb2.ListOperationsRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operations_pb2.ListOperationsResponse:
        r"""Call the list operations method over HTTP.

        Args:
            request (~.operations_pb2.ListOperationsRequest):
                The request object. The request message for
                [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].
            retry (google.api_core.retry.Retry): Part of the common method
                signature; not read by this method.
            timeout (float): The timeout for this request.
            compression (grpc.Compression): Part of the common method
                signature; not read by this method.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.operations_pb2.ListOperationsResponse:
                The response message for
                [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code is
                400 or greater.
        """
        transcoded_request = self._rest_transcode(
            "google.longrunning.Operations.ListOperations",
            {
                "method": "get",
                "uri": "/{}/{{name=**}}/operations".format(self._path_prefix),
            },
            request,
        )
        response = self._rest_send(
            transcoded_request,
            operations_pb2.ListOperationsRequest,
            timeout=timeout,
            metadata=metadata,
        )

        # Return the response
        api_response = operations_pb2.ListOperationsResponse()
        json_format.Parse(response.content, api_response, ignore_unknown_fields=False)
        return api_response

    def _get_operation(
        self,
        request: operations_pb2.GetOperationRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operations_pb2.Operation:
        r"""Call the get operation method over HTTP.

        Args:
            request (~.operations_pb2.GetOperationRequest):
                The request object. The request message for
                [Operations.GetOperation][google.api_core.operations_v1.Operations.GetOperation].
            retry (google.api_core.retry.Retry): Part of the common method
                signature; not read by this method.
            timeout (float): The timeout for this request.
            compression (grpc.Compression): Part of the common method
                signature; not read by this method.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.operations_pb2.Operation:
                This resource represents a long-
                running operation that is the result of a
                network API call.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code is
                400 or greater.
        """
        transcoded_request = self._rest_transcode(
            "google.longrunning.Operations.GetOperation",
            {
                "method": "get",
                "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix),
            },
            request,
        )
        response = self._rest_send(
            transcoded_request,
            operations_pb2.GetOperationRequest,
            timeout=timeout,
            metadata=metadata,
        )

        # Return the response
        api_response = operations_pb2.Operation()
        json_format.Parse(response.content, api_response, ignore_unknown_fields=False)
        return api_response

    def _delete_operation(
        self,
        request: operations_pb2.DeleteOperationRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> empty_pb2.Empty:
        r"""Call the delete operation method over HTTP.

        Args:
            request (~.operations_pb2.DeleteOperationRequest):
                The request object. The request message for
                [Operations.DeleteOperation][google.api_core.operations_v1.Operations.DeleteOperation].
            retry (google.api_core.retry.Retry): Part of the common method
                signature; not read by this method.
            timeout (float): The timeout for this request.
            compression (grpc.Compression): Part of the common method
                signature; not read by this method.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.empty_pb2.Empty: A new empty message; the HTTP response body
                is not parsed.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code is
                400 or greater.
        """
        transcoded_request = self._rest_transcode(
            "google.longrunning.Operations.DeleteOperation",
            {
                "method": "delete",
                "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix),
            },
            request,
        )
        self._rest_send(
            transcoded_request,
            operations_pb2.DeleteOperationRequest,
            timeout=timeout,
            metadata=metadata,
        )
        return empty_pb2.Empty()

    def _cancel_operation(
        self,
        request: operations_pb2.CancelOperationRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> empty_pb2.Empty:
        r"""Call the cancel operation method over HTTP.

        Args:
            request (~.operations_pb2.CancelOperationRequest):
                The request object. The request message for
                [Operations.CancelOperation][google.api_core.operations_v1.Operations.CancelOperation].
            retry (google.api_core.retry.Retry): Part of the common method
                signature; not read by this method.
            timeout (float): The timeout for this request.
            compression (grpc.Compression): Part of the common method
                signature; not read by this method.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.empty_pb2.Empty: A new empty message; the HTTP response body
                is not parsed.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code is
                400 or greater.
        """
        transcoded_request = self._rest_transcode(
            "google.longrunning.Operations.CancelOperation",
            {
                "method": "post",
                "uri": "/{}/{{name=**/operations/*}}:cancel".format(self._path_prefix),
                "body": "*",
            },
            request,
        )

        # Jsonify the request body. It is serialised to a JSON *string*:
        # a dict passed as ``data=`` would be form-encoded by the HTTP
        # session, contradicting the application/json Content-Type header.
        body_request = operations_pb2.CancelOperationRequest()
        json_format.ParseDict(transcoded_request["body"], body_request)
        body = json_format.MessageToJson(
            body_request,
            including_default_value_fields=False,
            preserving_proto_field_name=False,
            use_integers_for_enums=False,
        )

        self._rest_send(
            transcoded_request,
            operations_pb2.CancelOperationRequest,
            timeout=timeout,
            metadata=metadata,
            data=body,
        )
        return empty_pb2.Empty()

    @property
    def list_operations(
        self,
    ) -> Callable[
        [operations_pb2.ListOperationsRequest], operations_pb2.ListOperationsResponse
    ]:
        """Callable that performs the ListOperations RPC over REST."""
        return self._list_operations

    @property
    def get_operation(
        self,
    ) -> Callable[[operations_pb2.GetOperationRequest], operations_pb2.Operation]:
        """Callable that performs the GetOperation RPC over REST."""
        return self._get_operation

    @property
    def delete_operation(
        self,
    ) -> Callable[[operations_pb2.DeleteOperationRequest], empty_pb2.Empty]:
        """Callable that performs the DeleteOperation RPC over REST."""
        return self._delete_operation

    @property
    def cancel_operation(
        self,
    ) -> Callable[[operations_pb2.CancelOperationRequest], empty_pb2.Empty]:
        """Callable that performs the CancelOperation RPC over REST."""
        return self._cancel_operation

486 

487 

# Public API of this module: only the REST transport class is exported.
__all__ = ("OperationsRestTransport",)