1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import abc
17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union
18
19from google.pubsub_v1 import gapic_version as package_version
20
21import google.auth # type: ignore
22import google.api_core
23from google.api_core import exceptions as core_exceptions
24from google.api_core import gapic_v1
25from google.api_core import retry as retries
26from google.auth import credentials as ga_credentials # type: ignore
27from google.oauth2 import service_account # type: ignore
28import google.protobuf
29
30from google.iam.v1 import iam_policy_pb2 # type: ignore
31from google.iam.v1 import policy_pb2 # type: ignore
32from google.protobuf import empty_pb2 # type: ignore
33from google.pubsub_v1.types import pubsub
34
# Default client metadata for this package: it carries the installed
# library version and serves as the default ``client_info`` argument of the
# transport constructor below.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    client_library_version=package_version.__version__,
)

# Record the protobuf runtime version as well, but only when ClientInfo
# exposes that attribute.
# NOTE(review): presumably the guard exists because not every supported
# google-api-core release defines ``protobuf_runtime_version`` - confirm.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
41
42
class SubscriberTransport(abc.ABC):
    """Abstract transport class for Subscriber.

    The constructor resolves and stores the credentials, scopes and target
    host. Each RPC is exposed as a property that raises
    ``NotImplementedError`` here and has to be overridden by a concrete
    transport.
    """

    AUTH_SCOPES = (
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/pubsub",
    )

    DEFAULT_HOST: str = "pubsub.googleapis.com"

    def __init__(
        self,
        *,
        host: str = DEFAULT_HOST,
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to (default: 'pubsub.googleapis.com').
                ``":443"`` is appended when the value contains no ``":"``.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): Deprecated. A file with
                credentials that can be loaded with
                :func:`google.auth.load_credentials_from_file`. This argument
                is mutually exclusive with ``credentials`` and will be removed
                in the next major version of this library.
            scopes (Optional[Sequence[str]]): A list of scopes.
                ``AUTH_SCOPES`` is passed alongside as the default scopes
                whenever credentials are loaded here.
            quota_project_id (Optional[str]): An optional project to use for
                billing and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. Generally, you only need to set this if you're
                developing your own client library. This constructor does not
                read the value itself.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT
                should be used for service account credentials.
            api_audience (Optional[str]): Audience handed to
                ``with_gdch_audience`` when the environment-derived
                credentials offer that method; ``host`` is used when this is
                empty or ``None``.
            **kwargs: Accepted but not used by this constructor.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are supplied.
        """
        # Keep the scopes exactly as the caller supplied them.
        self._scopes = scopes

        # Leave the flag alone if something already set it on the instance.
        if not hasattr(self, "_ignore_credentials"):
            self._ignore_credentials: bool = False

        if credentials and credentials_file:
            raise core_exceptions.DuplicateCredentialArgs(
                "'credentials_file' and 'credentials' are mutually exclusive"
            )

        # Keyword arguments shared by both credential-loading helpers.
        auth_kwargs = {
            "scopes": scopes,
            "default_scopes": self.AUTH_SCOPES,
            "quota_project_id": quota_project_id,
        }

        if credentials_file is not None:
            credentials, _ = google.auth.load_credentials_from_file(
                credentials_file, **auth_kwargs
            )
        elif credentials is None and not self._ignore_credentials:
            credentials, _ = google.auth.default(**auth_kwargs)
            # Don't apply audience if the credentials file passed from user.
            if hasattr(credentials, "with_gdch_audience"):
                credentials = credentials.with_gdch_audience(api_audience or host)

        # If the credentials are service account credentials, then always
        # try to use self signed JWT.
        use_self_signed_jwt = (
            always_use_jwt_access
            and isinstance(credentials, service_account.Credentials)
            and hasattr(service_account.Credentials, "with_always_use_jwt_access")
        )
        if use_self_signed_jwt:
            credentials = credentials.with_always_use_jwt_access(True)

        self._credentials = credentials

        # Default to port 443 (HTTPS) when the host names no port.
        self._host = host if ":" in host else host + ":443"

    @property
    def host(self):
        """Return the stored host string, including the port."""
        return self._host

    def _prep_wrapped_messages(self, client_info):
        """Precompute the wrapped methods.

        Fills ``self._wrapped_methods`` with one entry per RPC: the key is
        the callable returned by the RPC property and the value is that
        callable passed through ``gapic_v1.method.wrap_method`` together
        with its default retry policy, default timeout and ``client_info``.

        The RPC properties of this base class raise ``NotImplementedError``,
        so this only completes on a transport that overrides them.

        Args:
            client_info: Forwarded unchanged to every ``wrap_method`` call.
        """
        unavailable = (core_exceptions.ServiceUnavailable,)
        aborted_unavailable_unknown = (
            core_exceptions.Aborted,
            core_exceptions.ServiceUnavailable,
            core_exceptions.Unknown,
        )
        pull_errors = (
            core_exceptions.Aborted,
            core_exceptions.InternalServerError,
            core_exceptions.ServiceUnavailable,
            core_exceptions.Unknown,
        )
        streaming_pull_errors = (
            core_exceptions.Aborted,
            core_exceptions.DeadlineExceeded,
            core_exceptions.InternalServerError,
            core_exceptions.ResourceExhausted,
            core_exceptions.ServiceUnavailable,
        )

        # (property name, retryable exception types, backoff multiplier,
        #  seconds used for both the retry deadline and the default timeout)
        retried_rpcs = (
            ("create_subscription", aborted_unavailable_unknown, 1.3, 60.0),
            ("get_subscription", aborted_unavailable_unknown, 1.3, 60.0),
            ("update_subscription", unavailable, 1.3, 60.0),
            ("list_subscriptions", aborted_unavailable_unknown, 1.3, 60.0),
            ("delete_subscription", unavailable, 1.3, 60.0),
            ("modify_ack_deadline", unavailable, 1.3, 60.0),
            ("acknowledge", unavailable, 1.3, 60.0),
            ("pull", pull_errors, 1.3, 60.0),
            ("streaming_pull", streaming_pull_errors, 4, 900.0),
            ("modify_push_config", unavailable, 1.3, 60.0),
            ("get_snapshot", aborted_unavailable_unknown, 1.3, 60.0),
            ("list_snapshots", aborted_unavailable_unknown, 1.3, 60.0),
            ("create_snapshot", unavailable, 1.3, 60.0),
            ("update_snapshot", unavailable, 1.3, 60.0),
            ("delete_snapshot", unavailable, 1.3, 60.0),
            ("seek", aborted_unavailable_unknown, 1.3, 60.0),
        )
        # The IAM RPCs get neither a default retry nor a default timeout.
        unretried_rpcs = ("get_iam_policy", "set_iam_policy", "test_iam_permissions")

        wrapped = {}
        for rpc_name, retryable, multiplier, seconds in retried_rpcs:
            # The property is read twice on purpose: once for the key and
            # once for the wrapped callable.
            key = getattr(self, rpc_name)
            wrapped[key] = gapic_v1.method.wrap_method(
                getattr(self, rpc_name),
                default_retry=retries.Retry(
                    initial=0.1,
                    maximum=60.0,
                    multiplier=multiplier,
                    predicate=retries.if_exception_type(*retryable),
                    deadline=seconds,
                ),
                default_timeout=seconds,
                client_info=client_info,
            )
        for rpc_name in unretried_rpcs:
            key = getattr(self, rpc_name)
            wrapped[key] = gapic_v1.method.wrap_method(
                getattr(self, rpc_name),
                default_timeout=None,
                client_info=client_info,
            )

        # Publish the mapping only once every entry has been built.
        self._wrapped_methods = wrapped

    def close(self):
        """Closes resources associated with the transport.

        Not implemented in this base class.

        .. warning::
             Only call this method if the transport is NOT shared
             with other clients - this may cause errors in other clients!
        """
        raise NotImplementedError()

    @property
    def create_subscription(
        self,
    ) -> Callable[
        [pubsub.Subscription],
        Union[pubsub.Subscription, Awaitable[pubsub.Subscription]],
    ]:
        """Callable for ``create_subscription``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def get_subscription(
        self,
    ) -> Callable[
        [pubsub.GetSubscriptionRequest],
        Union[pubsub.Subscription, Awaitable[pubsub.Subscription]],
    ]:
        """Callable for ``get_subscription``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def update_subscription(
        self,
    ) -> Callable[
        [pubsub.UpdateSubscriptionRequest],
        Union[pubsub.Subscription, Awaitable[pubsub.Subscription]],
    ]:
        """Callable for ``update_subscription``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def list_subscriptions(
        self,
    ) -> Callable[
        [pubsub.ListSubscriptionsRequest],
        Union[
            pubsub.ListSubscriptionsResponse,
            Awaitable[pubsub.ListSubscriptionsResponse],
        ],
    ]:
        """Callable for ``list_subscriptions``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def delete_subscription(
        self,
    ) -> Callable[
        [pubsub.DeleteSubscriptionRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for ``delete_subscription``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def modify_ack_deadline(
        self,
    ) -> Callable[
        [pubsub.ModifyAckDeadlineRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for ``modify_ack_deadline``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def acknowledge(
        self,
    ) -> Callable[
        [pubsub.AcknowledgeRequest], Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]]
    ]:
        """Callable for ``acknowledge``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def pull(
        self,
    ) -> Callable[
        [pubsub.PullRequest], Union[pubsub.PullResponse, Awaitable[pubsub.PullResponse]]
    ]:
        """Callable for ``pull``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def streaming_pull(
        self,
    ) -> Callable[
        [pubsub.StreamingPullRequest],
        Union[pubsub.StreamingPullResponse, Awaitable[pubsub.StreamingPullResponse]],
    ]:
        """Callable for ``streaming_pull``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def modify_push_config(
        self,
    ) -> Callable[
        [pubsub.ModifyPushConfigRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for ``modify_push_config``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def get_snapshot(
        self,
    ) -> Callable[
        [pubsub.GetSnapshotRequest], Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]]
    ]:
        """Callable for ``get_snapshot``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def list_snapshots(
        self,
    ) -> Callable[
        [pubsub.ListSnapshotsRequest],
        Union[pubsub.ListSnapshotsResponse, Awaitable[pubsub.ListSnapshotsResponse]],
    ]:
        """Callable for ``list_snapshots``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def create_snapshot(
        self,
    ) -> Callable[
        [pubsub.CreateSnapshotRequest],
        Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]],
    ]:
        """Callable for ``create_snapshot``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def update_snapshot(
        self,
    ) -> Callable[
        [pubsub.UpdateSnapshotRequest],
        Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]],
    ]:
        """Callable for ``update_snapshot``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def delete_snapshot(
        self,
    ) -> Callable[
        [pubsub.DeleteSnapshotRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for ``delete_snapshot``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def seek(
        self,
    ) -> Callable[
        [pubsub.SeekRequest], Union[pubsub.SeekResponse, Awaitable[pubsub.SeekResponse]]
    ]:
        """Callable for ``seek``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def set_iam_policy(
        self,
    ) -> Callable[
        [iam_policy_pb2.SetIamPolicyRequest],
        Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
    ]:
        """Callable for ``set_iam_policy``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def get_iam_policy(
        self,
    ) -> Callable[
        [iam_policy_pb2.GetIamPolicyRequest],
        Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
    ]:
        """Callable for ``get_iam_policy``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def test_iam_permissions(
        self,
    ) -> Callable[
        [iam_policy_pb2.TestIamPermissionsRequest],
        Union[
            iam_policy_pb2.TestIamPermissionsResponse,
            Awaitable[iam_policy_pb2.TestIamPermissionsResponse],
        ],
    ]:
        """Callable for ``test_iam_permissions``; not implemented in this base class."""
        raise NotImplementedError()

    @property
    def kind(self) -> str:
        """String identifying the transport; not implemented in this base class."""
        raise NotImplementedError()
588
589
# Public API of this module: only the abstract transport class is exported.
__all__ = ("SubscriberTransport",)