1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import json
17import logging as std_logging
18import pickle
19import warnings
20from typing import Callable, Dict, Optional, Sequence, Tuple, Union
21
22from google.api_core import grpc_helpers
23from google.api_core import gapic_v1
24import google.auth # type: ignore
25from google.auth import credentials as ga_credentials # type: ignore
26from google.auth.transport.grpc import SslCredentials # type: ignore
27from google.protobuf.json_format import MessageToJson
28import google.protobuf.message
29
30import grpc # type: ignore
31import proto # type: ignore
32
33from google.iam.v1 import iam_policy_pb2 # type: ignore
34from google.iam.v1 import policy_pb2 # type: ignore
35from google.protobuf import empty_pb2 # type: ignore
36from google.pubsub_v1.types import pubsub
37from .base import SubscriberTransport, DEFAULT_CLIENT_INFO
38
39try:
40 from google.api_core import client_logging # type: ignore
41
42 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
43except ImportError: # pragma: NO COVER
44 CLIENT_LOGGING_SUPPORTED = False
45
46_LOGGER = std_logging.getLogger(__name__)
47
48
49class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER
50 def intercept_unary_unary(self, continuation, client_call_details, request):
51 logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
52 std_logging.DEBUG
53 )
54 if logging_enabled: # pragma: NO COVER
55 request_metadata = client_call_details.metadata
56 if isinstance(request, proto.Message):
57 request_payload = type(request).to_json(request)
58 elif isinstance(request, google.protobuf.message.Message):
59 request_payload = MessageToJson(request)
60 else:
61 request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"
62
63 request_metadata = {
64 key: value.decode("utf-8") if isinstance(value, bytes) else value
65 for key, value in request_metadata
66 }
67 grpc_request = {
68 "payload": request_payload,
69 "requestMethod": "grpc",
70 "metadata": dict(request_metadata),
71 }
72 _LOGGER.debug(
73 f"Sending request for {client_call_details.method}",
74 extra={
75 "serviceName": "google.pubsub.v1.Subscriber",
76 "rpcName": str(client_call_details.method),
77 "request": grpc_request,
78 "metadata": grpc_request["metadata"],
79 },
80 )
81 response = continuation(client_call_details, request)
82 if logging_enabled: # pragma: NO COVER
83 response_metadata = response.trailing_metadata()
84 # Convert gRPC metadata `<class 'grpc.aio._metadata.Metadata'>` to list of tuples
85 metadata = (
86 dict([(k, str(v)) for k, v in response_metadata])
87 if response_metadata
88 else None
89 )
90 result = response.result()
91 if isinstance(result, proto.Message):
92 response_payload = type(result).to_json(result)
93 elif isinstance(result, google.protobuf.message.Message):
94 response_payload = MessageToJson(result)
95 else:
96 response_payload = f"{type(result).__name__}: {pickle.dumps(result)}"
97 grpc_response = {
98 "payload": response_payload,
99 "metadata": metadata,
100 "status": "OK",
101 }
102 _LOGGER.debug(
103 f"Received response for {client_call_details.method}.",
104 extra={
105 "serviceName": "google.pubsub.v1.Subscriber",
106 "rpcName": client_call_details.method,
107 "response": grpc_response,
108 "metadata": grpc_response["metadata"],
109 },
110 )
111 return response
112
113
114class SubscriberGrpcTransport(SubscriberTransport):
115 """gRPC backend transport for Subscriber.
116
117 The service that an application uses to manipulate subscriptions and
118 to consume messages from a subscription via the ``Pull`` method or
119 by establishing a bi-directional stream using the ``StreamingPull``
120 method.
121
122 This class defines the same methods as the primary client, so the
123 primary client can load the underlying transport implementation
124 and call it.
125
126 It sends protocol buffers over the wire using gRPC (which is built on
127 top of HTTP/2); the ``grpcio`` package must be installed.
128 """
129
130 _stubs: Dict[str, Callable]
131
132 def __init__(
133 self,
134 *,
135 host: str = "pubsub.googleapis.com",
136 credentials: Optional[ga_credentials.Credentials] = None,
137 credentials_file: Optional[str] = None,
138 scopes: Optional[Sequence[str]] = None,
139 channel: Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]] = None,
140 api_mtls_endpoint: Optional[str] = None,
141 client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
142 ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
143 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
144 quota_project_id: Optional[str] = None,
145 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
146 always_use_jwt_access: Optional[bool] = False,
147 api_audience: Optional[str] = None,
148 ) -> None:
149 """Instantiate the transport.
150
151 Args:
152 host (Optional[str]):
153 The hostname to connect to (default: 'pubsub.googleapis.com').
154 credentials (Optional[google.auth.credentials.Credentials]): The
155 authorization credentials to attach to requests. These
156 credentials identify the application to the service; if none
157 are specified, the client will attempt to ascertain the
158 credentials from the environment.
159 This argument is ignored if a ``channel`` instance is provided.
160 credentials_file (Optional[str]): Deprecated. A file with credentials that can
161 be loaded with :func:`google.auth.load_credentials_from_file`.
162 This argument is ignored if a ``channel`` instance is provided.
163 This argument will be removed in the next major version of this library.
164 scopes (Optional(Sequence[str])): A list of scopes. This argument is
165 ignored if a ``channel`` instance is provided.
166 channel (Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]]):
167 A ``Channel`` instance through which to make calls, or a Callable
168 that constructs and returns one. If set to None, ``self.create_channel``
169 is used to create the channel. If a Callable is given, it will be called
170 with the same arguments as used in ``self.create_channel``.
171 api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
172 If provided, it overrides the ``host`` argument and tries to create
173 a mutual TLS channel with client SSL credentials from
174 ``client_cert_source`` or application default SSL credentials.
175 client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
176 Deprecated. A callback to provide client SSL certificate bytes and
177 private key bytes, both in PEM format. It is ignored if
178 ``api_mtls_endpoint`` is None.
179 ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
180 for the grpc channel. It is ignored if a ``channel`` instance is provided.
181 client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
182 A callback to provide client certificate bytes and private key bytes,
183 both in PEM format. It is used to configure a mutual TLS channel. It is
184 ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided.
185 quota_project_id (Optional[str]): An optional project to use for billing
186 and quota.
187 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
188 The client info used to send a user-agent string along with
189 API requests. If ``None``, then default info will be used.
190 Generally, you only need to set this if you're developing
191 your own client library.
192 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
193 be used for service account credentials.
194
195 Raises:
196 google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
197 creation failed for any reason.
198 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
199 and ``credentials_file`` are passed.
200 """
201 self._grpc_channel = None
202 self._ssl_channel_credentials = ssl_channel_credentials
203 self._stubs: Dict[str, Callable] = {}
204
205 if api_mtls_endpoint:
206 warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
207 if client_cert_source:
208 warnings.warn("client_cert_source is deprecated", DeprecationWarning)
209
210 if isinstance(channel, grpc.Channel):
211 # Ignore credentials if a channel was passed.
212 credentials = None
213 self._ignore_credentials = True
214 # If a channel was explicitly provided, set it.
215 self._grpc_channel = channel
216 self._ssl_channel_credentials = None
217
218 else:
219 if api_mtls_endpoint:
220 host = api_mtls_endpoint
221
222 # Create SSL credentials with client_cert_source or application
223 # default SSL credentials.
224 if client_cert_source:
225 cert, key = client_cert_source()
226 self._ssl_channel_credentials = grpc.ssl_channel_credentials(
227 certificate_chain=cert, private_key=key
228 )
229 else:
230 self._ssl_channel_credentials = SslCredentials().ssl_credentials
231
232 else:
233 if client_cert_source_for_mtls and not ssl_channel_credentials:
234 cert, key = client_cert_source_for_mtls()
235 self._ssl_channel_credentials = grpc.ssl_channel_credentials(
236 certificate_chain=cert, private_key=key
237 )
238
239 # The base transport sets the host, credentials and scopes
240 super().__init__(
241 host=host,
242 credentials=credentials,
243 credentials_file=credentials_file,
244 scopes=scopes,
245 quota_project_id=quota_project_id,
246 client_info=client_info,
247 always_use_jwt_access=always_use_jwt_access,
248 api_audience=api_audience,
249 )
250
251 if not self._grpc_channel:
252 # initialize with the provided callable or the default channel
253 channel_init = channel or type(self).create_channel
254 self._grpc_channel = channel_init(
255 self._host,
256 # use the credentials which are saved
257 credentials=self._credentials,
258 # Set ``credentials_file`` to ``None`` here as
259 # the credentials that we saved earlier should be used.
260 credentials_file=None,
261 scopes=self._scopes,
262 ssl_credentials=self._ssl_channel_credentials,
263 quota_project_id=quota_project_id,
264 options=[
265 ("grpc.max_send_message_length", -1),
266 ("grpc.max_receive_message_length", -1),
267 ("grpc.max_metadata_size", 4 * 1024 * 1024),
268 ("grpc.keepalive_time_ms", 30000),
269 ],
270 )
271
272 self._interceptor = _LoggingClientInterceptor()
273 self._logged_channel = grpc.intercept_channel(
274 self._grpc_channel, self._interceptor
275 )
276
277 # Wrap messages. This must be done after self._logged_channel exists
278 self._prep_wrapped_messages(client_info)
279
280 @classmethod
281 def create_channel(
282 cls,
283 host: str = "pubsub.googleapis.com",
284 credentials: Optional[ga_credentials.Credentials] = None,
285 credentials_file: Optional[str] = None,
286 scopes: Optional[Sequence[str]] = None,
287 quota_project_id: Optional[str] = None,
288 **kwargs,
289 ) -> grpc.Channel:
290 """Create and return a gRPC channel object.
291 Args:
292 host (Optional[str]): The host for the channel to use.
293 credentials (Optional[~.Credentials]): The
294 authorization credentials to attach to requests. These
295 credentials identify this application to the service. If
296 none are specified, the client will attempt to ascertain
297 the credentials from the environment.
298 credentials_file (Optional[str]): Deprecated. A file with credentials that can
299 be loaded with :func:`google.auth.load_credentials_from_file`.
300 This argument is mutually exclusive with credentials. This argument will be
301 removed in the next major version of this library.
302 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this
303 service. These are only used when credentials are not specified and
304 are passed to :func:`google.auth.default`.
305 quota_project_id (Optional[str]): An optional project to use for billing
306 and quota.
307 kwargs (Optional[dict]): Keyword arguments, which are passed to the
308 channel creation.
309 Returns:
310 grpc.Channel: A gRPC channel object.
311
312 Raises:
313 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
314 and ``credentials_file`` are passed.
315 """
316
317 return grpc_helpers.create_channel(
318 host,
319 credentials=credentials,
320 credentials_file=credentials_file,
321 quota_project_id=quota_project_id,
322 default_scopes=cls.AUTH_SCOPES,
323 scopes=scopes,
324 default_host=cls.DEFAULT_HOST,
325 **kwargs,
326 )
327
328 @property
329 def grpc_channel(self) -> grpc.Channel:
330 """Return the channel designed to connect to this service."""
331 return self._grpc_channel
332
333 @property
334 def create_subscription(
335 self,
336 ) -> Callable[[pubsub.Subscription], pubsub.Subscription]:
337 r"""Return a callable for the create subscription method over gRPC.
338
339 Creates a subscription to a given topic. See the [resource name
340 rules]
341 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
342 If the subscription already exists, returns ``ALREADY_EXISTS``.
343 If the corresponding topic doesn't exist, returns ``NOT_FOUND``.
344
345 If the name is not provided in the request, the server will
346 assign a random name for this subscription on the same project
347 as the topic, conforming to the [resource name format]
348 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
349 The generated name is populated in the returned Subscription
350 object. Note that for REST API requests, you must specify a name
351 in the request.
352
353 Returns:
354 Callable[[~.Subscription],
355 ~.Subscription]:
356 A function that, when called, will call the underlying RPC
357 on the server.
358 """
359 # Generate a "stub function" on-the-fly which will actually make
360 # the request.
361 # gRPC handles serialization and deserialization, so we just need
362 # to pass in the functions for each.
363 if "create_subscription" not in self._stubs:
364 self._stubs["create_subscription"] = self._logged_channel.unary_unary(
365 "/google.pubsub.v1.Subscriber/CreateSubscription",
366 request_serializer=pubsub.Subscription.serialize,
367 response_deserializer=pubsub.Subscription.deserialize,
368 )
369 return self._stubs["create_subscription"]
370
371 @property
372 def get_subscription(
373 self,
374 ) -> Callable[[pubsub.GetSubscriptionRequest], pubsub.Subscription]:
375 r"""Return a callable for the get subscription method over gRPC.
376
377 Gets the configuration details of a subscription.
378
379 Returns:
380 Callable[[~.GetSubscriptionRequest],
381 ~.Subscription]:
382 A function that, when called, will call the underlying RPC
383 on the server.
384 """
385 # Generate a "stub function" on-the-fly which will actually make
386 # the request.
387 # gRPC handles serialization and deserialization, so we just need
388 # to pass in the functions for each.
389 if "get_subscription" not in self._stubs:
390 self._stubs["get_subscription"] = self._logged_channel.unary_unary(
391 "/google.pubsub.v1.Subscriber/GetSubscription",
392 request_serializer=pubsub.GetSubscriptionRequest.serialize,
393 response_deserializer=pubsub.Subscription.deserialize,
394 )
395 return self._stubs["get_subscription"]
396
397 @property
398 def update_subscription(
399 self,
400 ) -> Callable[[pubsub.UpdateSubscriptionRequest], pubsub.Subscription]:
401 r"""Return a callable for the update subscription method over gRPC.
402
403 Updates an existing subscription by updating the
404 fields specified in the update mask. Note that certain
405 properties of a subscription, such as its topic, are not
406 modifiable.
407
408 Returns:
409 Callable[[~.UpdateSubscriptionRequest],
410 ~.Subscription]:
411 A function that, when called, will call the underlying RPC
412 on the server.
413 """
414 # Generate a "stub function" on-the-fly which will actually make
415 # the request.
416 # gRPC handles serialization and deserialization, so we just need
417 # to pass in the functions for each.
418 if "update_subscription" not in self._stubs:
419 self._stubs["update_subscription"] = self._logged_channel.unary_unary(
420 "/google.pubsub.v1.Subscriber/UpdateSubscription",
421 request_serializer=pubsub.UpdateSubscriptionRequest.serialize,
422 response_deserializer=pubsub.Subscription.deserialize,
423 )
424 return self._stubs["update_subscription"]
425
426 @property
427 def list_subscriptions(
428 self,
429 ) -> Callable[[pubsub.ListSubscriptionsRequest], pubsub.ListSubscriptionsResponse]:
430 r"""Return a callable for the list subscriptions method over gRPC.
431
432 Lists matching subscriptions.
433
434 Returns:
435 Callable[[~.ListSubscriptionsRequest],
436 ~.ListSubscriptionsResponse]:
437 A function that, when called, will call the underlying RPC
438 on the server.
439 """
440 # Generate a "stub function" on-the-fly which will actually make
441 # the request.
442 # gRPC handles serialization and deserialization, so we just need
443 # to pass in the functions for each.
444 if "list_subscriptions" not in self._stubs:
445 self._stubs["list_subscriptions"] = self._logged_channel.unary_unary(
446 "/google.pubsub.v1.Subscriber/ListSubscriptions",
447 request_serializer=pubsub.ListSubscriptionsRequest.serialize,
448 response_deserializer=pubsub.ListSubscriptionsResponse.deserialize,
449 )
450 return self._stubs["list_subscriptions"]
451
452 @property
453 def delete_subscription(
454 self,
455 ) -> Callable[[pubsub.DeleteSubscriptionRequest], empty_pb2.Empty]:
456 r"""Return a callable for the delete subscription method over gRPC.
457
458 Deletes an existing subscription. All messages retained in the
459 subscription are immediately dropped. Calls to ``Pull`` after
460 deletion will return ``NOT_FOUND``. After a subscription is
461 deleted, a new one may be created with the same name, but the
462 new one has no association with the old subscription or its
463 topic unless the same topic is specified.
464
465 Returns:
466 Callable[[~.DeleteSubscriptionRequest],
467 ~.Empty]:
468 A function that, when called, will call the underlying RPC
469 on the server.
470 """
471 # Generate a "stub function" on-the-fly which will actually make
472 # the request.
473 # gRPC handles serialization and deserialization, so we just need
474 # to pass in the functions for each.
475 if "delete_subscription" not in self._stubs:
476 self._stubs["delete_subscription"] = self._logged_channel.unary_unary(
477 "/google.pubsub.v1.Subscriber/DeleteSubscription",
478 request_serializer=pubsub.DeleteSubscriptionRequest.serialize,
479 response_deserializer=empty_pb2.Empty.FromString,
480 )
481 return self._stubs["delete_subscription"]
482
483 @property
484 def modify_ack_deadline(
485 self,
486 ) -> Callable[[pubsub.ModifyAckDeadlineRequest], empty_pb2.Empty]:
487 r"""Return a callable for the modify ack deadline method over gRPC.
488
489 Modifies the ack deadline for a specific message. This method is
490 useful to indicate that more time is needed to process a message
491 by the subscriber, or to make the message available for
492 redelivery if the processing was interrupted. Note that this
493 does not modify the subscription-level ``ackDeadlineSeconds``
494 used for subsequent messages.
495
496 Returns:
497 Callable[[~.ModifyAckDeadlineRequest],
498 ~.Empty]:
499 A function that, when called, will call the underlying RPC
500 on the server.
501 """
502 # Generate a "stub function" on-the-fly which will actually make
503 # the request.
504 # gRPC handles serialization and deserialization, so we just need
505 # to pass in the functions for each.
506 if "modify_ack_deadline" not in self._stubs:
507 self._stubs["modify_ack_deadline"] = self._logged_channel.unary_unary(
508 "/google.pubsub.v1.Subscriber/ModifyAckDeadline",
509 request_serializer=pubsub.ModifyAckDeadlineRequest.serialize,
510 response_deserializer=empty_pb2.Empty.FromString,
511 )
512 return self._stubs["modify_ack_deadline"]
513
514 @property
515 def acknowledge(self) -> Callable[[pubsub.AcknowledgeRequest], empty_pb2.Empty]:
516 r"""Return a callable for the acknowledge method over gRPC.
517
518 Acknowledges the messages associated with the ``ack_ids`` in the
519 ``AcknowledgeRequest``. The Pub/Sub system can remove the
520 relevant messages from the subscription.
521
522 Acknowledging a message whose ack deadline has expired may
523 succeed, but such a message may be redelivered later.
524 Acknowledging a message more than once will not result in an
525 error.
526
527 Returns:
528 Callable[[~.AcknowledgeRequest],
529 ~.Empty]:
530 A function that, when called, will call the underlying RPC
531 on the server.
532 """
533 # Generate a "stub function" on-the-fly which will actually make
534 # the request.
535 # gRPC handles serialization and deserialization, so we just need
536 # to pass in the functions for each.
537 if "acknowledge" not in self._stubs:
538 self._stubs["acknowledge"] = self._logged_channel.unary_unary(
539 "/google.pubsub.v1.Subscriber/Acknowledge",
540 request_serializer=pubsub.AcknowledgeRequest.serialize,
541 response_deserializer=empty_pb2.Empty.FromString,
542 )
543 return self._stubs["acknowledge"]
544
545 @property
546 def pull(self) -> Callable[[pubsub.PullRequest], pubsub.PullResponse]:
547 r"""Return a callable for the pull method over gRPC.
548
549 Pulls messages from the server.
550
551 Returns:
552 Callable[[~.PullRequest],
553 ~.PullResponse]:
554 A function that, when called, will call the underlying RPC
555 on the server.
556 """
557 # Generate a "stub function" on-the-fly which will actually make
558 # the request.
559 # gRPC handles serialization and deserialization, so we just need
560 # to pass in the functions for each.
561 if "pull" not in self._stubs:
562 self._stubs["pull"] = self._logged_channel.unary_unary(
563 "/google.pubsub.v1.Subscriber/Pull",
564 request_serializer=pubsub.PullRequest.serialize,
565 response_deserializer=pubsub.PullResponse.deserialize,
566 )
567 return self._stubs["pull"]
568
569 @property
570 def streaming_pull(
571 self,
572 ) -> Callable[[pubsub.StreamingPullRequest], pubsub.StreamingPullResponse]:
573 r"""Return a callable for the streaming pull method over gRPC.
574
575 Establishes a stream with the server, which sends messages down
576 to the client. The client streams acknowledgments and ack
577 deadline modifications back to the server. The server will close
578 the stream and return the status on any error. The server may
579 close the stream with status ``UNAVAILABLE`` to reassign
580 server-side resources, in which case, the client should
581 re-establish the stream. Flow control can be achieved by
582 configuring the underlying RPC channel.
583
584 Returns:
585 Callable[[~.StreamingPullRequest],
586 ~.StreamingPullResponse]:
587 A function that, when called, will call the underlying RPC
588 on the server.
589 """
590 # Generate a "stub function" on-the-fly which will actually make
591 # the request.
592 # gRPC handles serialization and deserialization, so we just need
593 # to pass in the functions for each.
594 if "streaming_pull" not in self._stubs:
595 self._stubs["streaming_pull"] = self._logged_channel.stream_stream(
596 "/google.pubsub.v1.Subscriber/StreamingPull",
597 request_serializer=pubsub.StreamingPullRequest.serialize,
598 response_deserializer=pubsub.StreamingPullResponse.deserialize,
599 )
600 return self._stubs["streaming_pull"]
601
602 @property
603 def modify_push_config(
604 self,
605 ) -> Callable[[pubsub.ModifyPushConfigRequest], empty_pb2.Empty]:
606 r"""Return a callable for the modify push config method over gRPC.
607
608 Modifies the ``PushConfig`` for a specified subscription.
609
610 This may be used to change a push subscription to a pull one
611 (signified by an empty ``PushConfig``) or vice versa, or change
612 the endpoint URL and other attributes of a push subscription.
613 Messages will accumulate for delivery continuously through the
614 call regardless of changes to the ``PushConfig``.
615
616 Returns:
617 Callable[[~.ModifyPushConfigRequest],
618 ~.Empty]:
619 A function that, when called, will call the underlying RPC
620 on the server.
621 """
622 # Generate a "stub function" on-the-fly which will actually make
623 # the request.
624 # gRPC handles serialization and deserialization, so we just need
625 # to pass in the functions for each.
626 if "modify_push_config" not in self._stubs:
627 self._stubs["modify_push_config"] = self._logged_channel.unary_unary(
628 "/google.pubsub.v1.Subscriber/ModifyPushConfig",
629 request_serializer=pubsub.ModifyPushConfigRequest.serialize,
630 response_deserializer=empty_pb2.Empty.FromString,
631 )
632 return self._stubs["modify_push_config"]
633
634 @property
635 def get_snapshot(self) -> Callable[[pubsub.GetSnapshotRequest], pubsub.Snapshot]:
636 r"""Return a callable for the get snapshot method over gRPC.
637
638 Gets the configuration details of a snapshot. Snapshots are used
639 in
640 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
641 operations, which allow you to manage message acknowledgments in
642 bulk. That is, you can set the acknowledgment state of messages
643 in an existing subscription to the state captured by a snapshot.
644
645 Returns:
646 Callable[[~.GetSnapshotRequest],
647 ~.Snapshot]:
648 A function that, when called, will call the underlying RPC
649 on the server.
650 """
651 # Generate a "stub function" on-the-fly which will actually make
652 # the request.
653 # gRPC handles serialization and deserialization, so we just need
654 # to pass in the functions for each.
655 if "get_snapshot" not in self._stubs:
656 self._stubs["get_snapshot"] = self._logged_channel.unary_unary(
657 "/google.pubsub.v1.Subscriber/GetSnapshot",
658 request_serializer=pubsub.GetSnapshotRequest.serialize,
659 response_deserializer=pubsub.Snapshot.deserialize,
660 )
661 return self._stubs["get_snapshot"]
662
663 @property
664 def list_snapshots(
665 self,
666 ) -> Callable[[pubsub.ListSnapshotsRequest], pubsub.ListSnapshotsResponse]:
667 r"""Return a callable for the list snapshots method over gRPC.
668
669 Lists the existing snapshots. Snapshots are used in
670 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
671 operations, which allow you to manage message acknowledgments in
672 bulk. That is, you can set the acknowledgment state of messages
673 in an existing subscription to the state captured by a snapshot.
674
675 Returns:
676 Callable[[~.ListSnapshotsRequest],
677 ~.ListSnapshotsResponse]:
678 A function that, when called, will call the underlying RPC
679 on the server.
680 """
681 # Generate a "stub function" on-the-fly which will actually make
682 # the request.
683 # gRPC handles serialization and deserialization, so we just need
684 # to pass in the functions for each.
685 if "list_snapshots" not in self._stubs:
686 self._stubs["list_snapshots"] = self._logged_channel.unary_unary(
687 "/google.pubsub.v1.Subscriber/ListSnapshots",
688 request_serializer=pubsub.ListSnapshotsRequest.serialize,
689 response_deserializer=pubsub.ListSnapshotsResponse.deserialize,
690 )
691 return self._stubs["list_snapshots"]
692
693 @property
694 def create_snapshot(
695 self,
696 ) -> Callable[[pubsub.CreateSnapshotRequest], pubsub.Snapshot]:
697 r"""Return a callable for the create snapshot method over gRPC.
698
699 Creates a snapshot from the requested subscription. Snapshots
700 are used in
701 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
702 operations, which allow you to manage message acknowledgments in
703 bulk. That is, you can set the acknowledgment state of messages
704 in an existing subscription to the state captured by a snapshot.
705 If the snapshot already exists, returns ``ALREADY_EXISTS``. If
706 the requested subscription doesn't exist, returns ``NOT_FOUND``.
707 If the backlog in the subscription is too old -- and the
708 resulting snapshot would expire in less than 1 hour -- then
709 ``FAILED_PRECONDITION`` is returned. See also the
710 ``Snapshot.expire_time`` field. If the name is not provided in
711 the request, the server will assign a random name for this
712 snapshot on the same project as the subscription, conforming to
713 the [resource name format]
714 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
715 The generated name is populated in the returned Snapshot object.
716 Note that for REST API requests, you must specify a name in the
717 request.
718
719 Returns:
720 Callable[[~.CreateSnapshotRequest],
721 ~.Snapshot]:
722 A function that, when called, will call the underlying RPC
723 on the server.
724 """
725 # Generate a "stub function" on-the-fly which will actually make
726 # the request.
727 # gRPC handles serialization and deserialization, so we just need
728 # to pass in the functions for each.
729 if "create_snapshot" not in self._stubs:
730 self._stubs["create_snapshot"] = self._logged_channel.unary_unary(
731 "/google.pubsub.v1.Subscriber/CreateSnapshot",
732 request_serializer=pubsub.CreateSnapshotRequest.serialize,
733 response_deserializer=pubsub.Snapshot.deserialize,
734 )
735 return self._stubs["create_snapshot"]
736
737 @property
738 def update_snapshot(
739 self,
740 ) -> Callable[[pubsub.UpdateSnapshotRequest], pubsub.Snapshot]:
741 r"""Return a callable for the update snapshot method over gRPC.
742
743 Updates an existing snapshot by updating the fields specified in
744 the update mask. Snapshots are used in
745 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
746 operations, which allow you to manage message acknowledgments in
747 bulk. That is, you can set the acknowledgment state of messages
748 in an existing subscription to the state captured by a snapshot.
749
750 Returns:
751 Callable[[~.UpdateSnapshotRequest],
752 ~.Snapshot]:
753 A function that, when called, will call the underlying RPC
754 on the server.
755 """
756 # Generate a "stub function" on-the-fly which will actually make
757 # the request.
758 # gRPC handles serialization and deserialization, so we just need
759 # to pass in the functions for each.
760 if "update_snapshot" not in self._stubs:
761 self._stubs["update_snapshot"] = self._logged_channel.unary_unary(
762 "/google.pubsub.v1.Subscriber/UpdateSnapshot",
763 request_serializer=pubsub.UpdateSnapshotRequest.serialize,
764 response_deserializer=pubsub.Snapshot.deserialize,
765 )
766 return self._stubs["update_snapshot"]
767
768 @property
769 def delete_snapshot(
770 self,
771 ) -> Callable[[pubsub.DeleteSnapshotRequest], empty_pb2.Empty]:
772 r"""Return a callable for the delete snapshot method over gRPC.
773
774 Removes an existing snapshot. Snapshots are used in [Seek]
775 (https://cloud.google.com/pubsub/docs/replay-overview)
776 operations, which allow you to manage message acknowledgments in
777 bulk. That is, you can set the acknowledgment state of messages
778 in an existing subscription to the state captured by a snapshot.
779 When the snapshot is deleted, all messages retained in the
780 snapshot are immediately dropped. After a snapshot is deleted, a
781 new one may be created with the same name, but the new one has
782 no association with the old snapshot or its subscription, unless
783 the same subscription is specified.
784
785 Returns:
786 Callable[[~.DeleteSnapshotRequest],
787 ~.Empty]:
788 A function that, when called, will call the underlying RPC
789 on the server.
790 """
791 # Generate a "stub function" on-the-fly which will actually make
792 # the request.
793 # gRPC handles serialization and deserialization, so we just need
794 # to pass in the functions for each.
795 if "delete_snapshot" not in self._stubs:
796 self._stubs["delete_snapshot"] = self._logged_channel.unary_unary(
797 "/google.pubsub.v1.Subscriber/DeleteSnapshot",
798 request_serializer=pubsub.DeleteSnapshotRequest.serialize,
799 response_deserializer=empty_pb2.Empty.FromString,
800 )
801 return self._stubs["delete_snapshot"]
802
803 @property
804 def seek(self) -> Callable[[pubsub.SeekRequest], pubsub.SeekResponse]:
805 r"""Return a callable for the seek method over gRPC.
806
807 Seeks an existing subscription to a point in time or to a given
808 snapshot, whichever is provided in the request. Snapshots are
809 used in [Seek]
810 (https://cloud.google.com/pubsub/docs/replay-overview)
811 operations, which allow you to manage message acknowledgments in
812 bulk. That is, you can set the acknowledgment state of messages
813 in an existing subscription to the state captured by a snapshot.
814 Note that both the subscription and the snapshot must be on the
815 same topic.
816
817 Returns:
818 Callable[[~.SeekRequest],
819 ~.SeekResponse]:
820 A function that, when called, will call the underlying RPC
821 on the server.
822 """
823 # Generate a "stub function" on-the-fly which will actually make
824 # the request.
825 # gRPC handles serialization and deserialization, so we just need
826 # to pass in the functions for each.
827 if "seek" not in self._stubs:
828 self._stubs["seek"] = self._logged_channel.unary_unary(
829 "/google.pubsub.v1.Subscriber/Seek",
830 request_serializer=pubsub.SeekRequest.serialize,
831 response_deserializer=pubsub.SeekResponse.deserialize,
832 )
833 return self._stubs["seek"]
834
835 def close(self):
836 self._logged_channel.close()
837
838 @property
839 def set_iam_policy(
840 self,
841 ) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], policy_pb2.Policy]:
842 r"""Return a callable for the set iam policy method over gRPC.
843 Sets the IAM access control policy on the specified
844 function. Replaces any existing policy.
845 Returns:
846 Callable[[~.SetIamPolicyRequest],
847 ~.Policy]:
848 A function that, when called, will call the underlying RPC
849 on the server.
850 """
851 # Generate a "stub function" on-the-fly which will actually make
852 # the request.
853 # gRPC handles serialization and deserialization, so we just need
854 # to pass in the functions for each.
855 if "set_iam_policy" not in self._stubs:
856 self._stubs["set_iam_policy"] = self._logged_channel.unary_unary(
857 "/google.iam.v1.IAMPolicy/SetIamPolicy",
858 request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
859 response_deserializer=policy_pb2.Policy.FromString,
860 )
861 return self._stubs["set_iam_policy"]
862
863 @property
864 def get_iam_policy(
865 self,
866 ) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], policy_pb2.Policy]:
867 r"""Return a callable for the get iam policy method over gRPC.
868 Gets the IAM access control policy for a function.
869 Returns an empty policy if the function exists and does
870 not have a policy set.
871 Returns:
872 Callable[[~.GetIamPolicyRequest],
873 ~.Policy]:
874 A function that, when called, will call the underlying RPC
875 on the server.
876 """
877 # Generate a "stub function" on-the-fly which will actually make
878 # the request.
879 # gRPC handles serialization and deserialization, so we just need
880 # to pass in the functions for each.
881 if "get_iam_policy" not in self._stubs:
882 self._stubs["get_iam_policy"] = self._logged_channel.unary_unary(
883 "/google.iam.v1.IAMPolicy/GetIamPolicy",
884 request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
885 response_deserializer=policy_pb2.Policy.FromString,
886 )
887 return self._stubs["get_iam_policy"]
888
889 @property
890 def test_iam_permissions(
891 self,
892 ) -> Callable[
893 [iam_policy_pb2.TestIamPermissionsRequest],
894 iam_policy_pb2.TestIamPermissionsResponse,
895 ]:
896 r"""Return a callable for the test iam permissions method over gRPC.
897 Tests the specified permissions against the IAM access control
898 policy for a function. If the function does not exist, this will
899 return an empty set of permissions, not a NOT_FOUND error.
900 Returns:
901 Callable[[~.TestIamPermissionsRequest],
902 ~.TestIamPermissionsResponse]:
903 A function that, when called, will call the underlying RPC
904 on the server.
905 """
906 # Generate a "stub function" on-the-fly which will actually make
907 # the request.
908 # gRPC handles serialization and deserialization, so we just need
909 # to pass in the functions for each.
910 if "test_iam_permissions" not in self._stubs:
911 self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary(
912 "/google.iam.v1.IAMPolicy/TestIamPermissions",
913 request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
914 response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
915 )
916 return self._stubs["test_iam_permissions"]
917
918 @property
919 def kind(self) -> str:
920 return "grpc"
921
922
923__all__ = ("SubscriberGrpcTransport",)