Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/pubsub_v1/services/subscriber/transports/base.py: 61%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

107 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import abc 

17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union 

18 

19from google.pubsub_v1 import gapic_version as package_version 

20 

21import google.auth # type: ignore 

22import google.api_core 

23from google.api_core import exceptions as core_exceptions 

24from google.api_core import gapic_v1 

25from google.api_core import retry as retries 

26from google.auth import credentials as ga_credentials # type: ignore 

27from google.oauth2 import service_account # type: ignore 

28import google.protobuf 

29 

30from google.iam.v1 import iam_policy_pb2 # type: ignore 

31from google.iam.v1 import policy_pb2 # type: ignore 

32from google.protobuf import empty_pb2 # type: ignore 

33from google.pubsub_v1.types import pubsub 

34 

# Default client metadata for this package; it carries the installed
# google-pubsub library version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    client_library_version=package_version.__version__,
)

# Record the protobuf runtime version only when the installed ClientInfo
# exposes the attribute, so nothing is set on objects that do not define it.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__

41 

42 

class SubscriberTransport(abc.ABC):
    """Abstract transport class for Subscriber.

    The initializer resolves credentials and the host; each RPC is exposed as
    a property that raises ``NotImplementedError`` on this base class.
    """

    AUTH_SCOPES = (
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/pubsub",
    )

    DEFAULT_HOST: str = "pubsub.googleapis.com"

    def __init__(
        self,
        *,
        host: str = DEFAULT_HOST,
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to (default: 'pubsub.googleapis.com').
                 ":443" is appended when no port is given.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): Deprecated. A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is mutually exclusive with credentials. This argument will be
                removed in the next major version of this library.
            scopes (Optional[Sequence[str]]): A list of scopes.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            api_audience (Optional[str]): Audience handed to
                ``with_gdch_audience`` when credentials are discovered from
                the environment and support it. ``host`` is used instead when
                this is empty or ``None``.
            **kwargs: Accepted for compatibility; not used by this initializer.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are passed.
        """

        scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES}

        # Save the scopes.
        self._scopes = scopes
        # Keep a flag that was set before this initializer ran; otherwise
        # default to resolving credentials.
        if not hasattr(self, "_ignore_credentials"):
            self._ignore_credentials: bool = False

        # If no credentials are provided, then determine the appropriate
        # defaults.
        if credentials and credentials_file:
            raise core_exceptions.DuplicateCredentialArgs(
                "'credentials_file' and 'credentials' are mutually exclusive"
            )

        if credentials_file is not None:
            credentials, _ = google.auth.load_credentials_from_file(
                credentials_file, **scopes_kwargs, quota_project_id=quota_project_id
            )
        elif credentials is None and not self._ignore_credentials:
            credentials, _ = google.auth.default(
                **scopes_kwargs, quota_project_id=quota_project_id
            )
            # Don't apply audience if the credentials file passed from user.
            if hasattr(credentials, "with_gdch_audience"):
                credentials = credentials.with_gdch_audience(
                    api_audience if api_audience else host
                )

        # If the credentials are service account credentials, then always try to use self signed JWT.
        if (
            always_use_jwt_access
            and isinstance(credentials, service_account.Credentials)
            and hasattr(service_account.Credentials, "with_always_use_jwt_access")
        ):
            credentials = credentials.with_always_use_jwt_access(True)

        # Save the credentials.
        self._credentials = credentials

        # Save the hostname. Default to port 443 (HTTPS) if none is specified.
        if ":" not in host:
            host += ":443"
        self._host = host

    @property
    def host(self):
        """The host saved by ``__init__`` (with ":443" appended if it had no port)."""
        return self._host

    def _prep_wrapped_messages(self, client_info):
        """Precompute the wrapped callable for every RPC.

        Each RPC callable is passed through ``gapic_v1.method.wrap_method``
        with its default retry and timeout. The results are stored in
        ``self._wrapped_methods``, keyed by the unwrapped RPC callable.

        Args:
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                Client info forwarded to every ``wrap_method`` call.
        """
        # Retryable error sets shared by several RPCs.
        unavailable_only = (core_exceptions.ServiceUnavailable,)
        transient = (
            core_exceptions.Aborted,
            core_exceptions.ServiceUnavailable,
            core_exceptions.Unknown,
        )

        def _retry(*retryable, multiplier=1.3, deadline=60.0):
            """Build a default retry policy; backoff starts at 0.1 and caps at 60.0."""
            return retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=multiplier,
                predicate=retries.if_exception_type(*retryable),
                deadline=deadline,
            )

        def _wrap(rpc, default_retry, default_timeout=60.0):
            """Wrap ``rpc`` with a default retry policy and timeout."""
            return gapic_v1.method.wrap_method(
                rpc,
                default_retry=default_retry,
                default_timeout=default_timeout,
                client_info=client_info,
            )

        def _wrap_plain(rpc):
            """Wrap ``rpc`` with no default retry and no default timeout."""
            return gapic_v1.method.wrap_method(
                rpc,
                default_timeout=None,
                client_info=client_info,
            )

        # Each RPC property is read once for the key and once for the wrapped
        # callable, in declaration order.
        self._wrapped_methods = {
            self.create_subscription: _wrap(
                self.create_subscription, _retry(*transient)
            ),
            self.get_subscription: _wrap(self.get_subscription, _retry(*transient)),
            self.update_subscription: _wrap(
                self.update_subscription, _retry(*unavailable_only)
            ),
            self.list_subscriptions: _wrap(
                self.list_subscriptions, _retry(*transient)
            ),
            self.delete_subscription: _wrap(
                self.delete_subscription, _retry(*unavailable_only)
            ),
            self.modify_ack_deadline: _wrap(
                self.modify_ack_deadline, _retry(*unavailable_only)
            ),
            self.acknowledge: _wrap(self.acknowledge, _retry(*unavailable_only)),
            self.pull: _wrap(
                self.pull,
                _retry(
                    core_exceptions.Aborted,
                    core_exceptions.InternalServerError,
                    core_exceptions.ServiceUnavailable,
                    core_exceptions.Unknown,
                ),
            ),
            # The streaming RPC uses a steeper backoff and a 900.0 budget.
            self.streaming_pull: _wrap(
                self.streaming_pull,
                _retry(
                    core_exceptions.Aborted,
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.InternalServerError,
                    core_exceptions.ResourceExhausted,
                    core_exceptions.ServiceUnavailable,
                    multiplier=4,
                    deadline=900.0,
                ),
                default_timeout=900.0,
            ),
            self.modify_push_config: _wrap(
                self.modify_push_config, _retry(*unavailable_only)
            ),
            self.get_snapshot: _wrap(self.get_snapshot, _retry(*transient)),
            self.list_snapshots: _wrap(self.list_snapshots, _retry(*transient)),
            self.create_snapshot: _wrap(
                self.create_snapshot, _retry(*unavailable_only)
            ),
            self.update_snapshot: _wrap(
                self.update_snapshot, _retry(*unavailable_only)
            ),
            self.delete_snapshot: _wrap(
                self.delete_snapshot, _retry(*unavailable_only)
            ),
            self.seek: _wrap(self.seek, _retry(*transient)),
            self.get_iam_policy: _wrap_plain(self.get_iam_policy),
            self.set_iam_policy: _wrap_plain(self.set_iam_policy),
            self.test_iam_permissions: _wrap_plain(self.test_iam_permissions),
        }

    def close(self):
        """Closes resources associated with the transport.

        .. warning::
             Only call this method if the transport is NOT shared
             with other clients - this may cause errors in other clients!
        """
        raise NotImplementedError()

    @property
    def create_subscription(
        self,
    ) -> Callable[
        [pubsub.Subscription],
        Union[pubsub.Subscription, Awaitable[pubsub.Subscription]],
    ]:
        """Callable for the ``create_subscription`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def get_subscription(
        self,
    ) -> Callable[
        [pubsub.GetSubscriptionRequest],
        Union[pubsub.Subscription, Awaitable[pubsub.Subscription]],
    ]:
        """Callable for the ``get_subscription`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def update_subscription(
        self,
    ) -> Callable[
        [pubsub.UpdateSubscriptionRequest],
        Union[pubsub.Subscription, Awaitable[pubsub.Subscription]],
    ]:
        """Callable for the ``update_subscription`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def list_subscriptions(
        self,
    ) -> Callable[
        [pubsub.ListSubscriptionsRequest],
        Union[
            pubsub.ListSubscriptionsResponse,
            Awaitable[pubsub.ListSubscriptionsResponse],
        ],
    ]:
        """Callable for the ``list_subscriptions`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def delete_subscription(
        self,
    ) -> Callable[
        [pubsub.DeleteSubscriptionRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for the ``delete_subscription`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def modify_ack_deadline(
        self,
    ) -> Callable[
        [pubsub.ModifyAckDeadlineRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for the ``modify_ack_deadline`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def acknowledge(
        self,
    ) -> Callable[
        [pubsub.AcknowledgeRequest], Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]]
    ]:
        """Callable for the ``acknowledge`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def pull(
        self,
    ) -> Callable[
        [pubsub.PullRequest], Union[pubsub.PullResponse, Awaitable[pubsub.PullResponse]]
    ]:
        """Callable for the ``pull`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def streaming_pull(
        self,
    ) -> Callable[
        [pubsub.StreamingPullRequest],
        Union[pubsub.StreamingPullResponse, Awaitable[pubsub.StreamingPullResponse]],
    ]:
        """Callable for the ``streaming_pull`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def modify_push_config(
        self,
    ) -> Callable[
        [pubsub.ModifyPushConfigRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for the ``modify_push_config`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def get_snapshot(
        self,
    ) -> Callable[
        [pubsub.GetSnapshotRequest], Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]]
    ]:
        """Callable for the ``get_snapshot`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def list_snapshots(
        self,
    ) -> Callable[
        [pubsub.ListSnapshotsRequest],
        Union[pubsub.ListSnapshotsResponse, Awaitable[pubsub.ListSnapshotsResponse]],
    ]:
        """Callable for the ``list_snapshots`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def create_snapshot(
        self,
    ) -> Callable[
        [pubsub.CreateSnapshotRequest],
        Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]],
    ]:
        """Callable for the ``create_snapshot`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def update_snapshot(
        self,
    ) -> Callable[
        [pubsub.UpdateSnapshotRequest],
        Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]],
    ]:
        """Callable for the ``update_snapshot`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def delete_snapshot(
        self,
    ) -> Callable[
        [pubsub.DeleteSnapshotRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for the ``delete_snapshot`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def seek(
        self,
    ) -> Callable[
        [pubsub.SeekRequest], Union[pubsub.SeekResponse, Awaitable[pubsub.SeekResponse]]
    ]:
        """Callable for the ``seek`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def set_iam_policy(
        self,
    ) -> Callable[
        [iam_policy_pb2.SetIamPolicyRequest],
        Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
    ]:
        """Callable for the ``set_iam_policy`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def get_iam_policy(
        self,
    ) -> Callable[
        [iam_policy_pb2.GetIamPolicyRequest],
        Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
    ]:
        """Callable for the ``get_iam_policy`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def test_iam_permissions(
        self,
    ) -> Callable[
        [iam_policy_pb2.TestIamPermissionsRequest],
        Union[
            iam_policy_pb2.TestIamPermissionsResponse,
            Awaitable[iam_policy_pb2.TestIamPermissionsResponse],
        ],
    ]:
        """Callable for the ``test_iam_permissions`` RPC; not implemented on the base transport."""
        raise NotImplementedError()

    @property
    def kind(self) -> str:
        """String naming the kind of transport; not implemented on the base transport."""
        raise NotImplementedError()

588 

589 

# Names exported by ``from ... import *``: only the abstract transport class.
__all__ = ("SubscriberTransport",)