Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/pubsub_v1/services/subscriber/transports/base.py: 61%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

107 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import abc 

17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union 

18 

19from google.pubsub_v1 import gapic_version as package_version 

20 

21import google.auth # type: ignore 

22import google.api_core 

23from google.api_core import exceptions as core_exceptions 

24from google.api_core import gapic_v1 

25from google.api_core import retry as retries 

26from google.auth import credentials as ga_credentials # type: ignore 

27from google.oauth2 import service_account # type: ignore 

28import google.protobuf 

29 

30from google.iam.v1 import iam_policy_pb2 # type: ignore 

31from google.iam.v1 import policy_pb2 # type: ignore 

32from google.protobuf import empty_pb2 # type: ignore 

33from google.pubsub_v1.types import pubsub 

34 

35DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( 

36 client_library_version=package_version.__version__ 

37) 

38 

39if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"): # pragma: NO COVER 

40 DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__ 

41 

42 

43class SubscriberTransport(abc.ABC): 

44 """Abstract transport class for Subscriber.""" 

45 

46 AUTH_SCOPES = ( 

47 "https://www.googleapis.com/auth/cloud-platform", 

48 "https://www.googleapis.com/auth/pubsub", 

49 ) 

50 

51 DEFAULT_HOST: str = "pubsub.googleapis.com" 

52 

53 def __init__( 

54 self, 

55 *, 

56 host: str = DEFAULT_HOST, 

57 credentials: Optional[ga_credentials.Credentials] = None, 

58 credentials_file: Optional[str] = None, 

59 scopes: Optional[Sequence[str]] = None, 

60 quota_project_id: Optional[str] = None, 

61 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

62 always_use_jwt_access: Optional[bool] = False, 

63 api_audience: Optional[str] = None, 

64 **kwargs, 

65 ) -> None: 

66 """Instantiate the transport. 

67 

68 Args: 

69 host (Optional[str]): 

70 The hostname to connect to (default: 'pubsub.googleapis.com'). 

71 credentials (Optional[google.auth.credentials.Credentials]): The 

72 authorization credentials to attach to requests. These 

73 credentials identify the application to the service; if none 

74 are specified, the client will attempt to ascertain the 

75 credentials from the environment. 

76 credentials_file (Optional[str]): A file with credentials that can 

77 be loaded with :func:`google.auth.load_credentials_from_file`. 

78 This argument is mutually exclusive with credentials. 

79 scopes (Optional[Sequence[str]]): A list of scopes. 

80 quota_project_id (Optional[str]): An optional project to use for billing 

81 and quota. 

82 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

83 The client info used to send a user-agent string along with 

84 API requests. If ``None``, then default info will be used. 

85 Generally, you only need to set this if you're developing 

86 your own client library. 

87 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 

88 be used for service account credentials. 

89 """ 

90 

91 scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES} 

92 

93 # Save the scopes. 

94 self._scopes = scopes 

95 if not hasattr(self, "_ignore_credentials"): 

96 self._ignore_credentials: bool = False 

97 

98 # If no credentials are provided, then determine the appropriate 

99 # defaults. 

100 if credentials and credentials_file: 

101 raise core_exceptions.DuplicateCredentialArgs( 

102 "'credentials_file' and 'credentials' are mutually exclusive" 

103 ) 

104 

105 if credentials_file is not None: 

106 credentials, _ = google.auth.load_credentials_from_file( 

107 credentials_file, **scopes_kwargs, quota_project_id=quota_project_id 

108 ) 

109 elif credentials is None and not self._ignore_credentials: 

110 credentials, _ = google.auth.default( 

111 **scopes_kwargs, quota_project_id=quota_project_id 

112 ) 

113 # Don't apply audience if the credentials file passed from user. 

114 if hasattr(credentials, "with_gdch_audience"): 

115 credentials = credentials.with_gdch_audience( 

116 api_audience if api_audience else host 

117 ) 

118 

119 # If the credentials are service account credentials, then always try to use self signed JWT. 

120 if ( 

121 always_use_jwt_access 

122 and isinstance(credentials, service_account.Credentials) 

123 and hasattr(service_account.Credentials, "with_always_use_jwt_access") 

124 ): 

125 credentials = credentials.with_always_use_jwt_access(True) 

126 

127 # Save the credentials. 

128 self._credentials = credentials 

129 

130 # Save the hostname. Default to port 443 (HTTPS) if none is specified. 

131 if ":" not in host: 

132 host += ":443" 

133 self._host = host 

134 

135 @property 

136 def host(self): 

137 return self._host 

138 

139 def _prep_wrapped_messages(self, client_info): 

140 # Precompute the wrapped methods. 

