1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import json
17import logging as std_logging
18import pickle
19import warnings
20from typing import Callable, Dict, Optional, Sequence, Tuple, Union
21
22from google.api_core import grpc_helpers
23from google.api_core import gapic_v1
24import google.auth # type: ignore
25from google.auth import credentials as ga_credentials # type: ignore
26from google.auth.transport.grpc import SslCredentials # type: ignore
27from google.protobuf.json_format import MessageToJson
28import google.protobuf.message
29
30import grpc # type: ignore
31import proto # type: ignore
32
33from google.iam.v1 import iam_policy_pb2 # type: ignore
34from google.iam.v1 import policy_pb2 # type: ignore
35from google.protobuf import empty_pb2 # type: ignore
36from google.pubsub_v1.types import pubsub
37from .base import SubscriberTransport, DEFAULT_CLIENT_INFO
38
# Feature probe: record whether ``google.api_core.client_logging`` can be
# imported. ``_LoggingClientInterceptor`` only emits log records when this
# flag is True (and DEBUG logging is enabled on ``_LOGGER``).
try:
    from google.api_core import client_logging  # type: ignore

    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False

# Module-level logger, named after this module.
_LOGGER = std_logging.getLogger(__name__)
47
48
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor):  # pragma: NO COVER
    """Unary-unary client interceptor that logs requests and responses at DEBUG.

    Records are only emitted when ``CLIENT_LOGGING_SUPPORTED`` is true and
    ``_LOGGER`` is enabled for ``DEBUG``; otherwise the call is simply handed
    to ``continuation``.
    """

    @staticmethod
    def _serialize_for_log(message):
        """Render ``message`` as the string payload of a log record."""
        if isinstance(message, proto.Message):
            return type(message).to_json(message)
        if isinstance(message, google.protobuf.message.Message):
            return MessageToJson(message)
        # Anything that is not a protobuf message is logged as its type name
        # followed by its pickled bytes.
        return f"{type(message).__name__}: {pickle.dumps(message)}"

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """Invoke the RPC, logging the request and the response when enabled.

        Args:
            continuation: Callable that performs the RPC; it is called as
                ``continuation(client_call_details, request)``.
            client_call_details: Description of the call. Its ``method`` and
                ``metadata`` attributes are read for logging.
            request: The request message for the RPC.

        Returns:
            The object returned by ``continuation``, unchanged.
        """
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        if logging_enabled:  # pragma: NO COVER
            # ``metadata`` is optional on the call details; treat ``None`` as
            # "no metadata" so that logging alone can never break the call.
            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in (client_call_details.metadata or ())
            }
            grpc_request = {
                "payload": self._serialize_for_log(request),
                "requestMethod": "grpc",
                "metadata": request_metadata,
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": "google.pubsub.v1.Subscriber",
                    "rpcName": str(client_call_details.method),
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )
        response = continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = response.trailing_metadata()
            # Convert the trailing metadata pairs to a plain dict of strings.
            metadata = (
                {k: str(v) for k, v in response_metadata}
                if response_metadata
                else None
            )
            # No try/except here: an exception raised by ``result()``
            # propagates to the caller.
            grpc_response = {
                "payload": self._serialize_for_log(response.result()),
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response for {client_call_details.method}.",
                extra={
                    "serviceName": "google.pubsub.v1.Subscriber",
                    # Stringified to match the request record above.
                    "rpcName": str(client_call_details.method),
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response
112
113
114class SubscriberGrpcTransport(SubscriberTransport):
115 """gRPC backend transport for Subscriber.
116
117 The service that an application uses to manipulate subscriptions and
118 to consume messages from a subscription via the ``Pull`` method or
119 by establishing a bi-directional stream using the ``StreamingPull``
120 method.
121
122 This class defines the same methods as the primary client, so the
123 primary client can load the underlying transport implementation
124 and call it.
125
126 It sends protocol buffers over the wire using gRPC (which is built on
127 top of HTTP/2); the ``grpcio`` package must be installed.
128 """
129
130 _stubs: Dict[str, Callable]
131
    def __init__(
        self,
        *,
        host: str = "pubsub.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to (default: 'pubsub.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                This argument is ignored if a ``channel`` instance is provided.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if a ``channel`` instance is provided.
            scopes (Optional(Sequence[str])): A list of scopes. This argument is
                ignored if a ``channel`` instance is provided.
            channel (Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]]):
                A ``Channel`` instance through which to make calls, or a Callable
                that constructs and returns one. If set to None, ``self.create_channel``
                is used to create the channel. If a Callable is given, it will be called
                with the same arguments as used in ``self.create_channel``.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
                If provided, it overrides the ``host`` argument and tries to create
                a mutual TLS channel with client SSL credentials from
                ``client_cert_source`` or application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate bytes and
                private key bytes, both in PEM format. It is ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
                for the grpc channel. It is ignored if a ``channel`` instance is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private key bytes,
                both in PEM format. It is used to configure a mutual TLS channel. It is
                ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            api_audience (Optional[str]): Passed through unchanged to the base
                transport constructor. NOTE(review): presumably the audience
                used for auth tokens -- confirm against ``SubscriberTransport``.

        Raises:
          google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
              creation failed for any reason.
          google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
              and ``credentials_file`` are passed.
        """
        # Start from a clean state; the channel is filled in further down,
        # either from ``channel`` directly or by calling a channel factory.
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        self._stubs: Dict[str, Callable] = {}

        # Both arguments still work but are deprecated; warn on use.
        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if isinstance(channel, grpc.Channel):
            # Ignore credentials if a channel was passed.
            credentials = None
            self._ignore_credentials = True
            # If a channel was explicitly provided, set it.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None

        else:
            if api_mtls_endpoint:
                # Deprecated path: the mTLS endpoint replaces ``host``.
                host = api_mtls_endpoint

                # Create SSL credentials with client_cert_source or application
                # default SSL credentials.
                if client_cert_source:
                    cert, key = client_cert_source()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )
                else:
                    self._ssl_channel_credentials = SslCredentials().ssl_credentials

            else:
                # Explicit ``ssl_channel_credentials`` win over the
                # certificate callback.
                if client_cert_source_for_mtls and not ssl_channel_credentials:
                    cert, key = client_cert_source_for_mtls()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )

        # The base transport sets the host, credentials and scopes
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            # initialize with the provided callable or the default channel
            channel_init = channel or type(self).create_channel
            self._grpc_channel = channel_init(
                self._host,
                # use the credentials which are saved
                credentials=self._credentials,
                # Set ``credentials_file`` to ``None`` here as
                # the credentials that we saved earlier should be used.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                # Channel arguments: message-size limits, metadata size and
                # keepalive interval.
                options=[
                    ("grpc.max_send_message_length", -1),
                    ("grpc.max_receive_message_length", -1),
                    ("grpc.max_metadata_size", 4 * 1024 * 1024),
                    ("grpc.keepalive_time_ms", 30000),
                ],
            )

        # Route calls through the logging interceptor; the stub properties
        # build their callables from ``self._logged_channel``.
        self._interceptor = _LoggingClientInterceptor()
        self._logged_channel = grpc.intercept_channel(
            self._grpc_channel, self._interceptor
        )

        # Wrap messages. This must be done after self._logged_channel exists
        self._prep_wrapped_messages(client_info)
278
279 @classmethod
280 def create_channel(
281 cls,
282 host: str = "pubsub.googleapis.com",
283 credentials: Optional[ga_credentials.Credentials] = None,
284 credentials_file: Optional[str] = None,
285 scopes: Optional[Sequence[str]] = None,
286 quota_project_id: Optional[str] = None,
287 **kwargs,
288 ) -> grpc.Channel:
289 """Create and return a gRPC channel object.
290 Args:
291 host (Optional[str]): The host for the channel to use.
292 credentials (Optional[~.Credentials]): The
293 authorization credentials to attach to requests. These
294 credentials identify this application to the service. If
295 none are specified, the client will attempt to ascertain
296 the credentials from the environment.
297 credentials_file (Optional[str]): A file with credentials that can
298 be loaded with :func:`google.auth.load_credentials_from_file`.
299 This argument is mutually exclusive with credentials.
300 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this
301 service. These are only used when credentials are not specified and
302 are passed to :func:`google.auth.default`.
303 quota_project_id (Optional[str]): An optional project to use for billing
304 and quota.
305 kwargs (Optional[dict]): Keyword arguments, which are passed to the
306 channel creation.
307 Returns:
308 grpc.Channel: A gRPC channel object.
309
310 Raises:
311 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
312 and ``credentials_file`` are passed.
313 """
314
315 return grpc_helpers.create_channel(
316 host,
317 credentials=credentials,
318 credentials_file=credentials_file,
319 quota_project_id=quota_project_id,
320 default_scopes=cls.AUTH_SCOPES,
321 scopes=scopes,
322 default_host=cls.DEFAULT_HOST,
323 **kwargs,
324 )
325
326 @property
327 def grpc_channel(self) -> grpc.Channel:
328 """Return the channel designed to connect to this service."""
329 return self._grpc_channel
330
331 @property
332 def create_subscription(
333 self,
334 ) -> Callable[[pubsub.Subscription], pubsub.Subscription]:
335 r"""Return a callable for the create subscription method over gRPC.
336
337 Creates a subscription to a given topic. See the [resource name
338 rules]
339 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
340 If the subscription already exists, returns ``ALREADY_EXISTS``.
341 If the corresponding topic doesn't exist, returns ``NOT_FOUND``.
342
343 If the name is not provided in the request, the server will
344 assign a random name for this subscription on the same project
345 as the topic, conforming to the [resource name format]
346 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
347 The generated name is populated in the returned Subscription
348 object. Note that for REST API requests, you must specify a name
349 in the request.
350
351 Returns:
352 Callable[[~.Subscription],
353 ~.Subscription]:
354 A function that, when called, will call the underlying RPC
355 on the server.
356 """
357 # Generate a "stub function" on-the-fly which will actually make
358 # the request.
359 # gRPC handles serialization and deserialization, so we just need
360 # to pass in the functions for each.
361 if "create_subscription" not in self._stubs:
362 self._stubs["create_subscription"] = self._logged_channel.unary_unary(
363 "/google.pubsub.v1.Subscriber/CreateSubscription",
364 request_serializer=pubsub.Subscription.serialize,
365 response_deserializer=pubsub.Subscription.deserialize,
366 )
367 return self._stubs["create_subscription"]
368
369 @property
370 def get_subscription(
371 self,
372 ) -> Callable[[pubsub.GetSubscriptionRequest], pubsub.Subscription]:
373 r"""Return a callable for the get subscription method over gRPC.
374
375 Gets the configuration details of a subscription.
376
377 Returns:
378 Callable[[~.GetSubscriptionRequest],
379 ~.Subscription]:
380 A function that, when called, will call the underlying RPC
381 on the server.
382 """
383 # Generate a "stub function" on-the-fly which will actually make
384 # the request.
385 # gRPC handles serialization and deserialization, so we just need
386 # to pass in the functions for each.
387 if "get_subscription" not in self._stubs:
388 self._stubs["get_subscription"] = self._logged_channel.unary_unary(
389 "/google.pubsub.v1.Subscriber/GetSubscription",
390 request_serializer=pubsub.GetSubscriptionRequest.serialize,
391 response_deserializer=pubsub.Subscription.deserialize,
392 )
393 return self._stubs["get_subscription"]
394
395 @property
396 def update_subscription(
397 self,
398 ) -> Callable[[pubsub.UpdateSubscriptionRequest], pubsub.Subscription]:
399 r"""Return a callable for the update subscription method over gRPC.
400
401 Updates an existing subscription by updating the
402 fields specified in the update mask. Note that certain
403 properties of a subscription, such as its topic, are not
404 modifiable.
405
406 Returns:
407 Callable[[~.UpdateSubscriptionRequest],
408 ~.Subscription]:
409 A function that, when called, will call the underlying RPC
410 on the server.
411 """
412 # Generate a "stub function" on-the-fly which will actually make
413 # the request.
414 # gRPC handles serialization and deserialization, so we just need
415 # to pass in the functions for each.
416 if "update_subscription" not in self._stubs:
417 self._stubs["update_subscription"] = self._logged_channel.unary_unary(
418 "/google.pubsub.v1.Subscriber/UpdateSubscription",
419 request_serializer=pubsub.UpdateSubscriptionRequest.serialize,
420 response_deserializer=pubsub.Subscription.deserialize,
421 )
422 return self._stubs["update_subscription"]
423
424 @property
425 def list_subscriptions(
426 self,
427 ) -> Callable[[pubsub.ListSubscriptionsRequest], pubsub.ListSubscriptionsResponse]:
428 r"""Return a callable for the list subscriptions method over gRPC.
429
430 Lists matching subscriptions.
431
432 Returns:
433 Callable[[~.ListSubscriptionsRequest],
434 ~.ListSubscriptionsResponse]:
435 A function that, when called, will call the underlying RPC
436 on the server.
437 """
438 # Generate a "stub function" on-the-fly which will actually make
439 # the request.
440 # gRPC handles serialization and deserialization, so we just need
441 # to pass in the functions for each.
442 if "list_subscriptions" not in self._stubs:
443 self._stubs["list_subscriptions"] = self._logged_channel.unary_unary(
444 "/google.pubsub.v1.Subscriber/ListSubscriptions",
445 request_serializer=pubsub.ListSubscriptionsRequest.serialize,
446 response_deserializer=pubsub.ListSubscriptionsResponse.deserialize,
447 )
448 return self._stubs["list_subscriptions"]
449
450 @property
451 def delete_subscription(
452 self,
453 ) -> Callable[[pubsub.DeleteSubscriptionRequest], empty_pb2.Empty]:
454 r"""Return a callable for the delete subscription method over gRPC.
455
456 Deletes an existing subscription. All messages retained in the
457 subscription are immediately dropped. Calls to ``Pull`` after
458 deletion will return ``NOT_FOUND``. After a subscription is
459 deleted, a new one may be created with the same name, but the
460 new one has no association with the old subscription or its
461 topic unless the same topic is specified.
462
463 Returns:
464 Callable[[~.DeleteSubscriptionRequest],
465 ~.Empty]:
466 A function that, when called, will call the underlying RPC
467 on the server.
468 """
469 # Generate a "stub function" on-the-fly which will actually make
470 # the request.
471 # gRPC handles serialization and deserialization, so we just need
472 # to pass in the functions for each.
473 if "delete_subscription" not in self._stubs:
474 self._stubs["delete_subscription"] = self._logged_channel.unary_unary(
475 "/google.pubsub.v1.Subscriber/DeleteSubscription",
476 request_serializer=pubsub.DeleteSubscriptionRequest.serialize,
477 response_deserializer=empty_pb2.Empty.FromString,
478 )
479 return self._stubs["delete_subscription"]
480
481 @property
482 def modify_ack_deadline(
483 self,
484 ) -> Callable[[pubsub.ModifyAckDeadlineRequest], empty_pb2.Empty]:
485 r"""Return a callable for the modify ack deadline method over gRPC.
486
487 Modifies the ack deadline for a specific message. This method is
488 useful to indicate that more time is needed to process a message
489 by the subscriber, or to make the message available for
490 redelivery if the processing was interrupted. Note that this
491 does not modify the subscription-level ``ackDeadlineSeconds``
492 used for subsequent messages.
493
494 Returns:
495 Callable[[~.ModifyAckDeadlineRequest],
496 ~.Empty]:
497 A function that, when called, will call the underlying RPC
498 on the server.
499 """
500 # Generate a "stub function" on-the-fly which will actually make
501 # the request.
502 # gRPC handles serialization and deserialization, so we just need
503 # to pass in the functions for each.
504 if "modify_ack_deadline" not in self._stubs:
505 self._stubs["modify_ack_deadline"] = self._logged_channel.unary_unary(
506 "/google.pubsub.v1.Subscriber/ModifyAckDeadline",
507 request_serializer=pubsub.ModifyAckDeadlineRequest.serialize,
508 response_deserializer=empty_pb2.Empty.FromString,
509 )
510 return self._stubs["modify_ack_deadline"]
511
512 @property
513 def acknowledge(self) -> Callable[[pubsub.AcknowledgeRequest], empty_pb2.Empty]:
514 r"""Return a callable for the acknowledge method over gRPC.
515
516 Acknowledges the messages associated with the ``ack_ids`` in the
517 ``AcknowledgeRequest``. The Pub/Sub system can remove the
518 relevant messages from the subscription.
519
520 Acknowledging a message whose ack deadline has expired may
521 succeed, but such a message may be redelivered later.
522 Acknowledging a message more than once will not result in an
523 error.
524
525 Returns:
526 Callable[[~.AcknowledgeRequest],
527 ~.Empty]:
528 A function that, when called, will call the underlying RPC
529 on the server.
530 """
531 # Generate a "stub function" on-the-fly which will actually make
532 # the request.
533 # gRPC handles serialization and deserialization, so we just need
534 # to pass in the functions for each.
535 if "acknowledge" not in self._stubs:
536 self._stubs["acknowledge"] = self._logged_channel.unary_unary(
537 "/google.pubsub.v1.Subscriber/Acknowledge",
538 request_serializer=pubsub.AcknowledgeRequest.serialize,
539 response_deserializer=empty_pb2.Empty.FromString,
540 )
541 return self._stubs["acknowledge"]
542
543 @property
544 def pull(self) -> Callable[[pubsub.PullRequest], pubsub.PullResponse]:
545 r"""Return a callable for the pull method over gRPC.
546
547 Pulls messages from the server.
548
549 Returns:
550 Callable[[~.PullRequest],
551 ~.PullResponse]:
552 A function that, when called, will call the underlying RPC
553 on the server.
554 """
555 # Generate a "stub function" on-the-fly which will actually make
556 # the request.
557 # gRPC handles serialization and deserialization, so we just need
558 # to pass in the functions for each.
559 if "pull" not in self._stubs:
560 self._stubs["pull"] = self._logged_channel.unary_unary(
561 "/google.pubsub.v1.Subscriber/Pull",
562 request_serializer=pubsub.PullRequest.serialize,
563 response_deserializer=pubsub.PullResponse.deserialize,
564 )
565 return self._stubs["pull"]
566
567 @property
568 def streaming_pull(
569 self,
570 ) -> Callable[[pubsub.StreamingPullRequest], pubsub.StreamingPullResponse]:
571 r"""Return a callable for the streaming pull method over gRPC.
572
573 Establishes a stream with the server, which sends messages down
574 to the client. The client streams acknowledgments and ack
575 deadline modifications back to the server. The server will close
576 the stream and return the status on any error. The server may
577 close the stream with status ``UNAVAILABLE`` to reassign
578 server-side resources, in which case, the client should
579 re-establish the stream. Flow control can be achieved by
580 configuring the underlying RPC channel.
581
582 Returns:
583 Callable[[~.StreamingPullRequest],
584 ~.StreamingPullResponse]:
585 A function that, when called, will call the underlying RPC
586 on the server.
587 """
588 # Generate a "stub function" on-the-fly which will actually make
589 # the request.
590 # gRPC handles serialization and deserialization, so we just need
591 # to pass in the functions for each.
592 if "streaming_pull" not in self._stubs:
593 self._stubs["streaming_pull"] = self._logged_channel.stream_stream(
594 "/google.pubsub.v1.Subscriber/StreamingPull",
595 request_serializer=pubsub.StreamingPullRequest.serialize,
596 response_deserializer=pubsub.StreamingPullResponse.deserialize,
597 )
598 return self._stubs["streaming_pull"]
599
600 @property
601 def modify_push_config(
602 self,
603 ) -> Callable[[pubsub.ModifyPushConfigRequest], empty_pb2.Empty]:
604 r"""Return a callable for the modify push config method over gRPC.
605
606 Modifies the ``PushConfig`` for a specified subscription.
607
608 This may be used to change a push subscription to a pull one
609 (signified by an empty ``PushConfig``) or vice versa, or change
610 the endpoint URL and other attributes of a push subscription.
611 Messages will accumulate for delivery continuously through the
612 call regardless of changes to the ``PushConfig``.
613
614 Returns:
615 Callable[[~.ModifyPushConfigRequest],
616 ~.Empty]:
617 A function that, when called, will call the underlying RPC
618 on the server.
619 """
620 # Generate a "stub function" on-the-fly which will actually make
621 # the request.
622 # gRPC handles serialization and deserialization, so we just need
623 # to pass in the functions for each.
624 if "modify_push_config" not in self._stubs:
625 self._stubs["modify_push_config"] = self._logged_channel.unary_unary(
626 "/google.pubsub.v1.Subscriber/ModifyPushConfig",
627 request_serializer=pubsub.ModifyPushConfigRequest.serialize,
628 response_deserializer=empty_pb2.Empty.FromString,
629 )
630 return self._stubs["modify_push_config"]
631
632 @property
633 def get_snapshot(self) -> Callable[[pubsub.GetSnapshotRequest], pubsub.Snapshot]:
634 r"""Return a callable for the get snapshot method over gRPC.
635
636 Gets the configuration details of a snapshot. Snapshots are used
637 in
638 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
639 operations, which allow you to manage message acknowledgments in
640 bulk. That is, you can set the acknowledgment state of messages
641 in an existing subscription to the state captured by a snapshot.
642
643 Returns:
644 Callable[[~.GetSnapshotRequest],
645 ~.Snapshot]:
646 A function that, when called, will call the underlying RPC
647 on the server.
648 """
649 # Generate a "stub function" on-the-fly which will actually make
650 # the request.
651 # gRPC handles serialization and deserialization, so we just need
652 # to pass in the functions for each.
653 if "get_snapshot" not in self._stubs:
654 self._stubs["get_snapshot"] = self._logged_channel.unary_unary(
655 "/google.pubsub.v1.Subscriber/GetSnapshot",
656 request_serializer=pubsub.GetSnapshotRequest.serialize,
657 response_deserializer=pubsub.Snapshot.deserialize,
658 )
659 return self._stubs["get_snapshot"]
660
661 @property
662 def list_snapshots(
663 self,
664 ) -> Callable[[pubsub.ListSnapshotsRequest], pubsub.ListSnapshotsResponse]:
665 r"""Return a callable for the list snapshots method over gRPC.
666
667 Lists the existing snapshots. Snapshots are used in
668 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
669 operations, which allow you to manage message acknowledgments in
670 bulk. That is, you can set the acknowledgment state of messages
671 in an existing subscription to the state captured by a snapshot.
672
673 Returns:
674 Callable[[~.ListSnapshotsRequest],
675 ~.ListSnapshotsResponse]:
676 A function that, when called, will call the underlying RPC
677 on the server.
678 """
679 # Generate a "stub function" on-the-fly which will actually make
680 # the request.
681 # gRPC handles serialization and deserialization, so we just need
682 # to pass in the functions for each.
683 if "list_snapshots" not in self._stubs:
684 self._stubs["list_snapshots"] = self._logged_channel.unary_unary(
685 "/google.pubsub.v1.Subscriber/ListSnapshots",
686 request_serializer=pubsub.ListSnapshotsRequest.serialize,
687 response_deserializer=pubsub.ListSnapshotsResponse.deserialize,
688 )
689 return self._stubs["list_snapshots"]
690
691 @property
692 def create_snapshot(
693 self,
694 ) -> Callable[[pubsub.CreateSnapshotRequest], pubsub.Snapshot]:
695 r"""Return a callable for the create snapshot method over gRPC.
696
697 Creates a snapshot from the requested subscription. Snapshots
698 are used in
699 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
700 operations, which allow you to manage message acknowledgments in
701 bulk. That is, you can set the acknowledgment state of messages
702 in an existing subscription to the state captured by a snapshot.
703 If the snapshot already exists, returns ``ALREADY_EXISTS``. If
704 the requested subscription doesn't exist, returns ``NOT_FOUND``.
705 If the backlog in the subscription is too old -- and the
706 resulting snapshot would expire in less than 1 hour -- then
707 ``FAILED_PRECONDITION`` is returned. See also the
708 ``Snapshot.expire_time`` field. If the name is not provided in
709 the request, the server will assign a random name for this
710 snapshot on the same project as the subscription, conforming to
711 the [resource name format]
712 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
713 The generated name is populated in the returned Snapshot object.
714 Note that for REST API requests, you must specify a name in the
715 request.
716
717 Returns:
718 Callable[[~.CreateSnapshotRequest],
719 ~.Snapshot]:
720 A function that, when called, will call the underlying RPC
721 on the server.
722 """
723 # Generate a "stub function" on-the-fly which will actually make
724 # the request.
725 # gRPC handles serialization and deserialization, so we just need
726 # to pass in the functions for each.
727 if "create_snapshot" not in self._stubs:
728 self._stubs["create_snapshot"] = self._logged_channel.unary_unary(
729 "/google.pubsub.v1.Subscriber/CreateSnapshot",
730 request_serializer=pubsub.CreateSnapshotRequest.serialize,
731 response_deserializer=pubsub.Snapshot.deserialize,
732 )
733 return self._stubs["create_snapshot"]
734
735 @property
736 def update_snapshot(
737 self,
738 ) -> Callable[[pubsub.UpdateSnapshotRequest], pubsub.Snapshot]:
739 r"""Return a callable for the update snapshot method over gRPC.
740
741 Updates an existing snapshot by updating the fields specified in
742 the update mask. Snapshots are used in
743 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
744 operations, which allow you to manage message acknowledgments in
745 bulk. That is, you can set the acknowledgment state of messages
746 in an existing subscription to the state captured by a snapshot.
747
748 Returns:
749 Callable[[~.UpdateSnapshotRequest],
750 ~.Snapshot]:
751 A function that, when called, will call the underlying RPC
752 on the server.
753 """
754 # Generate a "stub function" on-the-fly which will actually make
755 # the request.
756 # gRPC handles serialization and deserialization, so we just need
757 # to pass in the functions for each.
758 if "update_snapshot" not in self._stubs:
759 self._stubs["update_snapshot"] = self._logged_channel.unary_unary(
760 "/google.pubsub.v1.Subscriber/UpdateSnapshot",
761 request_serializer=pubsub.UpdateSnapshotRequest.serialize,
762 response_deserializer=pubsub.Snapshot.deserialize,
763 )
764 return self._stubs["update_snapshot"]
765
@property
def delete_snapshot(
    self,
) -> Callable[[pubsub.DeleteSnapshotRequest], empty_pb2.Empty]:
    r"""Return a callable for the delete snapshot method over gRPC.

    The underlying RPC removes an existing snapshot. Snapshots back
    [Seek](https://cloud.google.com/pubsub/docs/replay-overview)
    operations, which manage message acknowledgments in bulk by
    setting the acknowledgment state of messages in an existing
    subscription to the state captured by a snapshot. Deleting a
    snapshot immediately drops every message retained in it. The
    name may afterwards be reused for a new snapshot, but that
    snapshot has no association with the deleted one or its
    subscription unless the same subscription is specified.

    Returns:
        Callable[[~.DeleteSnapshotRequest],
                ~.Empty]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    rpc_key = "delete_snapshot"
    stub_cache = self._stubs

    # Reuse the stub if an earlier access already built it.
    if rpc_key in stub_cache:
        return stub_cache[rpc_key]

    # First access: build the stub on the fly. gRPC performs the
    # (de)serialization itself, so it only needs the two functions.
    make_stub = self._logged_channel.unary_unary
    stub_cache[rpc_key] = make_stub(
        "/google.pubsub.v1.Subscriber/DeleteSnapshot",
        request_serializer=pubsub.DeleteSnapshotRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return stub_cache[rpc_key]
800
@property
def seek(self) -> Callable[[pubsub.SeekRequest], pubsub.SeekResponse]:
    r"""Return a callable for the seek method over gRPC.

    The underlying RPC seeks an existing subscription either to a
    point in time or to a given snapshot, whichever the request
    provides. Snapshots back
    [Seek](https://cloud.google.com/pubsub/docs/replay-overview)
    operations, which manage message acknowledgments in bulk by
    setting the acknowledgment state of messages in an existing
    subscription to the state captured by a snapshot. The
    subscription and the snapshot must both be on the same topic.

    Returns:
        Callable[[~.SeekRequest],
                ~.SeekResponse]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    rpc_key = "seek"
    stub_cache = self._stubs

    # Reuse the stub if an earlier access already built it.
    if rpc_key in stub_cache:
        return stub_cache[rpc_key]

    # First access: build the stub on the fly. gRPC performs the
    # (de)serialization itself, so it only needs the two functions.
    make_stub = self._logged_channel.unary_unary
    stub_cache[rpc_key] = make_stub(
        "/google.pubsub.v1.Subscriber/Seek",
        request_serializer=pubsub.SeekRequest.serialize,
        response_deserializer=pubsub.SeekResponse.deserialize,
    )
    return stub_cache[rpc_key]
832
def close(self):
    """Close the channel held in ``self._logged_channel``."""
    channel = self._logged_channel
    channel.close()
835
@property
def set_iam_policy(
    self,
) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], policy_pb2.Policy]:
    r"""Return a callable for the set iam policy method over gRPC.

    The underlying RPC sets the IAM access control policy on the
    resource specified in the request, replacing any existing policy.

    Returns:
        Callable[[~.SetIamPolicyRequest],
                ~.Policy]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    rpc_key = "set_iam_policy"
    stub_cache = self._stubs

    # Reuse the stub if an earlier access already built it.
    if rpc_key in stub_cache:
        return stub_cache[rpc_key]

    # First access: build the stub on the fly. gRPC performs the
    # (de)serialization itself, so it only needs the two functions.
    make_stub = self._logged_channel.unary_unary
    stub_cache[rpc_key] = make_stub(
        "/google.iam.v1.IAMPolicy/SetIamPolicy",
        request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
        response_deserializer=policy_pb2.Policy.FromString,
    )
    return stub_cache[rpc_key]
860
@property
def get_iam_policy(
    self,
) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], policy_pb2.Policy]:
    r"""Return a callable for the get iam policy method over gRPC.

    The underlying RPC gets the IAM access control policy for a
    resource. An empty policy is returned if the resource exists and
    does not have a policy set.

    Returns:
        Callable[[~.GetIamPolicyRequest],
                ~.Policy]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    rpc_key = "get_iam_policy"
    stub_cache = self._stubs

    # Reuse the stub if an earlier access already built it.
    if rpc_key in stub_cache:
        return stub_cache[rpc_key]

    # First access: build the stub on the fly. gRPC performs the
    # (de)serialization itself, so it only needs the two functions.
    make_stub = self._logged_channel.unary_unary
    stub_cache[rpc_key] = make_stub(
        "/google.iam.v1.IAMPolicy/GetIamPolicy",
        request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
        response_deserializer=policy_pb2.Policy.FromString,
    )
    return stub_cache[rpc_key]
886
@property
def test_iam_permissions(
    self,
) -> Callable[
    [iam_policy_pb2.TestIamPermissionsRequest],
    iam_policy_pb2.TestIamPermissionsResponse,
]:
    r"""Return a callable for the test iam permissions method over gRPC.

    The underlying RPC tests the specified permissions against the
    IAM access control policy for a resource. If the resource does
    not exist, this will return an empty set of permissions, not a
    NOT_FOUND error.

    Returns:
        Callable[[~.TestIamPermissionsRequest],
                ~.TestIamPermissionsResponse]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    rpc_key = "test_iam_permissions"
    stub_cache = self._stubs

    # Reuse the stub if an earlier access already built it.
    if rpc_key in stub_cache:
        return stub_cache[rpc_key]

    # First access: build the stub on the fly. gRPC performs the
    # (de)serialization itself, so it only needs the two functions.
    make_stub = self._logged_channel.unary_unary
    stub_cache[rpc_key] = make_stub(
        "/google.iam.v1.IAMPolicy/TestIamPermissions",
        request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
        response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
    )
    return stub_cache[rpc_key]
915
@property
def kind(self) -> str:
    """Return the identifier of this transport kind, always ``"grpc"``."""
    transport_kind = "grpc"
    return transport_kind
919
920
# Names exported by a star-import of this module: only
# ``SubscriberGrpcTransport``.
__all__ = ("SubscriberGrpcTransport",)