1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import inspect
17import json
18import pickle
19import logging as std_logging
20import warnings
21from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union
22
23from google.api_core import gapic_v1
24from google.api_core import grpc_helpers_async
25from google.api_core import exceptions as core_exceptions
26from google.api_core import retry_async as retries
27from google.auth import credentials as ga_credentials # type: ignore
28from google.auth.transport.grpc import SslCredentials # type: ignore
29from google.protobuf.json_format import MessageToJson
30import google.protobuf.message
31
32import grpc # type: ignore
33import proto # type: ignore
34from grpc.experimental import aio # type: ignore
35
36from google.iam.v1 import iam_policy_pb2 # type: ignore
37from google.iam.v1 import policy_pb2 # type: ignore
38from google.protobuf import empty_pb2 # type: ignore
39from google.pubsub_v1.types import pubsub
40from .base import SubscriberTransport, DEFAULT_CLIENT_INFO
41from .grpc import SubscriberGrpcTransport
42
43try:
44 from google.api_core import client_logging # type: ignore
45
46 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
47except ImportError: # pragma: NO COVER
48 CLIENT_LOGGING_SUPPORTED = False
49
50_LOGGER = std_logging.getLogger(__name__)
51
52
53class _LoggingClientAIOInterceptor(
54 grpc.aio.UnaryUnaryClientInterceptor
55): # pragma: NO COVER
56 async def intercept_unary_unary(self, continuation, client_call_details, request):
57 logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
58 std_logging.DEBUG
59 )
60 if logging_enabled: # pragma: NO COVER
61 request_metadata = client_call_details.metadata
62 if isinstance(request, proto.Message):
63 request_payload = type(request).to_json(request)
64 elif isinstance(request, google.protobuf.message.Message):
65 request_payload = MessageToJson(request)
66 else:
67 request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"
68
69 request_metadata = {
70 key: value.decode("utf-8") if isinstance(value, bytes) else value
71 for key, value in request_metadata
72 }
73 grpc_request = {
74 "payload": request_payload,
75 "requestMethod": "grpc",
76 "metadata": dict(request_metadata),
77 }
78 _LOGGER.debug(
79 f"Sending request for {client_call_details.method}",
80 extra={
81 "serviceName": "google.pubsub.v1.Subscriber",
82 "rpcName": str(client_call_details.method),
83 "request": grpc_request,
84 "metadata": grpc_request["metadata"],
85 },
86 )
87 response = await continuation(client_call_details, request)
88 if logging_enabled: # pragma: NO COVER
89 response_metadata = await response.trailing_metadata()
90 # Convert gRPC metadata `<class 'grpc.aio._metadata.Metadata'>` to list of tuples
91 metadata = (
92 dict([(k, str(v)) for k, v in response_metadata])
93 if response_metadata
94 else None
95 )
96 result = await response
97 if isinstance(result, proto.Message):
98 response_payload = type(result).to_json(result)
99 elif isinstance(result, google.protobuf.message.Message):
100 response_payload = MessageToJson(result)
101 else:
102 response_payload = f"{type(result).__name__}: {pickle.dumps(result)}"
103 grpc_response = {
104 "payload": response_payload,
105 "metadata": metadata,
106 "status": "OK",
107 }
108 _LOGGER.debug(
109 f"Received response to rpc {client_call_details.method}.",
110 extra={
111 "serviceName": "google.pubsub.v1.Subscriber",
112 "rpcName": str(client_call_details.method),
113 "response": grpc_response,
114 "metadata": grpc_response["metadata"],
115 },
116 )
117 return response
118
119
120class SubscriberGrpcAsyncIOTransport(SubscriberTransport):
121 """gRPC AsyncIO backend transport for Subscriber.
122
123 The service that an application uses to manipulate subscriptions and
124 to consume messages from a subscription via the ``Pull`` method or
125 by establishing a bi-directional stream using the ``StreamingPull``
126 method.
127
128 This class defines the same methods as the primary client, so the
129 primary client can load the underlying transport implementation
130 and call it.
131
132 It sends protocol buffers over the wire using gRPC (which is built on
133 top of HTTP/2); the ``grpcio`` package must be installed.
134 """
135
136 _grpc_channel: aio.Channel
137 _stubs: Dict[str, Callable] = {}
138
139 @classmethod
140 def create_channel(
141 cls,
142 host: str = "pubsub.googleapis.com",
143 credentials: Optional[ga_credentials.Credentials] = None,
144 credentials_file: Optional[str] = None,
145 scopes: Optional[Sequence[str]] = None,
146 quota_project_id: Optional[str] = None,
147 **kwargs,
148 ) -> aio.Channel:
149 """Create and return a gRPC AsyncIO channel object.
150 Args:
151 host (Optional[str]): The host for the channel to use.
152 credentials (Optional[~.Credentials]): The
153 authorization credentials to attach to requests. These
154 credentials identify this application to the service. If
155 none are specified, the client will attempt to ascertain
156 the credentials from the environment.
157 credentials_file (Optional[str]): Deprecated. A file with credentials that can
158 be loaded with :func:`google.auth.load_credentials_from_file`. This argument will be
159 removed in the next major version of this library.
160 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this
161 service. These are only used when credentials are not specified and
162 are passed to :func:`google.auth.default`.
163 quota_project_id (Optional[str]): An optional project to use for billing
164 and quota.
165 kwargs (Optional[dict]): Keyword arguments, which are passed to the
166 channel creation.
167 Returns:
168 aio.Channel: A gRPC AsyncIO channel object.
169 """
170
171 return grpc_helpers_async.create_channel(
172 host,
173 credentials=credentials,
174 credentials_file=credentials_file,
175 quota_project_id=quota_project_id,
176 default_scopes=cls.AUTH_SCOPES,
177 scopes=scopes,
178 default_host=cls.DEFAULT_HOST,
179 **kwargs,
180 )
181
182 def __init__(
183 self,
184 *,
185 host: str = "pubsub.googleapis.com",
186 credentials: Optional[ga_credentials.Credentials] = None,
187 credentials_file: Optional[str] = None,
188 scopes: Optional[Sequence[str]] = None,
189 channel: Optional[Union[aio.Channel, Callable[..., aio.Channel]]] = None,
190 api_mtls_endpoint: Optional[str] = None,
191 client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
192 ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
193 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
194 quota_project_id: Optional[str] = None,
195 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
196 always_use_jwt_access: Optional[bool] = False,
197 api_audience: Optional[str] = None,
198 ) -> None:
199 """Instantiate the transport.
200
201 Args:
202 host (Optional[str]):
203 The hostname to connect to (default: 'pubsub.googleapis.com').
204 credentials (Optional[google.auth.credentials.Credentials]): The
205 authorization credentials to attach to requests. These
206 credentials identify the application to the service; if none
207 are specified, the client will attempt to ascertain the
208 credentials from the environment.
209 This argument is ignored if a ``channel`` instance is provided.
210 credentials_file (Optional[str]): Deprecated. A file with credentials that can
211 be loaded with :func:`google.auth.load_credentials_from_file`.
212 This argument is ignored if a ``channel`` instance is provided.
213 This argument will be removed in the next major version of this library.
214 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this
215 service. These are only used when credentials are not specified and
216 are passed to :func:`google.auth.default`.
217 channel (Optional[Union[aio.Channel, Callable[..., aio.Channel]]]):
218 A ``Channel`` instance through which to make calls, or a Callable
219 that constructs and returns one. If set to None, ``self.create_channel``
220 is used to create the channel. If a Callable is given, it will be called
221 with the same arguments as used in ``self.create_channel``.
222 api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
223 If provided, it overrides the ``host`` argument and tries to create
224 a mutual TLS channel with client SSL credentials from
225 ``client_cert_source`` or application default SSL credentials.
226 client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
227 Deprecated. A callback to provide client SSL certificate bytes and
228 private key bytes, both in PEM format. It is ignored if
229 ``api_mtls_endpoint`` is None.
230 ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
231 for the grpc channel. It is ignored if a ``channel`` instance is provided.
232 client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
233 A callback to provide client certificate bytes and private key bytes,
234 both in PEM format. It is used to configure a mutual TLS channel. It is
235 ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided.
236 quota_project_id (Optional[str]): An optional project to use for billing
237 and quota.
238 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
239 The client info used to send a user-agent string along with
240 API requests. If ``None``, then default info will be used.
241 Generally, you only need to set this if you're developing
242 your own client library.
243 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
244 be used for service account credentials.
245
246 Raises:
247 google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
248 creation failed for any reason.
249 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
250 and ``credentials_file`` are passed.
251 """
252 self._grpc_channel = None
253 self._ssl_channel_credentials = ssl_channel_credentials
254 self._stubs: Dict[str, Callable] = {}
255
256 if api_mtls_endpoint:
257 warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
258 if client_cert_source:
259 warnings.warn("client_cert_source is deprecated", DeprecationWarning)
260
261 if isinstance(channel, aio.Channel):
262 # Ignore credentials if a channel was passed.
263 credentials = None
264 self._ignore_credentials = True
265 # If a channel was explicitly provided, set it.
266 self._grpc_channel = channel
267 self._ssl_channel_credentials = None
268 else:
269 if api_mtls_endpoint:
270 host = api_mtls_endpoint
271
272 # Create SSL credentials with client_cert_source or application
273 # default SSL credentials.
274 if client_cert_source:
275 cert, key = client_cert_source()
276 self._ssl_channel_credentials = grpc.ssl_channel_credentials(
277 certificate_chain=cert, private_key=key
278 )
279 else:
280 self._ssl_channel_credentials = SslCredentials().ssl_credentials
281
282 else:
283 if client_cert_source_for_mtls and not ssl_channel_credentials:
284 cert, key = client_cert_source_for_mtls()
285 self._ssl_channel_credentials = grpc.ssl_channel_credentials(
286 certificate_chain=cert, private_key=key
287 )
288
289 # The base transport sets the host, credentials and scopes
290 super().__init__(
291 host=host,
292 credentials=credentials,
293 credentials_file=credentials_file,
294 scopes=scopes,
295 quota_project_id=quota_project_id,
296 client_info=client_info,
297 always_use_jwt_access=always_use_jwt_access,
298 api_audience=api_audience,
299 )
300
301 if not self._grpc_channel:
302 # initialize with the provided callable or the default channel
303 channel_init = channel or type(self).create_channel
304 self._grpc_channel = channel_init(
305 self._host,
306 # use the credentials which are saved
307 credentials=self._credentials,
308 # Set ``credentials_file`` to ``None`` here as
309 # the credentials that we saved earlier should be used.
310 credentials_file=None,
311 scopes=self._scopes,
312 ssl_credentials=self._ssl_channel_credentials,
313 quota_project_id=quota_project_id,
314 options=[
315 ("grpc.max_send_message_length", -1),
316 ("grpc.max_receive_message_length", -1),
317 ("grpc.max_metadata_size", 4 * 1024 * 1024),
318 ("grpc.keepalive_time_ms", 30000),
319 ],
320 )
321
322 self._interceptor = _LoggingClientAIOInterceptor()
323 self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
324 self._logged_channel = self._grpc_channel
325 self._wrap_with_kind = (
326 "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
327 )
328 # Wrap messages. This must be done after self._logged_channel exists
329 self._prep_wrapped_messages(client_info)
330
331 @property
332 def grpc_channel(self) -> aio.Channel:
333 """Create the channel designed to connect to this service.
334
335 This property caches on the instance; repeated calls return
336 the same channel.
337 """
338 # Return the channel from cache.
339 return self._grpc_channel
340
341 @property
342 def create_subscription(
343 self,
344 ) -> Callable[[pubsub.Subscription], Awaitable[pubsub.Subscription]]:
345 r"""Return a callable for the create subscription method over gRPC.
346
347 Creates a subscription to a given topic. See the [resource name
348 rules]
349 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
350 If the subscription already exists, returns ``ALREADY_EXISTS``.
351 If the corresponding topic doesn't exist, returns ``NOT_FOUND``.
352
353 If the name is not provided in the request, the server will
354 assign a random name for this subscription on the same project
355 as the topic, conforming to the [resource name format]
356 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
357 The generated name is populated in the returned Subscription
358 object. Note that for REST API requests, you must specify a name
359 in the request.
360
361 Returns:
362 Callable[[~.Subscription],
363 Awaitable[~.Subscription]]:
364 A function that, when called, will call the underlying RPC
365 on the server.
366 """
367 # Generate a "stub function" on-the-fly which will actually make
368 # the request.
369 # gRPC handles serialization and deserialization, so we just need
370 # to pass in the functions for each.
371 if "create_subscription" not in self._stubs:
372 self._stubs["create_subscription"] = self._logged_channel.unary_unary(
373 "/google.pubsub.v1.Subscriber/CreateSubscription",
374 request_serializer=pubsub.Subscription.serialize,
375 response_deserializer=pubsub.Subscription.deserialize,
376 )
377 return self._stubs["create_subscription"]
378
379 @property
380 def get_subscription(
381 self,
382 ) -> Callable[[pubsub.GetSubscriptionRequest], Awaitable[pubsub.Subscription]]:
383 r"""Return a callable for the get subscription method over gRPC.
384
385 Gets the configuration details of a subscription.
386
387 Returns:
388 Callable[[~.GetSubscriptionRequest],
389 Awaitable[~.Subscription]]:
390 A function that, when called, will call the underlying RPC
391 on the server.
392 """
393 # Generate a "stub function" on-the-fly which will actually make
394 # the request.
395 # gRPC handles serialization and deserialization, so we just need
396 # to pass in the functions for each.
397 if "get_subscription" not in self._stubs:
398 self._stubs["get_subscription"] = self._logged_channel.unary_unary(
399 "/google.pubsub.v1.Subscriber/GetSubscription",
400 request_serializer=pubsub.GetSubscriptionRequest.serialize,
401 response_deserializer=pubsub.Subscription.deserialize,
402 )
403 return self._stubs["get_subscription"]
404
405 @property
406 def update_subscription(
407 self,
408 ) -> Callable[[pubsub.UpdateSubscriptionRequest], Awaitable[pubsub.Subscription]]:
409 r"""Return a callable for the update subscription method over gRPC.
410
411 Updates an existing subscription by updating the
412 fields specified in the update mask. Note that certain
413 properties of a subscription, such as its topic, are not
414 modifiable.
415
416 Returns:
417 Callable[[~.UpdateSubscriptionRequest],
418 Awaitable[~.Subscription]]:
419 A function that, when called, will call the underlying RPC
420 on the server.
421 """
422 # Generate a "stub function" on-the-fly which will actually make
423 # the request.
424 # gRPC handles serialization and deserialization, so we just need
425 # to pass in the functions for each.
426 if "update_subscription" not in self._stubs:
427 self._stubs["update_subscription"] = self._logged_channel.unary_unary(
428 "/google.pubsub.v1.Subscriber/UpdateSubscription",
429 request_serializer=pubsub.UpdateSubscriptionRequest.serialize,
430 response_deserializer=pubsub.Subscription.deserialize,
431 )
432 return self._stubs["update_subscription"]
433
434 @property
435 def list_subscriptions(
436 self,
437 ) -> Callable[
438 [pubsub.ListSubscriptionsRequest], Awaitable[pubsub.ListSubscriptionsResponse]
439 ]:
440 r"""Return a callable for the list subscriptions method over gRPC.
441
442 Lists matching subscriptions.
443
444 Returns:
445 Callable[[~.ListSubscriptionsRequest],
446 Awaitable[~.ListSubscriptionsResponse]]:
447 A function that, when called, will call the underlying RPC
448 on the server.
449 """
450 # Generate a "stub function" on-the-fly which will actually make
451 # the request.
452 # gRPC handles serialization and deserialization, so we just need
453 # to pass in the functions for each.
454 if "list_subscriptions" not in self._stubs:
455 self._stubs["list_subscriptions"] = self._logged_channel.unary_unary(
456 "/google.pubsub.v1.Subscriber/ListSubscriptions",
457 request_serializer=pubsub.ListSubscriptionsRequest.serialize,
458 response_deserializer=pubsub.ListSubscriptionsResponse.deserialize,
459 )
460 return self._stubs["list_subscriptions"]
461
462 @property
463 def delete_subscription(
464 self,
465 ) -> Callable[[pubsub.DeleteSubscriptionRequest], Awaitable[empty_pb2.Empty]]:
466 r"""Return a callable for the delete subscription method over gRPC.
467
468 Deletes an existing subscription. All messages retained in the
469 subscription are immediately dropped. Calls to ``Pull`` after
470 deletion will return ``NOT_FOUND``. After a subscription is
471 deleted, a new one may be created with the same name, but the
472 new one has no association with the old subscription or its
473 topic unless the same topic is specified.
474
475 Returns:
476 Callable[[~.DeleteSubscriptionRequest],
477 Awaitable[~.Empty]]:
478 A function that, when called, will call the underlying RPC
479 on the server.
480 """
481 # Generate a "stub function" on-the-fly which will actually make
482 # the request.
483 # gRPC handles serialization and deserialization, so we just need
484 # to pass in the functions for each.
485 if "delete_subscription" not in self._stubs:
486 self._stubs["delete_subscription"] = self._logged_channel.unary_unary(
487 "/google.pubsub.v1.Subscriber/DeleteSubscription",
488 request_serializer=pubsub.DeleteSubscriptionRequest.serialize,
489 response_deserializer=empty_pb2.Empty.FromString,
490 )
491 return self._stubs["delete_subscription"]
492
493 @property
494 def modify_ack_deadline(
495 self,
496 ) -> Callable[[pubsub.ModifyAckDeadlineRequest], Awaitable[empty_pb2.Empty]]:
497 r"""Return a callable for the modify ack deadline method over gRPC.
498
499 Modifies the ack deadline for a specific message. This method is
500 useful to indicate that more time is needed to process a message
501 by the subscriber, or to make the message available for
502 redelivery if the processing was interrupted. Note that this
503 does not modify the subscription-level ``ackDeadlineSeconds``
504 used for subsequent messages.
505
506 Returns:
507 Callable[[~.ModifyAckDeadlineRequest],
508 Awaitable[~.Empty]]:
509 A function that, when called, will call the underlying RPC
510 on the server.
511 """
512 # Generate a "stub function" on-the-fly which will actually make
513 # the request.
514 # gRPC handles serialization and deserialization, so we just need
515 # to pass in the functions for each.
516 if "modify_ack_deadline" not in self._stubs:
517 self._stubs["modify_ack_deadline"] = self._logged_channel.unary_unary(
518 "/google.pubsub.v1.Subscriber/ModifyAckDeadline",
519 request_serializer=pubsub.ModifyAckDeadlineRequest.serialize,
520 response_deserializer=empty_pb2.Empty.FromString,
521 )
522 return self._stubs["modify_ack_deadline"]
523
524 @property
525 def acknowledge(
526 self,
527 ) -> Callable[[pubsub.AcknowledgeRequest], Awaitable[empty_pb2.Empty]]:
528 r"""Return a callable for the acknowledge method over gRPC.
529
530 Acknowledges the messages associated with the ``ack_ids`` in the
531 ``AcknowledgeRequest``. The Pub/Sub system can remove the
532 relevant messages from the subscription.
533
534 Acknowledging a message whose ack deadline has expired may
535 succeed, but such a message may be redelivered later.
536 Acknowledging a message more than once will not result in an
537 error.
538
539 Returns:
540 Callable[[~.AcknowledgeRequest],
541 Awaitable[~.Empty]]:
542 A function that, when called, will call the underlying RPC
543 on the server.
544 """
545 # Generate a "stub function" on-the-fly which will actually make
546 # the request.
547 # gRPC handles serialization and deserialization, so we just need
548 # to pass in the functions for each.
549 if "acknowledge" not in self._stubs:
550 self._stubs["acknowledge"] = self._logged_channel.unary_unary(
551 "/google.pubsub.v1.Subscriber/Acknowledge",
552 request_serializer=pubsub.AcknowledgeRequest.serialize,
553 response_deserializer=empty_pb2.Empty.FromString,
554 )
555 return self._stubs["acknowledge"]
556
557 @property
558 def pull(self) -> Callable[[pubsub.PullRequest], Awaitable[pubsub.PullResponse]]:
559 r"""Return a callable for the pull method over gRPC.
560
561 Pulls messages from the server.
562
563 Returns:
564 Callable[[~.PullRequest],
565 Awaitable[~.PullResponse]]:
566 A function that, when called, will call the underlying RPC
567 on the server.
568 """
569 # Generate a "stub function" on-the-fly which will actually make
570 # the request.
571 # gRPC handles serialization and deserialization, so we just need
572 # to pass in the functions for each.
573 if "pull" not in self._stubs:
574 self._stubs["pull"] = self._logged_channel.unary_unary(
575 "/google.pubsub.v1.Subscriber/Pull",
576 request_serializer=pubsub.PullRequest.serialize,
577 response_deserializer=pubsub.PullResponse.deserialize,
578 )
579 return self._stubs["pull"]
580
581 @property
582 def streaming_pull(
583 self,
584 ) -> Callable[
585 [pubsub.StreamingPullRequest], Awaitable[pubsub.StreamingPullResponse]
586 ]:
587 r"""Return a callable for the streaming pull method over gRPC.
588
589 Establishes a stream with the server, which sends messages down
590 to the client. The client streams acknowledgments and ack
591 deadline modifications back to the server. The server will close
592 the stream and return the status on any error. The server may
593 close the stream with status ``UNAVAILABLE`` to reassign
594 server-side resources, in which case, the client should
595 re-establish the stream. Flow control can be achieved by
596 configuring the underlying RPC channel.
597
598 Returns:
599 Callable[[~.StreamingPullRequest],
600 Awaitable[~.StreamingPullResponse]]:
601 A function that, when called, will call the underlying RPC
602 on the server.
603 """
604 # Generate a "stub function" on-the-fly which will actually make
605 # the request.
606 # gRPC handles serialization and deserialization, so we just need
607 # to pass in the functions for each.
608 if "streaming_pull" not in self._stubs:
609 self._stubs["streaming_pull"] = self._logged_channel.stream_stream(
610 "/google.pubsub.v1.Subscriber/StreamingPull",
611 request_serializer=pubsub.StreamingPullRequest.serialize,
612 response_deserializer=pubsub.StreamingPullResponse.deserialize,
613 )
614 return self._stubs["streaming_pull"]
615
616 @property
617 def modify_push_config(
618 self,
619 ) -> Callable[[pubsub.ModifyPushConfigRequest], Awaitable[empty_pb2.Empty]]:
620 r"""Return a callable for the modify push config method over gRPC.
621
622 Modifies the ``PushConfig`` for a specified subscription.
623
624 This may be used to change a push subscription to a pull one
625 (signified by an empty ``PushConfig``) or vice versa, or change
626 the endpoint URL and other attributes of a push subscription.
627 Messages will accumulate for delivery continuously through the
628 call regardless of changes to the ``PushConfig``.
629
630 Returns:
631 Callable[[~.ModifyPushConfigRequest],
632 Awaitable[~.Empty]]:
633 A function that, when called, will call the underlying RPC
634 on the server.
635 """
636 # Generate a "stub function" on-the-fly which will actually make
637 # the request.
638 # gRPC handles serialization and deserialization, so we just need
639 # to pass in the functions for each.
640 if "modify_push_config" not in self._stubs:
641 self._stubs["modify_push_config"] = self._logged_channel.unary_unary(
642 "/google.pubsub.v1.Subscriber/ModifyPushConfig",
643 request_serializer=pubsub.ModifyPushConfigRequest.serialize,
644 response_deserializer=empty_pb2.Empty.FromString,
645 )
646 return self._stubs["modify_push_config"]
647
648 @property
649 def get_snapshot(
650 self,
651 ) -> Callable[[pubsub.GetSnapshotRequest], Awaitable[pubsub.Snapshot]]:
652 r"""Return a callable for the get snapshot method over gRPC.
653
654 Gets the configuration details of a snapshot. Snapshots are used
655 in
656 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
657 operations, which allow you to manage message acknowledgments in
658 bulk. That is, you can set the acknowledgment state of messages
659 in an existing subscription to the state captured by a snapshot.
660
661 Returns:
662 Callable[[~.GetSnapshotRequest],
663 Awaitable[~.Snapshot]]:
664 A function that, when called, will call the underlying RPC
665 on the server.
666 """
667 # Generate a "stub function" on-the-fly which will actually make
668 # the request.
669 # gRPC handles serialization and deserialization, so we just need
670 # to pass in the functions for each.
671 if "get_snapshot" not in self._stubs:
672 self._stubs["get_snapshot"] = self._logged_channel.unary_unary(
673 "/google.pubsub.v1.Subscriber/GetSnapshot",
674 request_serializer=pubsub.GetSnapshotRequest.serialize,
675 response_deserializer=pubsub.Snapshot.deserialize,
676 )
677 return self._stubs["get_snapshot"]
678
679 @property
680 def list_snapshots(
681 self,
682 ) -> Callable[
683 [pubsub.ListSnapshotsRequest], Awaitable[pubsub.ListSnapshotsResponse]
684 ]:
685 r"""Return a callable for the list snapshots method over gRPC.
686
687 Lists the existing snapshots. Snapshots are used in
688 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
689 operations, which allow you to manage message acknowledgments in
690 bulk. That is, you can set the acknowledgment state of messages
691 in an existing subscription to the state captured by a snapshot.
692
693 Returns:
694 Callable[[~.ListSnapshotsRequest],
695 Awaitable[~.ListSnapshotsResponse]]:
696 A function that, when called, will call the underlying RPC
697 on the server.
698 """
699 # Generate a "stub function" on-the-fly which will actually make
700 # the request.
701 # gRPC handles serialization and deserialization, so we just need
702 # to pass in the functions for each.
703 if "list_snapshots" not in self._stubs:
704 self._stubs["list_snapshots"] = self._logged_channel.unary_unary(
705 "/google.pubsub.v1.Subscriber/ListSnapshots",
706 request_serializer=pubsub.ListSnapshotsRequest.serialize,
707 response_deserializer=pubsub.ListSnapshotsResponse.deserialize,
708 )
709 return self._stubs["list_snapshots"]
710
711 @property
712 def create_snapshot(
713 self,
714 ) -> Callable[[pubsub.CreateSnapshotRequest], Awaitable[pubsub.Snapshot]]:
715 r"""Return a callable for the create snapshot method over gRPC.
716
717 Creates a snapshot from the requested subscription. Snapshots
718 are used in
719 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
720 operations, which allow you to manage message acknowledgments in
721 bulk. That is, you can set the acknowledgment state of messages
722 in an existing subscription to the state captured by a snapshot.
723 If the snapshot already exists, returns ``ALREADY_EXISTS``. If
724 the requested subscription doesn't exist, returns ``NOT_FOUND``.
725 If the backlog in the subscription is too old -- and the
726 resulting snapshot would expire in less than 1 hour -- then
727 ``FAILED_PRECONDITION`` is returned. See also the
728 ``Snapshot.expire_time`` field. If the name is not provided in
729 the request, the server will assign a random name for this
730 snapshot on the same project as the subscription, conforming to
731 the [resource name format]
732 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
733 The generated name is populated in the returned Snapshot object.
734 Note that for REST API requests, you must specify a name in the
735 request.
736
737 Returns:
738 Callable[[~.CreateSnapshotRequest],
739 Awaitable[~.Snapshot]]:
740 A function that, when called, will call the underlying RPC
741 on the server.
742 """
743 # Generate a "stub function" on-the-fly which will actually make
744 # the request.
745 # gRPC handles serialization and deserialization, so we just need
746 # to pass in the functions for each.
747 if "create_snapshot" not in self._stubs:
748 self._stubs["create_snapshot"] = self._logged_channel.unary_unary(
749 "/google.pubsub.v1.Subscriber/CreateSnapshot",
750 request_serializer=pubsub.CreateSnapshotRequest.serialize,
751 response_deserializer=pubsub.Snapshot.deserialize,
752 )
753 return self._stubs["create_snapshot"]
754
755 @property
756 def update_snapshot(
757 self,
758 ) -> Callable[[pubsub.UpdateSnapshotRequest], Awaitable[pubsub.Snapshot]]:
759 r"""Return a callable for the update snapshot method over gRPC.
760
761 Updates an existing snapshot by updating the fields specified in
762 the update mask. Snapshots are used in
763 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
764 operations, which allow you to manage message acknowledgments in
765 bulk. That is, you can set the acknowledgment state of messages
766 in an existing subscription to the state captured by a snapshot.
767
768 Returns:
769 Callable[[~.UpdateSnapshotRequest],
770 Awaitable[~.Snapshot]]:
771 A function that, when called, will call the underlying RPC
772 on the server.
773 """
774 # Generate a "stub function" on-the-fly which will actually make
775 # the request.
776 # gRPC handles serialization and deserialization, so we just need
777 # to pass in the functions for each.
778 if "update_snapshot" not in self._stubs:
779 self._stubs["update_snapshot"] = self._logged_channel.unary_unary(
780 "/google.pubsub.v1.Subscriber/UpdateSnapshot",
781 request_serializer=pubsub.UpdateSnapshotRequest.serialize,
782 response_deserializer=pubsub.Snapshot.deserialize,
783 )
784 return self._stubs["update_snapshot"]
785
786 @property
787 def delete_snapshot(
788 self,
789 ) -> Callable[[pubsub.DeleteSnapshotRequest], Awaitable[empty_pb2.Empty]]:
790 r"""Return a callable for the delete snapshot method over gRPC.
791
792 Removes an existing snapshot. Snapshots are used in [Seek]
793 (https://cloud.google.com/pubsub/docs/replay-overview)
794 operations, which allow you to manage message acknowledgments in
795 bulk. That is, you can set the acknowledgment state of messages
796 in an existing subscription to the state captured by a snapshot.
797 When the snapshot is deleted, all messages retained in the
798 snapshot are immediately dropped. After a snapshot is deleted, a
799 new one may be created with the same name, but the new one has
800 no association with the old snapshot or its subscription, unless
801 the same subscription is specified.
802
803 Returns:
804 Callable[[~.DeleteSnapshotRequest],
805 Awaitable[~.Empty]]:
806 A function that, when called, will call the underlying RPC
807 on the server.
808 """
809 # Generate a "stub function" on-the-fly which will actually make
810 # the request.
811 # gRPC handles serialization and deserialization, so we just need
812 # to pass in the functions for each.
813 if "delete_snapshot" not in self._stubs:
814 self._stubs["delete_snapshot"] = self._logged_channel.unary_unary(
815 "/google.pubsub.v1.Subscriber/DeleteSnapshot",
816 request_serializer=pubsub.DeleteSnapshotRequest.serialize,
817 response_deserializer=empty_pb2.Empty.FromString,
818 )
819 return self._stubs["delete_snapshot"]
820
821 @property
822 def seek(self) -> Callable[[pubsub.SeekRequest], Awaitable[pubsub.SeekResponse]]:
823 r"""Return a callable for the seek method over gRPC.
824
825 Seeks an existing subscription to a point in time or to a given
826 snapshot, whichever is provided in the request. Snapshots are
827 used in [Seek]
828 (https://cloud.google.com/pubsub/docs/replay-overview)
829 operations, which allow you to manage message acknowledgments in
830 bulk. That is, you can set the acknowledgment state of messages
831 in an existing subscription to the state captured by a snapshot.
832 Note that both the subscription and the snapshot must be on the
833 same topic.
834
835 Returns:
836 Callable[[~.SeekRequest],
837 Awaitable[~.SeekResponse]]:
838 A function that, when called, will call the underlying RPC
839 on the server.
840 """
841 # Generate a "stub function" on-the-fly which will actually make
842 # the request.
843 # gRPC handles serialization and deserialization, so we just need
844 # to pass in the functions for each.
845 if "seek" not in self._stubs:
846 self._stubs["seek"] = self._logged_channel.unary_unary(
847 "/google.pubsub.v1.Subscriber/Seek",
848 request_serializer=pubsub.SeekRequest.serialize,
849 response_deserializer=pubsub.SeekResponse.deserialize,
850 )
851 return self._stubs["seek"]
852
853 def _prep_wrapped_messages(self, client_info):
854 """Precompute the wrapped methods, overriding the base class method to use async wrappers."""
855 self._wrapped_methods = {
856 self.create_subscription: self._wrap_method(
857 self.create_subscription,
858 default_retry=retries.AsyncRetry(
859 initial=0.1,
860 maximum=60.0,
861 multiplier=1.3,
862 predicate=retries.if_exception_type(
863 core_exceptions.Aborted,
864 core_exceptions.ServiceUnavailable,
865 core_exceptions.Unknown,
866 ),
867 deadline=60.0,
868 ),
869 default_timeout=60.0,
870 client_info=client_info,
871 ),
872 self.get_subscription: self._wrap_method(
873 self.get_subscription,
874 default_retry=retries.AsyncRetry(
875 initial=0.1,
876 maximum=60.0,
877 multiplier=1.3,
878 predicate=retries.if_exception_type(
879 core_exceptions.Aborted,
880 core_exceptions.ServiceUnavailable,
881 core_exceptions.Unknown,
882 ),
883 deadline=60.0,
884 ),
885 default_timeout=60.0,
886 client_info=client_info,
887 ),
888 self.update_subscription: self._wrap_method(
889 self.update_subscription,
890 default_retry=retries.AsyncRetry(
891 initial=0.1,
892 maximum=60.0,
893 multiplier=1.3,
894 predicate=retries.if_exception_type(
895 core_exceptions.ServiceUnavailable,
896 ),
897 deadline=60.0,
898 ),
899 default_timeout=60.0,
900 client_info=client_info,
901 ),
902 self.list_subscriptions: self._wrap_method(
903 self.list_subscriptions,
904 default_retry=retries.AsyncRetry(
905 initial=0.1,
906 maximum=60.0,
907 multiplier=1.3,
908 predicate=retries.if_exception_type(
909 core_exceptions.Aborted,
910 core_exceptions.ServiceUnavailable,
911 core_exceptions.Unknown,
912 ),
913 deadline=60.0,
914 ),
915 default_timeout=60.0,
916 client_info=client_info,
917 ),
918 self.delete_subscription: self._wrap_method(
919 self.delete_subscription,
920 default_retry=retries.AsyncRetry(
921 initial=0.1,
922 maximum=60.0,
923 multiplier=1.3,
924 predicate=retries.if_exception_type(
925 core_exceptions.ServiceUnavailable,
926 ),
927 deadline=60.0,
928 ),
929 default_timeout=60.0,
930 client_info=client_info,
931 ),
932 self.modify_ack_deadline: self._wrap_method(
933 self.modify_ack_deadline,
934 default_retry=retries.AsyncRetry(
935 initial=0.1,
936 maximum=60.0,
937 multiplier=1.3,
938 predicate=retries.if_exception_type(
939 core_exceptions.ServiceUnavailable,
940 ),
941 deadline=60.0,
942 ),
943 default_timeout=60.0,
944 client_info=client_info,
945 ),
946 self.acknowledge: self._wrap_method(
947 self.acknowledge,
948 default_retry=retries.AsyncRetry(
949 initial=0.1,
950 maximum=60.0,
951 multiplier=1.3,
952 predicate=retries.if_exception_type(
953 core_exceptions.ServiceUnavailable,
954 ),
955 deadline=60.0,
956 ),
957 default_timeout=60.0,
958 client_info=client_info,
959 ),
960 self.pull: self._wrap_method(
961 self.pull,
962 default_retry=retries.AsyncRetry(
963 initial=0.1,
964 maximum=60.0,
965 multiplier=1.3,
966 predicate=retries.if_exception_type(
967 core_exceptions.Aborted,
968 core_exceptions.InternalServerError,
969 core_exceptions.ServiceUnavailable,
970 core_exceptions.Unknown,
971 ),
972 deadline=60.0,
973 ),
974 default_timeout=60.0,
975 client_info=client_info,
976 ),
977 self.streaming_pull: self._wrap_method(
978 self.streaming_pull,
979 default_retry=retries.AsyncRetry(
980 initial=0.1,
981 maximum=60.0,
982 multiplier=4,
983 predicate=retries.if_exception_type(
984 core_exceptions.Aborted,
985 core_exceptions.DeadlineExceeded,
986 core_exceptions.InternalServerError,
987 core_exceptions.ResourceExhausted,
988 core_exceptions.ServiceUnavailable,
989 ),
990 deadline=900.0,
991 ),
992 default_timeout=900.0,
993 client_info=client_info,
994 ),
995 self.modify_push_config: self._wrap_method(
996 self.modify_push_config,
997 default_retry=retries.AsyncRetry(
998 initial=0.1,
999 maximum=60.0,
1000 multiplier=1.3,
1001 predicate=retries.if_exception_type(
1002 core_exceptions.ServiceUnavailable,
1003 ),
1004 deadline=60.0,
1005 ),
1006 default_timeout=60.0,
1007 client_info=client_info,
1008 ),
1009 self.get_snapshot: self._wrap_method(
1010 self.get_snapshot,
1011 default_retry=retries.AsyncRetry(
1012 initial=0.1,
1013 maximum=60.0,
1014 multiplier=1.3,
1015 predicate=retries.if_exception_type(
1016 core_exceptions.Aborted,
1017 core_exceptions.ServiceUnavailable,
1018 core_exceptions.Unknown,
1019 ),
1020 deadline=60.0,
1021 ),
1022 default_timeout=60.0,
1023 client_info=client_info,
1024 ),
1025 self.list_snapshots: self._wrap_method(
1026 self.list_snapshots,
1027 default_retry=retries.AsyncRetry(
1028 initial=0.1,
1029 maximum=60.0,
1030 multiplier=1.3,
1031 predicate=retries.if_exception_type(
1032 core_exceptions.Aborted,
1033 core_exceptions.ServiceUnavailable,
1034 core_exceptions.Unknown,
1035 ),
1036 deadline=60.0,
1037 ),
1038 default_timeout=60.0,
1039 client_info=client_info,
1040 ),
1041 self.create_snapshot: self._wrap_method(
1042 self.create_snapshot,
1043 default_retry=retries.AsyncRetry(
1044 initial=0.1,
1045 maximum=60.0,
1046 multiplier=1.3,
1047 predicate=retries.if_exception_type(
1048 core_exceptions.ServiceUnavailable,
1049 ),
1050 deadline=60.0,
1051 ),
1052 default_timeout=60.0,
1053 client_info=client_info,
1054 ),
1055 self.update_snapshot: self._wrap_method(
1056 self.update_snapshot,
1057 default_retry=retries.AsyncRetry(
1058 initial=0.1,
1059 maximum=60.0,
1060 multiplier=1.3,
1061 predicate=retries.if_exception_type(
1062 core_exceptions.ServiceUnavailable,
1063 ),
1064 deadline=60.0,
1065 ),
1066 default_timeout=60.0,
1067 client_info=client_info,
1068 ),
1069 self.delete_snapshot: self._wrap_method(
1070 self.delete_snapshot,
1071 default_retry=retries.AsyncRetry(
1072 initial=0.1,
1073 maximum=60.0,
1074 multiplier=1.3,
1075 predicate=retries.if_exception_type(
1076 core_exceptions.ServiceUnavailable,
1077 ),
1078 deadline=60.0,
1079 ),
1080 default_timeout=60.0,
1081 client_info=client_info,
1082 ),
1083 self.seek: self._wrap_method(
1084 self.seek,
1085 default_retry=retries.AsyncRetry(
1086 initial=0.1,
1087 maximum=60.0,
1088 multiplier=1.3,
1089 predicate=retries.if_exception_type(
1090 core_exceptions.Aborted,
1091 core_exceptions.ServiceUnavailable,
1092 core_exceptions.Unknown,
1093 ),
1094 deadline=60.0,
1095 ),
1096 default_timeout=60.0,
1097 client_info=client_info,
1098 ),
1099 self.get_iam_policy: self._wrap_method(
1100 self.get_iam_policy,
1101 default_timeout=None,
1102 client_info=client_info,
1103 ),
1104 self.set_iam_policy: self._wrap_method(
1105 self.set_iam_policy,
1106 default_timeout=None,
1107 client_info=client_info,
1108 ),
1109 self.test_iam_permissions: self._wrap_method(
1110 self.test_iam_permissions,
1111 default_timeout=None,
1112 client_info=client_info,
1113 ),
1114 }
1115
1116 def _wrap_method(self, func, *args, **kwargs):
1117 if self._wrap_with_kind: # pragma: NO COVER
1118 kwargs["kind"] = self.kind
1119 return gapic_v1.method_async.wrap_method(func, *args, **kwargs)
1120
1121 def close(self):
1122 return self._logged_channel.close()
1123
1124 @property
1125 def kind(self) -> str:
1126 return "grpc_asyncio"
1127
1128 @property
1129 def set_iam_policy(
1130 self,
1131 ) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], policy_pb2.Policy]:
1132 r"""Return a callable for the set iam policy method over gRPC.
1133 Sets the IAM access control policy on the specified
1134 function. Replaces any existing policy.
1135 Returns:
1136 Callable[[~.SetIamPolicyRequest],
1137 ~.Policy]:
1138 A function that, when called, will call the underlying RPC
1139 on the server.
1140 """
1141 # Generate a "stub function" on-the-fly which will actually make
1142 # the request.
1143 # gRPC handles serialization and deserialization, so we just need
1144 # to pass in the functions for each.
1145 if "set_iam_policy" not in self._stubs:
1146 self._stubs["set_iam_policy"] = self._logged_channel.unary_unary(
1147 "/google.iam.v1.IAMPolicy/SetIamPolicy",
1148 request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
1149 response_deserializer=policy_pb2.Policy.FromString,
1150 )
1151 return self._stubs["set_iam_policy"]
1152
1153 @property
1154 def get_iam_policy(
1155 self,
1156 ) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], policy_pb2.Policy]:
1157 r"""Return a callable for the get iam policy method over gRPC.
1158 Gets the IAM access control policy for a function.
1159 Returns an empty policy if the function exists and does
1160 not have a policy set.
1161 Returns:
1162 Callable[[~.GetIamPolicyRequest],
1163 ~.Policy]:
1164 A function that, when called, will call the underlying RPC
1165 on the server.
1166 """
1167 # Generate a "stub function" on-the-fly which will actually make
1168 # the request.
1169 # gRPC handles serialization and deserialization, so we just need
1170 # to pass in the functions for each.
1171 if "get_iam_policy" not in self._stubs:
1172 self._stubs["get_iam_policy"] = self._logged_channel.unary_unary(
1173 "/google.iam.v1.IAMPolicy/GetIamPolicy",
1174 request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
1175 response_deserializer=policy_pb2.Policy.FromString,
1176 )
1177 return self._stubs["get_iam_policy"]
1178
1179 @property
1180 def test_iam_permissions(
1181 self,
1182 ) -> Callable[
1183 [iam_policy_pb2.TestIamPermissionsRequest],
1184 iam_policy_pb2.TestIamPermissionsResponse,
1185 ]:
1186 r"""Return a callable for the test iam permissions method over gRPC.
1187 Tests the specified permissions against the IAM access control
1188 policy for a function. If the function does not exist, this will
1189 return an empty set of permissions, not a NOT_FOUND error.
1190 Returns:
1191 Callable[[~.TestIamPermissionsRequest],
1192 ~.TestIamPermissionsResponse]:
1193 A function that, when called, will call the underlying RPC
1194 on the server.
1195 """
1196 # Generate a "stub function" on-the-fly which will actually make
1197 # the request.
1198 # gRPC handles serialization and deserialization, so we just need
1199 # to pass in the functions for each.
1200 if "test_iam_permissions" not in self._stubs:
1201 self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary(
1202 "/google.iam.v1.IAMPolicy/TestIamPermissions",
1203 request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
1204 response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
1205 )
1206 return self._stubs["test_iam_permissions"]
1207
1208
1209__all__ = ("SubscriberGrpcAsyncIOTransport",)