141 self._wrapped_methods = { 

142 self.create_subscription: gapic_v1.method.wrap_method( 

143 self.create_subscription, 

144 default_retry=retries.Retry( 

145 initial=0.1, 

146 maximum=60.0, 

147 multiplier=1.3, 

148 predicate=retries.if_exception_type( 

149 core_exceptions.Aborted, 

150 core_exceptions.ServiceUnavailable, 

151 core_exceptions.Unknown, 

152 ), 

153 deadline=60.0, 

154 ), 

155 default_timeout=60.0, 

156 client_info=client_info, 

157 ), 

158 self.get_subscription: gapic_v1.method.wrap_method( 

159 self.get_subscription, 

160 default_retry=retries.Retry( 

161 initial=0.1, 

162 maximum=60.0, 

163 multiplier=1.3, 

164 predicate=retries.if_exception_type( 

165 core_exceptions.Aborted, 

166 core_exceptions.ServiceUnavailable, 

167 core_exceptions.Unknown, 

168 ), 

169 deadline=60.0, 

170 ), 

171 default_timeout=60.0, 

172 client_info=client_info, 

173 ), 

174 self.update_subscription: gapic_v1.method.wrap_method( 

175 self.update_subscription, 

176 default_retry=retries.Retry( 

177 initial=0.1, 

178 maximum=60.0, 

179 multiplier=1.3, 

180 predicate=retries.if_exception_type( 

181 core_exceptions.ServiceUnavailable, 

182 ), 

183 deadline=60.0, 

184 ), 

185 default_timeout=60.0, 

186 client_info=client_info, 

187 ), 

188 self.list_subscriptions: gapic_v1.method.wrap_method( 

189 self.list_subscriptions, 

190 default_retry=retries.Retry( 

191 initial=0.1, 

192 maximum=60.0, 

193 multiplier=1.3, 

194 predicate=retries.if_exception_type( 

195 core_exceptions.Aborted, 

196 core_exceptions.ServiceUnavailable, 

197 core_exceptions.Unknown, 

198 ), 

199 deadline=60.0, 

200 ), 

201 default_timeout=60.0, 

202 client_info=client_info, 

203 ), 

204 self.delete_subscription: gapic_v1.method.wrap_method( 

205 self.delete_subscription, 

206 default_retry=retries.Retry( 

207 initial=0.1, 

208 maximum=60.0, 

209 multiplier=1.3, 

210 predicate=retries.if_exception_type( 

211 core_exceptions.ServiceUnavailable, 

212 ), 

213 deadline=60.0, 

214 ), 

215 default_timeout=60.0, 

216 client_info=client_info, 

217 ), 

218 self.modify_ack_deadline: gapic_v1.method.wrap_method( 

219 self.modify_ack_deadline, 

220 default_retry=retries.Retry( 

221 initial=0.1, 

222 maximum=60.0, 

223 multiplier=1.3, 

224 predicate=retries.if_exception_type( 

225 core_exceptions.ServiceUnavailable, 

226 ), 

227 deadline=60.0, 

228 ), 

229 default_timeout=60.0, 

230 client_info=client_info, 

231 ), 

232 self.acknowledge: gapic_v1.method.wrap_method( 

233 self.acknowledge, 

234 default_retry=retries.Retry( 

235 initial=0.1, 

236 maximum=60.0, 

237 multiplier=1.3, 

238 predicate=retries.if_exception_type( 

239 core_exceptions.ServiceUnavailable, 

240 ), 

241 deadline=60.0, 

242 ), 

243 default_timeout=60.0, 

244 client_info=client_info, 

245 ), 

246 self.pull: gapic_v1.method.wrap_method( 

247 self.pull, 

248 default_retry=retries.Retry( 

249 initial=0.1, 

250 maximum=60.0, 

251 multiplier=1.3, 

252 predicate=retries.if_exception_type( 

253 core_exceptions.Aborted, 

254 core_exceptions.InternalServerError, 

255 core_exceptions.ServiceUnavailable, 

256 core_exceptions.Unknown, 

257 ), 

258 deadline=60.0, 

259 ), 

260 default_timeout=60.0, 

261 client_info=client_info, 

262 ), 

263 self.streaming_pull: gapic_v1.method.wrap_method( 

264 self.streaming_pull, 

265 default_retry=retries.Retry( 

266 initial=0.1, 

267 maximum=60.0, 

268 multiplier=4, 

269 predicate=retries.if_exception_type( 

270 core_exceptions.Aborted, 

271 core_exceptions.DeadlineExceeded, 

272 core_exceptions.InternalServerError, 

273 core_exceptions.ResourceExhausted, 

274 core_exceptions.ServiceUnavailable, 

275 ), 

276 deadline=900.0, 

277 ), 

278 default_timeout=900.0, 

279 client_info=client_info, 

280 ), 

281 self.modify_push_config: gapic_v1.method.wrap_method( 

282 self.modify_push_config, 

283 default_retry=retries.Retry( 

284 initial=0.1, 

285 maximum=60.0, 

286 multiplier=1.3, 

287 predicate=retries.if_exception_type( 

288 core_exceptions.ServiceUnavailable, 

289 ), 

290 deadline=60.0, 

291 ), 

292 default_timeout=60.0, 

293 client_info=client_info, 

294 ), 

295 self.get_snapshot: gapic_v1.method.wrap_method( 

296 self.get_snapshot, 

297 default_retry=retries.Retry( 

298 initial=0.1, 

299 maximum=60.0, 

300 multiplier=1.3, 

301 predicate=retries.if_exception_type( 

302 core_exceptions.Aborted, 

303 core_exceptions.ServiceUnavailable, 

304 core_exceptions.Unknown, 

305 ), 

306 deadline=60.0, 

307 ), 

308 default_timeout=60.0, 

309 client_info=client_info, 

310 ), 

311 self.list_snapshots: gapic_v1.method.wrap_method( 

312 self.list_snapshots, 

313 default_retry=retries.Retry( 

314 initial=0.1, 

315 maximum=60.0, 

316 multiplier=1.3, 

317 predicate=retries.if_exception_type( 

318 core_exceptions.Aborted, 

319 core_exceptions.ServiceUnavailable, 

320 core_exceptions.Unknown, 

321 ), 

322 deadline=60.0, 

323 ), 

324 default_timeout=60.0, 

325 client_info=client_info, 

326 ), 

327 self.create_snapshot: gapic_v1.method.wrap_method( 

328 self.create_snapshot, 

329 default_retry=retries.Retry( 

330 initial=0.1, 

331 maximum=60.0, 

332 multiplier=1.3, 

333 predicate=retries.if_exception_type( 

334 core_exceptions.ServiceUnavailable, 

335 ), 

336 deadline=60.0, 

337 ), 

338 default_timeout=60.0, 

339 client_info=client_info, 

340 ), 

341 self.update_snapshot: gapic_v1.method.wrap_method( 

342 self.update_snapshot, 

343 default_retry=retries.Retry( 

344 initial=0.1, 

345 maximum=60.0, 

346 multiplier=1.3, 

347 predicate=retries.if_exception_type( 

348 core_exceptions.ServiceUnavailable, 

349 ), 

350 deadline=60.0, 

351 ), 

352 default_timeout=60.0, 

353 client_info=client_info, 

354 ), 

355 self.delete_snapshot: gapic_v1.method.wrap_method( 

356 self.delete_snapshot, 

357 default_retry=retries.Retry( 

358 initial=0.1, 

359 maximum=60.0, 

360 multiplier=1.3, 

361 predicate=retries.if_exception_type( 

362 core_exceptions.ServiceUnavailable, 

363 ), 

364 deadline=60.0, 

365 ), 

366 default_timeout=60.0, 

367 client_info=client_info, 

368 ), 

369 self.seek: gapic_v1.method.wrap_method( 

370 self.seek, 

371 default_retry=retries.Retry( 

372 initial=0.1, 

373 maximum=60.0, 

374 multiplier=1.3, 

375 predicate=retries.if_exception_type( 

376 core_exceptions.Aborted, 

377 core_exceptions.ServiceUnavailable, 

378 core_exceptions.Unknown, 

379 ), 

380 deadline=60.0, 

381 ), 

382 default_timeout=60.0, 

383 client_info=client_info, 

384 ), 

385 self.get_iam_policy: gapic_v1.method.wrap_method( 

386 self.get_iam_policy, 

387 default_timeout=None, 

388 client_info=client_info, 

389 ), 

390 self.set_iam_policy: gapic_v1.method.wrap_method( 

391 self.set_iam_policy, 

392 default_timeout=None, 

393 client_info=client_info, 

394 ), 

395 self.test_iam_permissions: gapic_v1.method.wrap_method( 

396 self.test_iam_permissions, 

397 default_timeout=None, 

398 client_info=client_info, 

399 ), 

400 } 

