Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/operations_v1/transports/rest.py: 26%

117 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-06 06:03 +0000

1# -*- coding: utf-8 -*- 

2# Copyright 2020 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16 

17import re 

18from typing import Callable, Dict, Optional, Sequence, Tuple, Union 

19 

20from requests import __version__ as requests_version 

21 

22from google.api_core import exceptions as core_exceptions # type: ignore 

23from google.api_core import gapic_v1 # type: ignore 

24from google.api_core import path_template # type: ignore 

25from google.api_core import rest_helpers # type: ignore 

26from google.api_core import retry as retries # type: ignore 

27from google.auth import credentials as ga_credentials # type: ignore 

28from google.auth.transport.requests import AuthorizedSession # type: ignore 

29from google.longrunning import operations_pb2 # type: ignore 

30from google.protobuf import empty_pb2 # type: ignore 

31from google.protobuf import json_format # type: ignore 

32from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO, OperationsTransport 

33 

# Type accepted by the ``retry`` parameters in this module: either a
# ``Retry`` instance or an arbitrary sentinel object (the methods below
# default it to ``gapic_v1.method.DEFAULT``).
OptionalRetry = Union[retries.Retry, object]

# Client info used by default for this transport. It reuses the GAPIC version
# of the base transport's default client info, reports no gRPC version, and
# reports the installed ``requests`` version as the REST version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
    grpc_version=None,
    rest_version=requests_version,
)

41 

42 

