Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/pubsub_v1/services/subscriber/transports/base.py: 62%

100 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:25 +0000

1# -*- coding: utf-8 -*- 

2# Copyright 2022 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import abc 

17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union 

18 

19from google.pubsub_v1 import gapic_version as package_version 

20 

21import google.auth # type: ignore 

22import google.api_core 

23from google.api_core import exceptions as core_exceptions 

24from google.api_core import gapic_v1 

25from google.api_core import retry as retries 

26from google.auth import credentials as ga_credentials # type: ignore 

27from google.oauth2 import service_account # type: ignore 

28 

29from google.iam.v1 import iam_policy_pb2 # type: ignore 

30from google.iam.v1 import policy_pb2 # type: ignore 

31from google.protobuf import empty_pb2 # type: ignore 

32from google.pubsub_v1.types import pubsub 

33 

# Default ``client_info`` value for the transport constructor below; it
# records this package's version as the client library version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    client_library_version=package_version.__version__,
)

37 

38 

class SubscriberTransport(abc.ABC):
    """Abstract transport class for Subscriber.

    The constructor resolves and stores credentials, scopes and the host.
    Every RPC is exposed as a property returning a callable; in this base
    class each of those properties (as well as ``close`` and ``kind``)
    raises ``NotImplementedError``.
    """

    # Passed as ``default_scopes`` whenever credentials are loaded in __init__.
    AUTH_SCOPES = (
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/pubsub",
    )

    DEFAULT_HOST: str = "pubsub.googleapis.com"

    def __init__(
        self,
        *,
        host: str = DEFAULT_HOST,
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to. ``":443"`` is appended when the
                 value contains no ``":"``.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is mutually exclusive with credentials.
            scopes (Optional[Sequence[str]]): A list of scopes.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            api_audience (Optional[str]): Audience handed to
                ``with_gdch_audience`` when the credentials are obtained from
                :func:`google.auth.default` and expose that method. ``host``
                is used instead when this is empty or ``None``.
            **kwargs: Accepted but not used by this constructor.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are supplied.
        """

        scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES}

        # Save the scopes.
        self._scopes = scopes

        # If no credentials are provided, then determine the appropriate
        # defaults.
        if credentials and credentials_file:
            raise core_exceptions.DuplicateCredentialArgs(
                "'credentials_file' and 'credentials' are mutually exclusive"
            )

        if credentials_file is not None:
            credentials, _ = google.auth.load_credentials_from_file(
                credentials_file, **scopes_kwargs, quota_project_id=quota_project_id
            )
        elif credentials is None:
            credentials, _ = google.auth.default(
                **scopes_kwargs, quota_project_id=quota_project_id
            )
            # Don't apply audience if the credentials file passed from user.
            if hasattr(credentials, "with_gdch_audience"):
                credentials = credentials.with_gdch_audience(api_audience or host)

        # If the credentials are service account credentials, then always try to use self signed JWT.
        if (
            always_use_jwt_access
            and isinstance(credentials, service_account.Credentials)
            and hasattr(service_account.Credentials, "with_always_use_jwt_access")
        ):
            credentials = credentials.with_always_use_jwt_access(True)

        # Save the credentials.
        self._credentials = credentials

        # Save the hostname. Default to port 443 (HTTPS) if none is specified.
        if ":" not in host:
            host += ":443"
        self._host = host

    def _prep_wrapped_messages(self, client_info):
        """Precompute ``self._wrapped_methods``.

        The mapping goes from each RPC callable to the same callable wrapped
        by ``gapic_v1.method.wrap_method`` with a default retry policy, a
        default timeout and ``client_info``. The three IAM callables are not
        part of this mapping.

        Args:
            client_info: Client info forwarded to every ``wrap_method`` call.
        """

        def wrap(method, *retryable, deadline=60.0):
            # All entries share initial/maximum/multiplier and use one number
            # for both the retry deadline and the default timeout; only the
            # retryable exception types and that number vary per RPC.
            return gapic_v1.method.wrap_method(
                method,
                default_retry=retries.Retry(
                    initial=0.1,
                    maximum=60.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(*retryable),
                    deadline=deadline,
                ),
                default_timeout=deadline,
                client_info=client_info,
            )

        # The two exception sets that recur across most RPCs.
        broad = (
            core_exceptions.Aborted,
            core_exceptions.ServiceUnavailable,
            core_exceptions.Unknown,
        )
        narrow = (core_exceptions.ServiceUnavailable,)

        self._wrapped_methods = {
            self.create_subscription: wrap(self.create_subscription, *broad),
            self.get_subscription: wrap(self.get_subscription, *broad),
            self.update_subscription: wrap(self.update_subscription, *narrow),
            self.list_subscriptions: wrap(self.list_subscriptions, *broad),
            self.delete_subscription: wrap(self.delete_subscription, *narrow),
            self.modify_ack_deadline: wrap(self.modify_ack_deadline, *narrow),
            self.acknowledge: wrap(self.acknowledge, *narrow),
            self.pull: wrap(
                self.pull,
                core_exceptions.Aborted,
                core_exceptions.InternalServerError,
                core_exceptions.ServiceUnavailable,
                core_exceptions.Unknown,
            ),
            # The only entry with a 900 second deadline/timeout.
            self.streaming_pull: wrap(
                self.streaming_pull,
                core_exceptions.Aborted,
                core_exceptions.DeadlineExceeded,
                core_exceptions.InternalServerError,
                core_exceptions.ResourceExhausted,
                core_exceptions.ServiceUnavailable,
                deadline=900.0,
            ),
            self.modify_push_config: wrap(self.modify_push_config, *narrow),
            self.get_snapshot: wrap(self.get_snapshot, *broad),
            self.list_snapshots: wrap(self.list_snapshots, *broad),
            self.create_snapshot: wrap(self.create_snapshot, *narrow),
            self.update_snapshot: wrap(self.update_snapshot, *narrow),
            self.delete_snapshot: wrap(self.delete_snapshot, *narrow),
            self.seek: wrap(self.seek, *broad),
        }

    def close(self):
        """Closes resources associated with the transport.

        .. warning::
             Only call this method if the transport is NOT shared
             with other clients - this may cause errors in other clients!
        """
        raise NotImplementedError()

    @property
    def create_subscription(
        self,
    ) -> Callable[
        [pubsub.Subscription],
        Union[pubsub.Subscription, Awaitable[pubsub.Subscription]],
    ]:
        """Callable for ``create_subscription``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def get_subscription(
        self,
    ) -> Callable[
        [pubsub.GetSubscriptionRequest],
        Union[pubsub.Subscription, Awaitable[pubsub.Subscription]],
    ]:
        """Callable for ``get_subscription``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def update_subscription(
        self,
    ) -> Callable[
        [pubsub.UpdateSubscriptionRequest],
        Union[pubsub.Subscription, Awaitable[pubsub.Subscription]],
    ]:
        """Callable for ``update_subscription``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def list_subscriptions(
        self,
    ) -> Callable[
        [pubsub.ListSubscriptionsRequest],
        Union[
            pubsub.ListSubscriptionsResponse,
            Awaitable[pubsub.ListSubscriptionsResponse],
        ],
    ]:
        """Callable for ``list_subscriptions``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def delete_subscription(
        self,
    ) -> Callable[
        [pubsub.DeleteSubscriptionRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for ``delete_subscription``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def modify_ack_deadline(
        self,
    ) -> Callable[
        [pubsub.ModifyAckDeadlineRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for ``modify_ack_deadline``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def acknowledge(
        self,
    ) -> Callable[
        [pubsub.AcknowledgeRequest], Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]]
    ]:
        """Callable for ``acknowledge``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def pull(
        self,
    ) -> Callable[
        [pubsub.PullRequest], Union[pubsub.PullResponse, Awaitable[pubsub.PullResponse]]
    ]:
        """Callable for ``pull``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def streaming_pull(
        self,
    ) -> Callable[
        [pubsub.StreamingPullRequest],
        Union[pubsub.StreamingPullResponse, Awaitable[pubsub.StreamingPullResponse]],
    ]:
        """Callable for ``streaming_pull``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def modify_push_config(
        self,
    ) -> Callable[
        [pubsub.ModifyPushConfigRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for ``modify_push_config``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def get_snapshot(
        self,
    ) -> Callable[
        [pubsub.GetSnapshotRequest], Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]]
    ]:
        """Callable for ``get_snapshot``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def list_snapshots(
        self,
    ) -> Callable[
        [pubsub.ListSnapshotsRequest],
        Union[pubsub.ListSnapshotsResponse, Awaitable[pubsub.ListSnapshotsResponse]],
    ]:
        """Callable for ``list_snapshots``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def create_snapshot(
        self,
    ) -> Callable[
        [pubsub.CreateSnapshotRequest],
        Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]],
    ]:
        """Callable for ``create_snapshot``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def update_snapshot(
        self,
    ) -> Callable[
        [pubsub.UpdateSnapshotRequest],
        Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]],
    ]:
        """Callable for ``update_snapshot``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def delete_snapshot(
        self,
    ) -> Callable[
        [pubsub.DeleteSnapshotRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for ``delete_snapshot``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def seek(
        self,
    ) -> Callable[
        [pubsub.SeekRequest], Union[pubsub.SeekResponse, Awaitable[pubsub.SeekResponse]]
    ]:
        """Callable for ``seek``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def set_iam_policy(
        self,
    ) -> Callable[
        [iam_policy_pb2.SetIamPolicyRequest],
        Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
    ]:
        """Callable for ``set_iam_policy``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def get_iam_policy(
        self,
    ) -> Callable[
        [iam_policy_pb2.GetIamPolicyRequest],
        Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
    ]:
        """Callable for ``get_iam_policy``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def test_iam_permissions(
        self,
    ) -> Callable[
        [iam_policy_pb2.TestIamPermissionsRequest],
        Union[
            iam_policy_pb2.TestIamPermissionsResponse,
            Awaitable[iam_policy_pb2.TestIamPermissionsResponse],
        ],
    ]:
        """Callable for ``test_iam_permissions``; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    @property
    def kind(self) -> str:
        """The transport's kind as a string; raises ``NotImplementedError`` here."""
        raise NotImplementedError()

562 

563 

# Public API of this module: only the abstract transport class is exported.
__all__ = ("SubscriberTransport",)