401 

402 def close(self): 

403 """Closes resources associated with the transport. 

404 

405 .. warning:: 

406 Only call this method if the transport is NOT shared 

407 with other clients - this may cause errors in other clients! 

408 """ 

409 raise NotImplementedError() 

410 

411 @property 

412 def create_subscription( 

413 self, 

414 ) -> Callable[ 

415 [pubsub.Subscription], 

416 Union[pubsub.Subscription, Awaitable[pubsub.Subscription]], 

417 ]: 

418 raise NotImplementedError() 

419 

420 @property 

421 def get_subscription( 

422 self, 

423 ) -> Callable[ 

424 [pubsub.GetSubscriptionRequest], 

425 Union[pubsub.Subscription, Awaitable[pubsub.Subscription]], 

426 ]: 

427 raise NotImplementedError() 

428 

429 @property 

430 def update_subscription( 

431 self, 

432 ) -> Callable[ 

433 [pubsub.UpdateSubscriptionRequest], 

434 Union[pubsub.Subscription, Awaitable[pubsub.Subscription]], 

435 ]: 

436 raise NotImplementedError() 

437 

438 @property 

439 def list_subscriptions( 

440 self, 

441 ) -> Callable[ 

442 [pubsub.ListSubscriptionsRequest], 

443 Union[ 

444 pubsub.ListSubscriptionsResponse, 

445 Awaitable[pubsub.ListSubscriptionsResponse], 

446 ], 

447 ]: 

