Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/google/pubsub_v1/services/subscriber/transports/base.py: 61%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

105 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2024 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import abc 

17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union 

18 

19from google.pubsub_v1 import gapic_version as package_version 

20 

21import google.auth # type: ignore 

22import google.api_core 

23from google.api_core import exceptions as core_exceptions 

24from google.api_core import gapic_v1 

25from google.api_core import retry as retries 

26from google.auth import credentials as ga_credentials # type: ignore 

27from google.oauth2 import service_account # type: ignore 

28 

29from google.iam.v1 import iam_policy_pb2 # type: ignore 

30from google.iam.v1 import policy_pb2 # type: ignore 

31from google.protobuf import empty_pb2 # type: ignore 

32from google.pubsub_v1.types import pubsub 

33 

# Client metadata stamped with this package's version; it is the default value
# of the ``client_info`` parameter of ``SubscriberTransport.__init__``.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    client_library_version=package_version.__version__,
)

37 

38 

class SubscriberTransport(abc.ABC):
    """Abstract transport class for Subscriber.

    The initializer resolves credentials and normalizes the host. Every RPC
    accessor on this base class raises ``NotImplementedError``.
    """

    AUTH_SCOPES = (
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/pubsub",
    )

    DEFAULT_HOST: str = "pubsub.googleapis.com"

    def __init__(
        self,
        *,
        host: str = DEFAULT_HOST,
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to (default: 'pubsub.googleapis.com').
                ``":443"`` is appended when the value contains no ``":"``.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. When neither
                this nor ``credentials_file`` is given (and
                ``_ignore_credentials`` is falsy), they are obtained from
                :func:`google.auth.default`.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is mutually exclusive with credentials.
            scopes (Optional[Sequence[str]]): A list of scopes; stored as-is
                and passed along with ``AUTH_SCOPES`` as the default scopes
                when credentials are loaded here.
            quota_project_id (Optional[str]): An optional project to use for
                billing and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. This base initializer does not read it.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT
                should be used for service account credentials.
            api_audience (Optional[str]): Audience passed to
                ``with_gdch_audience`` on default credentials that expose
                that method; ``host`` is used when this is falsy.
            **kwargs: Accepted but not read by this initializer.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are supplied.
        """
        # Keep the scopes exactly as the caller supplied them.
        self._scopes = scopes

        # Preserve any value already present on the instance; default otherwise.
        if not hasattr(self, "_ignore_credentials"):
            self._ignore_credentials: bool = False

        if credentials and credentials_file:
            raise core_exceptions.DuplicateCredentialArgs(
                "'credentials_file' and 'credentials' are mutually exclusive"
            )

        scope_options = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES}

        if credentials_file is not None:
            credentials, _ = google.auth.load_credentials_from_file(
                credentials_file,
                quota_project_id=quota_project_id,
                **scope_options,
            )
        elif credentials is None and not self._ignore_credentials:
            credentials, _ = google.auth.default(
                quota_project_id=quota_project_id,
                **scope_options,
            )
            # The audience is applied only on this default-credentials path,
            # never to credentials loaded from a user-supplied file.
            if hasattr(credentials, "with_gdch_audience"):
                audience = api_audience or host
                credentials = credentials.with_gdch_audience(audience)

        # Service account credentials switch to self signed JWT when requested
        # and when the installed class offers the method.
        wants_self_signed_jwt = (
            always_use_jwt_access
            and isinstance(credentials, service_account.Credentials)
            and hasattr(service_account.Credentials, "with_always_use_jwt_access")
        )
        if wants_self_signed_jwt:
            credentials = credentials.with_always_use_jwt_access(True)

        self._credentials = credentials

        # Default to port 443 (HTTPS) when the host names no port.
        self._host = host if ":" in host else host + ":443"

    @property
    def host(self):
        """The host stored by ``__init__`` (with ``":443"`` appended if it had no ``":"``)."""
        return self._host

    def _prep_wrapped_messages(self, client_info):
        """Populate ``self._wrapped_methods`` with wrapped versions of every RPC.

        Each RPC callable is read from its property and mapped to the result
        of ``gapic_v1.method.wrap_method`` with that RPC's default retry and
        timeout. The three IAM methods are wrapped without a default retry and
        with ``default_timeout=None``.

        Args:
            client_info: Forwarded unchanged to every ``wrap_method`` call.
        """
        wrap = gapic_v1.method.wrap_method
        exc = core_exceptions

        def backoff(*retryable, multiplier=1.3, deadline=60.0):
            # A new Retry object is built for every RPC.
            return retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=multiplier,
                predicate=retries.if_exception_type(*retryable),
                deadline=deadline,
            )

        def with_retry(rpc, *retryable, timeout=60.0, **backoff_overrides):
            return wrap(
                rpc,
                default_retry=backoff(*retryable, **backoff_overrides),
                default_timeout=timeout,
                client_info=client_info,
            )

        def without_retry(rpc):
            return wrap(rpc, default_timeout=None, client_info=client_info)

        # The two exception sets shared by most RPCs.
        transient = (exc.Aborted, exc.ServiceUnavailable, exc.Unknown)
        unavailable = (exc.ServiceUnavailable,)

        # Each property is read twice per entry (key, then wrapped target).
        self._wrapped_methods = {
            self.create_subscription: with_retry(
                self.create_subscription, *transient
            ),
            self.get_subscription: with_retry(self.get_subscription, *transient),
            self.update_subscription: with_retry(
                self.update_subscription, *unavailable
            ),
            self.list_subscriptions: with_retry(
                self.list_subscriptions, *transient
            ),
            self.delete_subscription: with_retry(
                self.delete_subscription, *unavailable
            ),
            self.modify_ack_deadline: with_retry(
                self.modify_ack_deadline, *unavailable
            ),
            self.acknowledge: with_retry(self.acknowledge, *unavailable),
            self.pull: with_retry(
                self.pull,
                exc.Aborted,
                exc.InternalServerError,
                exc.ServiceUnavailable,
                exc.Unknown,
            ),
            self.streaming_pull: with_retry(
                self.streaming_pull,
                exc.Aborted,
                exc.DeadlineExceeded,
                exc.InternalServerError,
                exc.ResourceExhausted,
                exc.ServiceUnavailable,
                timeout=900.0,
                multiplier=4,
                deadline=900.0,
            ),
            self.modify_push_config: with_retry(
                self.modify_push_config, *unavailable
            ),
            self.get_snapshot: with_retry(self.get_snapshot, *transient),
            self.list_snapshots: with_retry(self.list_snapshots, *transient),
            self.create_snapshot: with_retry(self.create_snapshot, *unavailable),
            self.update_snapshot: with_retry(self.update_snapshot, *unavailable),
            self.delete_snapshot: with_retry(self.delete_snapshot, *unavailable),
            self.seek: with_retry(self.seek, *transient),
            self.get_iam_policy: without_retry(self.get_iam_policy),
            self.set_iam_policy: without_retry(self.set_iam_policy),
            self.test_iam_permissions: without_retry(self.test_iam_permissions),
        }

    def close(self):
        """Closes resources associated with the transport.

        Not implemented on this base class.

        .. warning::
             Only call this method if the transport is NOT shared
             with other clients - this may cause errors in other clients!
        """
        raise NotImplementedError()

    @property
    def create_subscription(self) -> Callable[
        [pubsub.Subscription],
        Union[pubsub.Subscription, Awaitable[pubsub.Subscription]],
    ]:
        """Callable for ``create_subscription``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def get_subscription(self) -> Callable[
        [pubsub.GetSubscriptionRequest],
        Union[pubsub.Subscription, Awaitable[pubsub.Subscription]],
    ]:
        """Callable for ``get_subscription``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def update_subscription(self) -> Callable[
        [pubsub.UpdateSubscriptionRequest],
        Union[pubsub.Subscription, Awaitable[pubsub.Subscription]],
    ]:
        """Callable for ``update_subscription``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def list_subscriptions(self) -> Callable[
        [pubsub.ListSubscriptionsRequest],
        Union[
            pubsub.ListSubscriptionsResponse,
            Awaitable[pubsub.ListSubscriptionsResponse],
        ],
    ]:
        """Callable for ``list_subscriptions``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def delete_subscription(self) -> Callable[
        [pubsub.DeleteSubscriptionRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for ``delete_subscription``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def modify_ack_deadline(self) -> Callable[
        [pubsub.ModifyAckDeadlineRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for ``modify_ack_deadline``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def acknowledge(self) -> Callable[
        [pubsub.AcknowledgeRequest], Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]]
    ]:
        """Callable for ``acknowledge``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def pull(self) -> Callable[
        [pubsub.PullRequest], Union[pubsub.PullResponse, Awaitable[pubsub.PullResponse]]
    ]:
        """Callable for ``pull``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def streaming_pull(self) -> Callable[
        [pubsub.StreamingPullRequest],
        Union[pubsub.StreamingPullResponse, Awaitable[pubsub.StreamingPullResponse]],
    ]:
        """Callable for ``streaming_pull``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def modify_push_config(self) -> Callable[
        [pubsub.ModifyPushConfigRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for ``modify_push_config``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def get_snapshot(self) -> Callable[
        [pubsub.GetSnapshotRequest], Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]]
    ]:
        """Callable for ``get_snapshot``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def list_snapshots(self) -> Callable[
        [pubsub.ListSnapshotsRequest],
        Union[pubsub.ListSnapshotsResponse, Awaitable[pubsub.ListSnapshotsResponse]],
    ]:
        """Callable for ``list_snapshots``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def create_snapshot(self) -> Callable[
        [pubsub.CreateSnapshotRequest],
        Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]],
    ]:
        """Callable for ``create_snapshot``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def update_snapshot(self) -> Callable[
        [pubsub.UpdateSnapshotRequest],
        Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]],
    ]:
        """Callable for ``update_snapshot``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def delete_snapshot(self) -> Callable[
        [pubsub.DeleteSnapshotRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for ``delete_snapshot``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def seek(self) -> Callable[
        [pubsub.SeekRequest], Union[pubsub.SeekResponse, Awaitable[pubsub.SeekResponse]]
    ]:
        """Callable for ``seek``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def set_iam_policy(self) -> Callable[
        [iam_policy_pb2.SetIamPolicyRequest],
        Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
    ]:
        """Callable for ``set_iam_policy``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def get_iam_policy(self) -> Callable[
        [iam_policy_pb2.GetIamPolicyRequest],
        Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
    ]:
        """Callable for ``get_iam_policy``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def test_iam_permissions(self) -> Callable[
        [iam_policy_pb2.TestIamPermissionsRequest],
        Union[
            iam_policy_pb2.TestIamPermissionsResponse,
            Awaitable[iam_policy_pb2.TestIamPermissionsResponse],
        ],
    ]:
        """Callable for ``test_iam_permissions``; not implemented on this base class."""
        raise NotImplementedError()

    @property
    def kind(self) -> str:
        """Transport kind as a string; not implemented on this base class."""
        raise NotImplementedError()

583 

584 

# Only the transport class is exported by ``from ... import *``.
__all__ = ("SubscriberTransport",)