class OperationsRestTransport(OperationsTransport):
    """REST backend transport for Operations.

    Manages long-running operations with an API service.

    When an API method normally takes a long time to complete, it can be
    designed to return [Operation][google.api_core.operations_v1.Operation] to the
    client, and the client can use this interface to receive the real
    response asynchronously by polling the operation resource, or pass
    the operation resource to another API (such as Google Cloud Pub/Sub
    API) to receive the response. Any API service that returns
    long-running operations should implement the ``Operations``
    interface so developers can have a consistent client experience.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends JSON representations of protocol buffers over HTTP/1.1
    """

    def __init__(
        self,
        *,
        host: str = "longrunning.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        url_scheme: str = "https",
        http_options: Optional[Dict] = None,
        path_prefix: str = "v1",
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to. If it does not already start
                with ``http://`` or ``https://``, ``url_scheme`` is
                prepended to it.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service. They
                are forwarded to the base transport constructor; how a
                missing value is resolved is decided there (presumably
                from the environment; confirm against the base class).
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                Accepted for interface compatibility; this constructor does
                not currently forward it to the base transport (see the
                TODO in the body).
            scopes (Optional(Sequence[str])): A list of scopes. Accepted for
                interface compatibility; not currently forwarded to the
                base transport (see the TODO in the body).
            client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
                certificate source used to configure a mutual TLS HTTP
                channel on the session. Ignored when falsy.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota. Accepted for interface compatibility; not
                currently forwarded to the base transport.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. Generally, you only need to set this if you're
                developing your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            url_scheme: the protocol scheme for the API endpoint.  Normally
                "https", but for testing or local servers,
                "http" can be specified.
            http_options: a dictionary of http_options for transcoding, to override
                the defaults from operations.proto.  Each method has an entry
                with the corresponding http rules as value.
            path_prefix: path prefix (usually represents API version). Set to
                "v1" by default.

        Raises:
            ValueError: If ``host`` does not match the expected
                ``[scheme://]host`` structure.
        """
        # Run the base constructor
        # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
        # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
        # credentials object
        maybe_url_match = re.match(r"^(?P<scheme>http(?:s)?://)?(?P<host>.*)$", host)
        if maybe_url_match is None:
            raise ValueError(
                f"Unexpected hostname structure: {host}"
            )  # pragma: NO COVER

        url_match_items = maybe_url_match.groupdict()

        # Only prepend the scheme when the caller did not supply one.
        host = f"{url_scheme}://{host}" if not url_match_items["scheme"] else host

        super().__init__(
            host=host,
            credentials=credentials,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
        )
        self._session = AuthorizedSession(
            self._credentials, default_host=self.DEFAULT_HOST
        )
        if client_cert_source_for_mtls:
            self._session.configure_mtls_channel(client_cert_source_for_mtls)
        self._prep_wrapped_messages(client_info)
        self._http_options = http_options or {}
        self._path_prefix = path_prefix

    @staticmethod
    def _roundtrip_to_json_dict(fields, message):
        """Parse ``fields`` into ``message`` and dump it back as a JSON dict.

        Args:
            fields (dict): Field values, as produced by transcoding.
            message: An empty protobuf message instance to parse into.

        Returns:
            dict: ``message`` rendered with JSON (camelCase) field names,
            without default-valued fields, and with enums as names.
        """
        json_format.ParseDict(fields, message)
        return json_format.MessageToDict(
            message,
            including_default_value_fields=False,
            preserving_proto_field_name=False,
            use_integers_for_enums=False,
        )

    def _send_transcoded_request(
        self,
        rpc_name: str,
        default_http_options: Sequence[Dict],
        request,
        request_type: type,
        *,
        timeout: Optional[float],
        metadata: Sequence[Tuple[str, str]],
        has_body: bool = False,
    ):
        """Transcode ``request`` to HTTP, send it, and return the response.

        Shared implementation behind the four RPC methods of this class.

        Args:
            rpc_name (str): Fully qualified RPC name; used as the key for
                looking up an override in the ``http_options`` passed to
                the constructor.
            default_http_options (Sequence[Dict]): HTTP rules used when no
                override is registered for ``rpc_name``.
            request: The protobuf request message.
            request_type (type): Message class used to re-encode the query
                parameters (and the body, if any).
            timeout (Optional[float]): Timeout passed to the session call.
            metadata (Sequence[Tuple[str, str]]): Pairs sent as HTTP headers.
            has_body (bool): Whether the transcoded request carries a body
                that must be sent as ``data``.

        Returns:
            The response object returned by the session call.

        Raises:
            core_exceptions.GoogleAPICallError: If the response status code
                is 400 or greater.
        """
        http_options = default_http_options
        if rpc_name in self._http_options:
            http_options = self._http_options[rpc_name]

        # NOTE(review): ``including_default_value_fields`` appears to have
        # been renamed in newer protobuf releases - confirm the supported
        # protobuf version range before upgrading.
        request_kwargs = json_format.MessageToDict(
            request,
            preserving_proto_field_name=True,
            including_default_value_fields=True,
        )
        transcoded_request = path_template.transcode(http_options, **request_kwargs)

        # Only pass ``data`` at all when the RPC has a body, so body-less
        # calls reach the session exactly as before.
        body_kwargs = {}
        if has_body:
            # NOTE(review): this is a dict, not a JSON string, although the
            # Content-Type below is application/json. With the default rule
            # the dict is presumably empty (``name`` is bound in the path);
            # verify how the session encodes it before relying on custom
            # rules that leave fields in the body.
            body_kwargs["data"] = self._roundtrip_to_json_dict(
                transcoded_request["body"], request_type()
            )

        uri = transcoded_request["uri"]
        method = transcoded_request["method"]

        # Jsonify the query params
        query_params = self._roundtrip_to_json_dict(
            transcoded_request["query_params"], request_type()
        )

        # Send the request
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(self._session, method)(
            "{host}{uri}".format(host=self._host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params),
            **body_kwargs,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        return response

    def _list_operations(
        self,
        request: operations_pb2.ListOperationsRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operations_pb2.ListOperationsResponse:
        r"""Call the list operations method over HTTP.

        Args:
            request (~.operations_pb2.ListOperationsRequest):
                The request object. The request message for
                [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].

            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not read by this method itself.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.operations_pb2.ListOperationsResponse:
                The response message for
                [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code is
                400 or greater.
        """
        response = self._send_transcoded_request(
            "google.longrunning.Operations.ListOperations",
            [
                {
                    "method": "get",
                    "uri": "/{}/{{name=**}}/operations".format(self._path_prefix),
                },
            ],
            request,
            operations_pb2.ListOperationsRequest,
            timeout=timeout,
            metadata=metadata,
        )

        # Return the response
        api_response = operations_pb2.ListOperationsResponse()
        json_format.Parse(response.content, api_response, ignore_unknown_fields=False)
        return api_response

    def _get_operation(
        self,
        request: operations_pb2.GetOperationRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operations_pb2.Operation:
        r"""Call the get operation method over HTTP.

        Args:
            request (~.operations_pb2.GetOperationRequest):
                The request object. The request message for
                [Operations.GetOperation][google.api_core.operations_v1.Operations.GetOperation].

            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not read by this method itself.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.operations_pb2.Operation:
                This resource represents a long-running operation that is
                the result of a network API call.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code is
                400 or greater.
        """
        response = self._send_transcoded_request(
            "google.longrunning.Operations.GetOperation",
            [
                {
                    "method": "get",
                    "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix),
                },
            ],
            request,
            operations_pb2.GetOperationRequest,
            timeout=timeout,
            metadata=metadata,
        )

        # Return the response
        api_response = operations_pb2.Operation()
        json_format.Parse(response.content, api_response, ignore_unknown_fields=False)
        return api_response

    def _delete_operation(
        self,
        request: operations_pb2.DeleteOperationRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> empty_pb2.Empty:
        r"""Call the delete operation method over HTTP.

        Args:
            request (~.operations_pb2.DeleteOperationRequest):
                The request object. The request message for
                [Operations.DeleteOperation][google.api_core.operations_v1.Operations.DeleteOperation].

            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not read by this method itself.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.empty_pb2.Empty: An empty message; the response payload is
            not parsed.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code is
                400 or greater.
        """
        self._send_transcoded_request(
            "google.longrunning.Operations.DeleteOperation",
            [
                {
                    "method": "delete",
                    "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix),
                },
            ],
            request,
            operations_pb2.DeleteOperationRequest,
            timeout=timeout,
            metadata=metadata,
        )

        return empty_pb2.Empty()

    def _cancel_operation(
        self,
        request: operations_pb2.CancelOperationRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> empty_pb2.Empty:
        r"""Call the cancel operation method over HTTP.

        Args:
            request (~.operations_pb2.CancelOperationRequest):
                The request object. The request message for
                [Operations.CancelOperation][google.api_core.operations_v1.Operations.CancelOperation].

            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not read by this method itself.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.empty_pb2.Empty: An empty message; the response payload is
            not parsed.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code is
                400 or greater.
        """
        self._send_transcoded_request(
            "google.longrunning.Operations.CancelOperation",
            [
                {
                    "method": "post",
                    "uri": "/{}/{{name=**/operations/*}}:cancel".format(self._path_prefix),
                    "body": "*",
                },
            ],
            request,
            operations_pb2.CancelOperationRequest,
            timeout=timeout,
            metadata=metadata,
            has_body=True,
        )

        return empty_pb2.Empty()

    @property
    def list_operations(
        self,
    ) -> Callable[
        [operations_pb2.ListOperationsRequest], operations_pb2.ListOperationsResponse
    ]:
        """Callable that performs the ListOperations RPC over REST."""
        return self._list_operations

    @property
    def get_operation(
        self,
    ) -> Callable[[operations_pb2.GetOperationRequest], operations_pb2.Operation]:
        """Callable that performs the GetOperation RPC over REST."""
        return self._get_operation

    @property
    def delete_operation(
        self,
    ) -> Callable[[operations_pb2.DeleteOperationRequest], empty_pb2.Empty]:
        """Callable that performs the DeleteOperation RPC over REST."""
        return self._delete_operation

    @property
    def cancel_operation(
        self,
    ) -> Callable[[operations_pb2.CancelOperationRequest], empty_pb2.Empty]:
        """Callable that performs the CancelOperation RPC over REST."""
        return self._cancel_operation

481 

482 

# Public API of this module: only the REST transport class is exported.
__all__ = ("OperationsRestTransport",)