448 raise NotImplementedError() 

449 

450 @property 

451 def delete_subscription( 

452 self, 

453 ) -> Callable[ 

454 [pubsub.DeleteSubscriptionRequest], 

455 Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]], 

456 ]: 

457 raise NotImplementedError() 

458 

459 @property 

460 def modify_ack_deadline( 

461 self, 

462 ) -> Callable[ 

463 [pubsub.ModifyAckDeadlineRequest], 

464 Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]], 

465 ]: 

466 raise NotImplementedError() 

467 

468 @property 

469 def acknowledge( 

470 self, 

471 ) -> Callable[ 

472 [pubsub.AcknowledgeRequest], Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]] 

473 ]: 

474 raise NotImplementedError() 

475 

476 @property 

477 def pull( 

478 self, 

479 ) -> Callable[ 

480 [pubsub.PullRequest], Union[pubsub.PullResponse, Awaitable[pubsub.PullResponse]] 

481 ]: 

482 raise NotImplementedError() 

483 

484 @property 

485 def streaming_pull( 

486 self, 

487 ) -> Callable[ 

488 [pubsub.StreamingPullRequest], 

489 Union[pubsub.StreamingPullResponse, Awaitable[pubsub.StreamingPullResponse]], 

490 ]: 

491 raise NotImplementedError() 

492 

493 @property 

494 def modify_push_config( 

495 self, 

496 ) -> Callable[ 

497 [pubsub.ModifyPushConfigRequest], 

498 Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]], 

499 ]: 

500 raise NotImplementedError() 

501 

502 @property 

503 def get_snapshot( 

504 self, 

505 ) -> Callable[ 

506 [pubsub.GetSnapshotRequest], Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]] 

507 ]: 

508 raise NotImplementedError() 

509 

510 @property 

511 def list_snapshots( 

512 self, 

513 ) -> Callable[ 

514 [pubsub.ListSnapshotsRequest], 

515 Union[pubsub.ListSnapshotsResponse, Awaitable[pubsub.ListSnapshotsResponse]], 

516 ]: 

517 raise NotImplementedError() 

518 

519 @property 

520 def create_snapshot( 

521 self, 

522 ) -> Callable[ 

523 [pubsub.CreateSnapshotRequest], 

524 Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]], 

525 ]: 

526 raise NotImplementedError() 

527 

528 @property 

529 def update_snapshot( 

530 self, 

531 ) -> Callable[ 

532 [pubsub.UpdateSnapshotRequest], 

533 Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]], 

534 ]: 

535 raise NotImplementedError() 

536 

537 @property 

538 def delete_snapshot( 

539 self, 

540 ) -> Callable[ 

541 [pubsub.DeleteSnapshotRequest], 

542 Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]], 

543 ]: 

544 raise NotImplementedError() 

545 

546 @property 

547 def seek( 

548 self, 

549 ) -> Callable[ 

550 [pubsub.SeekRequest], Union[pubsub.SeekResponse, Awaitable[pubsub.SeekResponse]] 

551 ]: 

552 raise NotImplementedError() 

553 

554 @property 

555 def set_iam_policy( 

556 self, 

557 ) -> Callable[ 

558 [iam_policy_pb2.SetIamPolicyRequest], 

559 Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]], 

560 ]: 

561 raise NotImplementedError() 

562 

563 @property 

564 def get_iam_policy( 

565 self, 

566 ) -> Callable[ 

567 [iam_policy_pb2.GetIamPolicyRequest], 

568 Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]], 

569 ]: 

570 raise NotImplementedError() 

571 

572 @property 

573 def test_iam_permissions( 

574 self, 

575 ) -> Callable[ 

576 [iam_policy_pb2.TestIamPermissionsRequest], 

577 Union[ 

578 iam_policy_pb2.TestIamPermissionsResponse, 

579 Awaitable[iam_policy_pb2.TestIamPermissionsResponse], 

580 ], 

581 ]: 

582 raise NotImplementedError() 

583 

584 @property 

585 def kind(self) -> str: 

586 raise NotImplementedError() 

587 

588 

589__all__ = ("SubscriberTransport",)