Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/pubsub_v1/services/subscriber/transports/base.py: 62%
100 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
1# -*- coding: utf-8 -*-
2# Copyright 2022 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import abc
17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union
19from google.pubsub_v1 import gapic_version as package_version
21import google.auth # type: ignore
22import google.api_core
23from google.api_core import exceptions as core_exceptions
24from google.api_core import gapic_v1
25from google.api_core import retry as retries
26from google.auth import credentials as ga_credentials # type: ignore
27from google.oauth2 import service_account # type: ignore
29from google.iam.v1 import iam_policy_pb2 # type: ignore
30from google.iam.v1 import policy_pb2 # type: ignore
31from google.protobuf import empty_pb2 # type: ignore
32from google.pubsub_v1.types import pubsub
34DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
35 client_library_version=package_version.__version__
36)
39class SubscriberTransport(abc.ABC):
40 """Abstract transport class for Subscriber."""
42 AUTH_SCOPES = (
43 "https://www.googleapis.com/auth/cloud-platform",
44 "https://www.googleapis.com/auth/pubsub",
45 )
47 DEFAULT_HOST: str = "pubsub.googleapis.com"
49 def __init__(
50 self,
51 *,
52 host: str = DEFAULT_HOST,
53 credentials: Optional[ga_credentials.Credentials] = None,
54 credentials_file: Optional[str] = None,
55 scopes: Optional[Sequence[str]] = None,
56 quota_project_id: Optional[str] = None,
57 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
58 always_use_jwt_access: Optional[bool] = False,
59 api_audience: Optional[str] = None,
60 **kwargs,
61 ) -> None:
62 """Instantiate the transport.
64 Args:
65 host (Optional[str]):
66 The hostname to connect to.
67 credentials (Optional[google.auth.credentials.Credentials]): The
68 authorization credentials to attach to requests. These
69 credentials identify the application to the service; if none
70 are specified, the client will attempt to ascertain the
71 credentials from the environment.
72 credentials_file (Optional[str]): A file with credentials that can
73 be loaded with :func:`google.auth.load_credentials_from_file`.
74 This argument is mutually exclusive with credentials.
75 scopes (Optional[Sequence[str]]): A list of scopes.
76 quota_project_id (Optional[str]): An optional project to use for billing
77 and quota.
78 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
79 The client info used to send a user-agent string along with
80 API requests. If ``None``, then default info will be used.
81 Generally, you only need to set this if you're developing
82 your own client library.
83 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
84 be used for service account credentials.
85 """
87 scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES}
89 # Save the scopes.
90 self._scopes = scopes
92 # If no credentials are provided, then determine the appropriate
93 # defaults.
94 if credentials and credentials_file:
95 raise core_exceptions.DuplicateCredentialArgs(
96 "'credentials_file' and 'credentials' are mutually exclusive"
97 )
99 if credentials_file is not None:
100 credentials, _ = google.auth.load_credentials_from_file(
101 credentials_file, **scopes_kwargs, quota_project_id=quota_project_id
102 )
103 elif credentials is None:
104 credentials, _ = google.auth.default(
105 **scopes_kwargs, quota_project_id=quota_project_id
106 )
107 # Don't apply audience if the credentials file passed from user.
108 if hasattr(credentials, "with_gdch_audience"):
109 credentials = credentials.with_gdch_audience(
110 api_audience if api_audience else host
111 )
113 # If the credentials are service account credentials, then always try to use self signed JWT.
114 if (
115 always_use_jwt_access
116 and isinstance(credentials, service_account.Credentials)
117 and hasattr(service_account.Credentials, "with_always_use_jwt_access")
118 ):
119 credentials = credentials.with_always_use_jwt_access(True)
121 # Save the credentials.
122 self._credentials = credentials
124 # Save the hostname. Default to port 443 (HTTPS) if none is specified.
125 if ":" not in host:
126 host += ":443"
127 self._host = host
129 def _prep_wrapped_messages(self, client_info):
130 # Precompute the wrapped methods.
131 self._wrapped_methods = {
132 self.create_subscription: gapic_v1.method.wrap_method(
133 self.create_subscription,
134 default_retry=retries.Retry(
135 initial=0.1,
136 maximum=60.0,
137 multiplier=1.3,
138 predicate=retries.if_exception_type(
139 core_exceptions.Aborted,
140 core_exceptions.ServiceUnavailable,
141 core_exceptions.Unknown,
142 ),
143 deadline=60.0,
144 ),
145 default_timeout=60.0,
146 client_info=client_info,
147 ),
148 self.get_subscription: gapic_v1.method.wrap_method(
149 self.get_subscription,
150 default_retry=retries.Retry(
151 initial=0.1,
152 maximum=60.0,
153 multiplier=1.3,
154 predicate=retries.if_exception_type(
155 core_exceptions.Aborted,
156 core_exceptions.ServiceUnavailable,
157 core_exceptions.Unknown,
158 ),
159 deadline=60.0,
160 ),
161 default_timeout=60.0,
162 client_info=client_info,
163 ),
164 self.update_subscription: gapic_v1.method.wrap_method(
165 self.update_subscription,
166 default_retry=retries.Retry(
167 initial=0.1,
168 maximum=60.0,
169 multiplier=1.3,
170 predicate=retries.if_exception_type(
171 core_exceptions.ServiceUnavailable,
172 ),
173 deadline=60.0,
174 ),
175 default_timeout=60.0,
176 client_info=client_info,
177 ),
178 self.list_subscriptions: gapic_v1.method.wrap_method(
179 self.list_subscriptions,
180 default_retry=retries.Retry(
181 initial=0.1,
182 maximum=60.0,
183 multiplier=1.3,
184 predicate=retries.if_exception_type(
185 core_exceptions.Aborted,
186 core_exceptions.ServiceUnavailable,
187 core_exceptions.Unknown,
188 ),
189 deadline=60.0,
190 ),
191 default_timeout=60.0,
192 client_info=client_info,
193 ),
194 self.delete_subscription: gapic_v1.method.wrap_method(
195 self.delete_subscription,
196 default_retry=retries.Retry(
197 initial=0.1,
198 maximum=60.0,
199 multiplier=1.3,
200 predicate=retries.if_exception_type(
201 core_exceptions.ServiceUnavailable,
202 ),
203 deadline=60.0,
204 ),
205 default_timeout=60.0,
206 client_info=client_info,
207 ),
208 self.modify_ack_deadline: gapic_v1.method.wrap_method(
209 self.modify_ack_deadline,
210 default_retry=retries.Retry(
211 initial=0.1,
212 maximum=60.0,
213 multiplier=1.3,
214 predicate=retries.if_exception_type(
215 core_exceptions.ServiceUnavailable,
216 ),
217 deadline=60.0,
218 ),
219 default_timeout=60.0,
220 client_info=client_info,
221 ),
222 self.acknowledge: gapic_v1.method.wrap_method(
223 self.acknowledge,
224 default_retry=retries.Retry(
225 initial=0.1,
226 maximum=60.0,
227 multiplier=1.3,
228 predicate=retries.if_exception_type(
229 core_exceptions.ServiceUnavailable,
230 ),
231 deadline=60.0,
232 ),
233 default_timeout=60.0,
234 client_info=client_info,
235 ),
236 self.pull: gapic_v1.method.wrap_method(
237 self.pull,
238 default_retry=retries.Retry(
239 initial=0.1,
240 maximum=60.0,
241 multiplier=1.3,
242 predicate=retries.if_exception_type(
243 core_exceptions.Aborted,
244 core_exceptions.InternalServerError,
245 core_exceptions.ServiceUnavailable,
246 core_exceptions.Unknown,
247 ),
248 deadline=60.0,
249 ),
250 default_timeout=60.0,
251 client_info=client_info,
252 ),
253 self.streaming_pull: gapic_v1.method.wrap_method(
254 self.streaming_pull,
255 default_retry=retries.Retry(
256 initial=0.1,
257 maximum=60.0,
258 multiplier=1.3,
259 predicate=retries.if_exception_type(
260 core_exceptions.Aborted,
261 core_exceptions.DeadlineExceeded,
262 core_exceptions.InternalServerError,
263 core_exceptions.ResourceExhausted,
264 core_exceptions.ServiceUnavailable,
265 ),
266 deadline=900.0,
267 ),
268 default_timeout=900.0,
269 client_info=client_info,
270 ),
271 self.modify_push_config: gapic_v1.method.wrap_method(
272 self.modify_push_config,
273 default_retry=retries.Retry(
274 initial=0.1,
275 maximum=60.0,
276 multiplier=1.3,
277 predicate=retries.if_exception_type(
278 core_exceptions.ServiceUnavailable,
279 ),
280 deadline=60.0,
281 ),
282 default_timeout=60.0,
283 client_info=client_info,
284 ),
285 self.get_snapshot: gapic_v1.method.wrap_method(
286 self.get_snapshot,
287 default_retry=retries.Retry(
288 initial=0.1,
289 maximum=60.0,
290 multiplier=1.3,
291 predicate=retries.if_exception_type(
292 core_exceptions.Aborted,
293 core_exceptions.ServiceUnavailable,
294 core_exceptions.Unknown,
295 ),
296 deadline=60.0,
297 ),
298 default_timeout=60.0,
299 client_info=client_info,
300 ),
301 self.list_snapshots: gapic_v1.method.wrap_method(
302 self.list_snapshots,
303 default_retry=retries.Retry(
304 initial=0.1,
305 maximum=60.0,
306 multiplier=1.3,
307 predicate=retries.if_exception_type(
308 core_exceptions.Aborted,
309 core_exceptions.ServiceUnavailable,
310 core_exceptions.Unknown,
311 ),
312 deadline=60.0,
313 ),
314 default_timeout=60.0,
315 client_info=client_info,
316 ),
317 self.create_snapshot: gapic_v1.method.wrap_method(
318 self.create_snapshot,
319 default_retry=retries.Retry(
320 initial=0.1,
321 maximum=60.0,
322 multiplier=1.3,
323 predicate=retries.if_exception_type(
324 core_exceptions.ServiceUnavailable,
325 ),
326 deadline=60.0,
327 ),
328 default_timeout=60.0,
329 client_info=client_info,
330 ),
331 self.update_snapshot: gapic_v1.method.wrap_method(
332 self.update_snapshot,
333 default_retry=retries.Retry(
334 initial=0.1,
335 maximum=60.0,
336 multiplier=1.3,
337 predicate=retries.if_exception_type(
338 core_exceptions.ServiceUnavailable,
339 ),
340 deadline=60.0,
341 ),
342 default_timeout=60.0,
343 client_info=client_info,
344 ),
345 self.delete_snapshot: gapic_v1.method.wrap_method(
346 self.delete_snapshot,
347 default_retry=retries.Retry(
348 initial=0.1,
349 maximum=60.0,
350 multiplier=1.3,
351 predicate=retries.if_exception_type(
352 core_exceptions.ServiceUnavailable,
353 ),
354 deadline=60.0,
355 ),
356 default_timeout=60.0,
357 client_info=client_info,
358 ),
359 self.seek: gapic_v1.method.wrap_method(
360 self.seek,
361 default_retry=retries.Retry(
362 initial=0.1,
363 maximum=60.0,
364 multiplier=1.3,
365 predicate=retries.if_exception_type(
366 core_exceptions.Aborted,
367 core_exceptions.ServiceUnavailable,
368 core_exceptions.Unknown,
369 ),
370 deadline=60.0,
371 ),
372 default_timeout=60.0,
373 client_info=client_info,
374 ),
375 }
377 def close(self):
378 """Closes resources associated with the transport.
380 .. warning::
381 Only call this method if the transport is NOT shared
382 with other clients - this may cause errors in other clients!
383 """
384 raise NotImplementedError()
386 @property
387 def create_subscription(
388 self,
389 ) -> Callable[
390 [pubsub.Subscription],
391 Union[pubsub.Subscription, Awaitable[pubsub.Subscription]],
392 ]:
393 raise NotImplementedError()
395 @property
396 def get_subscription(
397 self,
398 ) -> Callable[
399 [pubsub.GetSubscriptionRequest],
400 Union[pubsub.Subscription, Awaitable[pubsub.Subscription]],
401 ]:
402 raise NotImplementedError()
404 @property
405 def update_subscription(
406 self,
407 ) -> Callable[
408 [pubsub.UpdateSubscriptionRequest],
409 Union[pubsub.Subscription, Awaitable[pubsub.Subscription]],
410 ]:
411 raise NotImplementedError()
413 @property
414 def list_subscriptions(
415 self,
416 ) -> Callable[
417 [pubsub.ListSubscriptionsRequest],
418 Union[
419 pubsub.ListSubscriptionsResponse,
420 Awaitable[pubsub.ListSubscriptionsResponse],
421 ],
422 ]:
423 raise NotImplementedError()
425 @property
426 def delete_subscription(
427 self,
428 ) -> Callable[
429 [pubsub.DeleteSubscriptionRequest],
430 Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
431 ]:
432 raise NotImplementedError()
434 @property
435 def modify_ack_deadline(
436 self,
437 ) -> Callable[
438 [pubsub.ModifyAckDeadlineRequest],
439 Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
440 ]:
441 raise NotImplementedError()
443 @property
444 def acknowledge(
445 self,
446 ) -> Callable[
447 [pubsub.AcknowledgeRequest], Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]]
448 ]:
449 raise NotImplementedError()
451 @property
452 def pull(
453 self,
454 ) -> Callable[
455 [pubsub.PullRequest], Union[pubsub.PullResponse, Awaitable[pubsub.PullResponse]]
456 ]:
457 raise NotImplementedError()
459 @property
460 def streaming_pull(
461 self,
462 ) -> Callable[
463 [pubsub.StreamingPullRequest],
464 Union[pubsub.StreamingPullResponse, Awaitable[pubsub.StreamingPullResponse]],
465 ]:
466 raise NotImplementedError()
468 @property
469 def modify_push_config(
470 self,
471 ) -> Callable[
472 [pubsub.ModifyPushConfigRequest],
473 Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
474 ]:
475 raise NotImplementedError()
477 @property
478 def get_snapshot(
479 self,
480 ) -> Callable[
481 [pubsub.GetSnapshotRequest], Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]]
482 ]:
483 raise NotImplementedError()
485 @property
486 def list_snapshots(
487 self,
488 ) -> Callable[
489 [pubsub.ListSnapshotsRequest],
490 Union[pubsub.ListSnapshotsResponse, Awaitable[pubsub.ListSnapshotsResponse]],
491 ]:
492 raise NotImplementedError()
494 @property
495 def create_snapshot(
496 self,
497 ) -> Callable[
498 [pubsub.CreateSnapshotRequest],
499 Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]],
500 ]:
501 raise NotImplementedError()
503 @property
504 def update_snapshot(
505 self,
506 ) -> Callable[
507 [pubsub.UpdateSnapshotRequest],
508 Union[pubsub.Snapshot, Awaitable[pubsub.Snapshot]],
509 ]:
510 raise NotImplementedError()
512 @property
513 def delete_snapshot(
514 self,
515 ) -> Callable[
516 [pubsub.DeleteSnapshotRequest],
517 Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
518 ]:
519 raise NotImplementedError()
521 @property
522 def seek(
523 self,
524 ) -> Callable[
525 [pubsub.SeekRequest], Union[pubsub.SeekResponse, Awaitable[pubsub.SeekResponse]]
526 ]:
527 raise NotImplementedError()
529 @property
530 def set_iam_policy(
531 self,
532 ) -> Callable[
533 [iam_policy_pb2.SetIamPolicyRequest],
534 Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
535 ]:
536 raise NotImplementedError()
538 @property
539 def get_iam_policy(
540 self,
541 ) -> Callable[
542 [iam_policy_pb2.GetIamPolicyRequest],
543 Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
544 ]:
545 raise NotImplementedError()
547 @property
548 def test_iam_permissions(
549 self,
550 ) -> Callable[
551 [iam_policy_pb2.TestIamPermissionsRequest],
552 Union[
553 iam_policy_pb2.TestIamPermissionsResponse,
554 Awaitable[iam_policy_pb2.TestIamPermissionsResponse],
555 ],
556 ]:
557 raise NotImplementedError()
559 @property
560 def kind(self) -> str:
561 raise NotImplementedError()
564__all__ = ("SubscriberTransport",)