1# -*- coding: utf-8 -*-
2# Copyright 2024 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import json
17import logging as std_logging
18import pickle
19import warnings
20from typing import Callable, Dict, Optional, Sequence, Tuple, Union
21
22from google.api_core import grpc_helpers
23from google.api_core import gapic_v1
24import google.auth # type: ignore
25from google.auth import credentials as ga_credentials # type: ignore
26from google.auth.transport.grpc import SslCredentials # type: ignore
27from google.protobuf.json_format import MessageToJson
28import google.protobuf.message
29
30import grpc # type: ignore
31import proto # type: ignore
32
33from google.iam.v1 import iam_policy_pb2 # type: ignore
34from google.iam.v1 import policy_pb2 # type: ignore
35from google.protobuf import empty_pb2 # type: ignore
36from google.pubsub_v1.types import pubsub
37from .base import SubscriberTransport, DEFAULT_CLIENT_INFO
38
39try:
40 from google.api_core import client_logging # type: ignore
41
42 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
43except ImportError: # pragma: NO COVER
44 CLIENT_LOGGING_SUPPORTED = False
45
46_LOGGER = std_logging.getLogger(__name__)
47
48
49class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER
50 def intercept_unary_unary(self, continuation, client_call_details, request):
51 logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
52 std_logging.DEBUG
53 )
54 if logging_enabled: # pragma: NO COVER
55 request_metadata = client_call_details.metadata
56 if isinstance(request, proto.Message):
57 request_payload = type(request).to_json(request)
58 elif isinstance(request, google.protobuf.message.Message):
59 request_payload = MessageToJson(request)
60 else:
61 request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"
62
63 request_metadata = {
64 key: value.decode("utf-8") if isinstance(value, bytes) else value
65 for key, value in request_metadata
66 }
67 grpc_request = {
68 "payload": request_payload,
69 "requestMethod": "grpc",
70 "metadata": dict(request_metadata),
71 }
72 _LOGGER.debug(
73 f"Sending request for {client_call_details.method}",
74 extra={
75 "serviceName": "google.pubsub.v1.Subscriber",
76 "rpcName": client_call_details.method,
77 "request": grpc_request,
78 "metadata": grpc_request["metadata"],
79 },
80 )
81
82 response = continuation(client_call_details, request)
83 if logging_enabled: # pragma: NO COVER
84 response_metadata = response.trailing_metadata()
85 # Convert gRPC metadata `<class 'grpc.aio._metadata.Metadata'>` to list of tuples
86 metadata = (
87 dict([(k, str(v)) for k, v in response_metadata])
88 if response_metadata
89 else None
90 )
91 result = response.result()
92 if isinstance(result, proto.Message):
93 response_payload = type(result).to_json(result)
94 elif isinstance(result, google.protobuf.message.Message):
95 response_payload = MessageToJson(result)
96 else:
97 response_payload = f"{type(result).__name__}: {pickle.dumps(result)}"
98 grpc_response = {
99 "payload": response_payload,
100 "metadata": metadata,
101 "status": "OK",
102 }
103 _LOGGER.debug(
104 f"Received response for {client_call_details.method}.",
105 extra={
106 "serviceName": "google.pubsub.v1.Subscriber",
107 "rpcName": client_call_details.method,
108 "response": grpc_response,
109 "metadata": grpc_response["metadata"],
110 },
111 )
112 return response
113
114
115class SubscriberGrpcTransport(SubscriberTransport):
116 """gRPC backend transport for Subscriber.
117
118 The service that an application uses to manipulate subscriptions and
119 to consume messages from a subscription via the ``Pull`` method or
120 by establishing a bi-directional stream using the ``StreamingPull``
121 method.
122
123 This class defines the same methods as the primary client, so the
124 primary client can load the underlying transport implementation
125 and call it.
126
127 It sends protocol buffers over the wire using gRPC (which is built on
128 top of HTTP/2); the ``grpcio`` package must be installed.
129 """
130
131 _stubs: Dict[str, Callable]
132
133 def __init__(
134 self,
135 *,
136 host: str = "pubsub.googleapis.com",
137 credentials: Optional[ga_credentials.Credentials] = None,
138 credentials_file: Optional[str] = None,
139 scopes: Optional[Sequence[str]] = None,
140 channel: Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]] = None,
141 api_mtls_endpoint: Optional[str] = None,
142 client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
143 ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
144 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
145 quota_project_id: Optional[str] = None,
146 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
147 always_use_jwt_access: Optional[bool] = False,
148 api_audience: Optional[str] = None,
149 ) -> None:
150 """Instantiate the transport.
151
152 Args:
153 host (Optional[str]):
154 The hostname to connect to (default: 'pubsub.googleapis.com').
155 credentials (Optional[google.auth.credentials.Credentials]): The
156 authorization credentials to attach to requests. These
157 credentials identify the application to the service; if none
158 are specified, the client will attempt to ascertain the
159 credentials from the environment.
160 This argument is ignored if a ``channel`` instance is provided.
161 credentials_file (Optional[str]): A file with credentials that can
162 be loaded with :func:`google.auth.load_credentials_from_file`.
163 This argument is ignored if a ``channel`` instance is provided.
164 scopes (Optional(Sequence[str])): A list of scopes. This argument is
165 ignored if a ``channel`` instance is provided.
166 channel (Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]]):
167 A ``Channel`` instance through which to make calls, or a Callable
168 that constructs and returns one. If set to None, ``self.create_channel``
169 is used to create the channel. If a Callable is given, it will be called
170 with the same arguments as used in ``self.create_channel``.
171 api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
172 If provided, it overrides the ``host`` argument and tries to create
173 a mutual TLS channel with client SSL credentials from
174 ``client_cert_source`` or application default SSL credentials.
175 client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
176 Deprecated. A callback to provide client SSL certificate bytes and
177 private key bytes, both in PEM format. It is ignored if
178 ``api_mtls_endpoint`` is None.
179 ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
180 for the grpc channel. It is ignored if a ``channel`` instance is provided.
181 client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
182 A callback to provide client certificate bytes and private key bytes,
183 both in PEM format. It is used to configure a mutual TLS channel. It is
184 ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided.
185 quota_project_id (Optional[str]): An optional project to use for billing
186 and quota.
187 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
188 The client info used to send a user-agent string along with
189 API requests. If ``None``, then default info will be used.
190 Generally, you only need to set this if you're developing
191 your own client library.
192 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
193 be used for service account credentials.
194
195 Raises:
196 google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
197 creation failed for any reason.
198 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
199 and ``credentials_file`` are passed.
200 """
201 self._grpc_channel = None
202 self._ssl_channel_credentials = ssl_channel_credentials
203 self._stubs: Dict[str, Callable] = {}
204
205 if api_mtls_endpoint:
206 warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
207 if client_cert_source:
208 warnings.warn("client_cert_source is deprecated", DeprecationWarning)
209
210 if isinstance(channel, grpc.Channel):
211 # Ignore credentials if a channel was passed.
212 credentials = None
213 self._ignore_credentials = True
214 # If a channel was explicitly provided, set it.
215 self._grpc_channel = channel
216 self._ssl_channel_credentials = None
217
218 else:
219 if api_mtls_endpoint:
220 host = api_mtls_endpoint
221
222 # Create SSL credentials with client_cert_source or application
223 # default SSL credentials.
224 if client_cert_source:
225 cert, key = client_cert_source()
226 self._ssl_channel_credentials = grpc.ssl_channel_credentials(
227 certificate_chain=cert, private_key=key
228 )
229 else:
230 self._ssl_channel_credentials = SslCredentials().ssl_credentials
231
232 else:
233 if client_cert_source_for_mtls and not ssl_channel_credentials:
234 cert, key = client_cert_source_for_mtls()
235 self._ssl_channel_credentials = grpc.ssl_channel_credentials(
236 certificate_chain=cert, private_key=key
237 )
238
239 # The base transport sets the host, credentials and scopes
240 super().__init__(
241 host=host,
242 credentials=credentials,
243 credentials_file=credentials_file,
244 scopes=scopes,
245 quota_project_id=quota_project_id,
246 client_info=client_info,
247 always_use_jwt_access=always_use_jwt_access,
248 api_audience=api_audience,
249 )
250
251 if not self._grpc_channel:
252 # initialize with the provided callable or the default channel
253 channel_init = channel or type(self).create_channel
254 self._grpc_channel = channel_init(
255 self._host,
256 # use the credentials which are saved
257 credentials=self._credentials,
258 # Set ``credentials_file`` to ``None`` here as
259 # the credentials that we saved earlier should be used.
260 credentials_file=None,
261 scopes=self._scopes,
262 ssl_credentials=self._ssl_channel_credentials,
263 quota_project_id=quota_project_id,
264 options=[
265 ("grpc.max_send_message_length", -1),
266 ("grpc.max_receive_message_length", -1),
267 ("grpc.max_metadata_size", 4 * 1024 * 1024),
268 ("grpc.keepalive_time_ms", 30000),
269 ],
270 )
271
272 self._interceptor = _LoggingClientInterceptor()
273 self._logged_channel = grpc.intercept_channel(
274 self._grpc_channel, self._interceptor
275 )
276
277 # Wrap messages. This must be done after self._logged_channel exists
278 self._prep_wrapped_messages(client_info)
279
280 @classmethod
281 def create_channel(
282 cls,
283 host: str = "pubsub.googleapis.com",
284 credentials: Optional[ga_credentials.Credentials] = None,
285 credentials_file: Optional[str] = None,
286 scopes: Optional[Sequence[str]] = None,
287 quota_project_id: Optional[str] = None,
288 **kwargs,
289 ) -> grpc.Channel:
290 """Create and return a gRPC channel object.
291 Args:
292 host (Optional[str]): The host for the channel to use.
293 credentials (Optional[~.Credentials]): The
294 authorization credentials to attach to requests. These
295 credentials identify this application to the service. If
296 none are specified, the client will attempt to ascertain
297 the credentials from the environment.
298 credentials_file (Optional[str]): A file with credentials that can
299 be loaded with :func:`google.auth.load_credentials_from_file`.
300 This argument is mutually exclusive with credentials.
301 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this
302 service. These are only used when credentials are not specified and
303 are passed to :func:`google.auth.default`.
304 quota_project_id (Optional[str]): An optional project to use for billing
305 and quota.
306 kwargs (Optional[dict]): Keyword arguments, which are passed to the
307 channel creation.
308 Returns:
309 grpc.Channel: A gRPC channel object.
310
311 Raises:
312 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
313 and ``credentials_file`` are passed.
314 """
315
316 return grpc_helpers.create_channel(
317 host,
318 credentials=credentials,
319 credentials_file=credentials_file,
320 quota_project_id=quota_project_id,
321 default_scopes=cls.AUTH_SCOPES,
322 scopes=scopes,
323 default_host=cls.DEFAULT_HOST,
324 **kwargs,
325 )
326
327 @property
328 def grpc_channel(self) -> grpc.Channel:
329 """Return the channel designed to connect to this service."""
330 return self._grpc_channel
331
332 @property
333 def create_subscription(
334 self,
335 ) -> Callable[[pubsub.Subscription], pubsub.Subscription]:
336 r"""Return a callable for the create subscription method over gRPC.
337
338 Creates a subscription to a given topic. See the [resource name
339 rules]
340 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
341 If the subscription already exists, returns ``ALREADY_EXISTS``.
342 If the corresponding topic doesn't exist, returns ``NOT_FOUND``.
343
344 If the name is not provided in the request, the server will
345 assign a random name for this subscription on the same project
346 as the topic, conforming to the [resource name format]
347 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
348 The generated name is populated in the returned Subscription
349 object. Note that for REST API requests, you must specify a name
350 in the request.
351
352 Returns:
353 Callable[[~.Subscription],
354 ~.Subscription]:
355 A function that, when called, will call the underlying RPC
356 on the server.
357 """
358 # Generate a "stub function" on-the-fly which will actually make
359 # the request.
360 # gRPC handles serialization and deserialization, so we just need
361 # to pass in the functions for each.
362 if "create_subscription" not in self._stubs:
363 self._stubs["create_subscription"] = self._logged_channel.unary_unary(
364 "/google.pubsub.v1.Subscriber/CreateSubscription",
365 request_serializer=pubsub.Subscription.serialize,
366 response_deserializer=pubsub.Subscription.deserialize,
367 )
368 return self._stubs["create_subscription"]
369
370 @property
371 def get_subscription(
372 self,
373 ) -> Callable[[pubsub.GetSubscriptionRequest], pubsub.Subscription]:
374 r"""Return a callable for the get subscription method over gRPC.
375
376 Gets the configuration details of a subscription.
377
378 Returns:
379 Callable[[~.GetSubscriptionRequest],
380 ~.Subscription]:
381 A function that, when called, will call the underlying RPC
382 on the server.
383 """
384 # Generate a "stub function" on-the-fly which will actually make
385 # the request.
386 # gRPC handles serialization and deserialization, so we just need
387 # to pass in the functions for each.
388 if "get_subscription" not in self._stubs:
389 self._stubs["get_subscription"] = self._logged_channel.unary_unary(
390 "/google.pubsub.v1.Subscriber/GetSubscription",
391 request_serializer=pubsub.GetSubscriptionRequest.serialize,
392 response_deserializer=pubsub.Subscription.deserialize,
393 )
394 return self._stubs["get_subscription"]
395
396 @property
397 def update_subscription(
398 self,
399 ) -> Callable[[pubsub.UpdateSubscriptionRequest], pubsub.Subscription]:
400 r"""Return a callable for the update subscription method over gRPC.
401
402 Updates an existing subscription by updating the
403 fields specified in the update mask. Note that certain
404 properties of a subscription, such as its topic, are not
405 modifiable.
406
407 Returns:
408 Callable[[~.UpdateSubscriptionRequest],
409 ~.Subscription]:
410 A function that, when called, will call the underlying RPC
411 on the server.
412 """
413 # Generate a "stub function" on-the-fly which will actually make
414 # the request.
415 # gRPC handles serialization and deserialization, so we just need
416 # to pass in the functions for each.
417 if "update_subscription" not in self._stubs:
418 self._stubs["update_subscription"] = self._logged_channel.unary_unary(
419 "/google.pubsub.v1.Subscriber/UpdateSubscription",
420 request_serializer=pubsub.UpdateSubscriptionRequest.serialize,
421 response_deserializer=pubsub.Subscription.deserialize,
422 )
423 return self._stubs["update_subscription"]
424
425 @property
426 def list_subscriptions(
427 self,
428 ) -> Callable[[pubsub.ListSubscriptionsRequest], pubsub.ListSubscriptionsResponse]:
429 r"""Return a callable for the list subscriptions method over gRPC.
430
431 Lists matching subscriptions.
432
433 Returns:
434 Callable[[~.ListSubscriptionsRequest],
435 ~.ListSubscriptionsResponse]:
436 A function that, when called, will call the underlying RPC
437 on the server.
438 """
439 # Generate a "stub function" on-the-fly which will actually make
440 # the request.
441 # gRPC handles serialization and deserialization, so we just need
442 # to pass in the functions for each.
443 if "list_subscriptions" not in self._stubs:
444 self._stubs["list_subscriptions"] = self._logged_channel.unary_unary(
445 "/google.pubsub.v1.Subscriber/ListSubscriptions",
446 request_serializer=pubsub.ListSubscriptionsRequest.serialize,
447 response_deserializer=pubsub.ListSubscriptionsResponse.deserialize,
448 )
449 return self._stubs["list_subscriptions"]
450
451 @property
452 def delete_subscription(
453 self,
454 ) -> Callable[[pubsub.DeleteSubscriptionRequest], empty_pb2.Empty]:
455 r"""Return a callable for the delete subscription method over gRPC.
456
457 Deletes an existing subscription. All messages retained in the
458 subscription are immediately dropped. Calls to ``Pull`` after
459 deletion will return ``NOT_FOUND``. After a subscription is
460 deleted, a new one may be created with the same name, but the
461 new one has no association with the old subscription or its
462 topic unless the same topic is specified.
463
464 Returns:
465 Callable[[~.DeleteSubscriptionRequest],
466 ~.Empty]:
467 A function that, when called, will call the underlying RPC
468 on the server.
469 """
470 # Generate a "stub function" on-the-fly which will actually make
471 # the request.
472 # gRPC handles serialization and deserialization, so we just need
473 # to pass in the functions for each.
474 if "delete_subscription" not in self._stubs:
475 self._stubs["delete_subscription"] = self._logged_channel.unary_unary(
476 "/google.pubsub.v1.Subscriber/DeleteSubscription",
477 request_serializer=pubsub.DeleteSubscriptionRequest.serialize,
478 response_deserializer=empty_pb2.Empty.FromString,
479 )
480 return self._stubs["delete_subscription"]
481
482 @property
483 def modify_ack_deadline(
484 self,
485 ) -> Callable[[pubsub.ModifyAckDeadlineRequest], empty_pb2.Empty]:
486 r"""Return a callable for the modify ack deadline method over gRPC.
487
488 Modifies the ack deadline for a specific message. This method is
489 useful to indicate that more time is needed to process a message
490 by the subscriber, or to make the message available for
491 redelivery if the processing was interrupted. Note that this
492 does not modify the subscription-level ``ackDeadlineSeconds``
493 used for subsequent messages.
494
495 Returns:
496 Callable[[~.ModifyAckDeadlineRequest],
497 ~.Empty]:
498 A function that, when called, will call the underlying RPC
499 on the server.
500 """
501 # Generate a "stub function" on-the-fly which will actually make
502 # the request.
503 # gRPC handles serialization and deserialization, so we just need
504 # to pass in the functions for each.
505 if "modify_ack_deadline" not in self._stubs:
506 self._stubs["modify_ack_deadline"] = self._logged_channel.unary_unary(
507 "/google.pubsub.v1.Subscriber/ModifyAckDeadline",
508 request_serializer=pubsub.ModifyAckDeadlineRequest.serialize,
509 response_deserializer=empty_pb2.Empty.FromString,
510 )
511 return self._stubs["modify_ack_deadline"]
512
513 @property
514 def acknowledge(self) -> Callable[[pubsub.AcknowledgeRequest], empty_pb2.Empty]:
515 r"""Return a callable for the acknowledge method over gRPC.
516
517 Acknowledges the messages associated with the ``ack_ids`` in the
518 ``AcknowledgeRequest``. The Pub/Sub system can remove the
519 relevant messages from the subscription.
520
521 Acknowledging a message whose ack deadline has expired may
522 succeed, but such a message may be redelivered later.
523 Acknowledging a message more than once will not result in an
524 error.
525
526 Returns:
527 Callable[[~.AcknowledgeRequest],
528 ~.Empty]:
529 A function that, when called, will call the underlying RPC
530 on the server.
531 """
532 # Generate a "stub function" on-the-fly which will actually make
533 # the request.
534 # gRPC handles serialization and deserialization, so we just need
535 # to pass in the functions for each.
536 if "acknowledge" not in self._stubs:
537 self._stubs["acknowledge"] = self._logged_channel.unary_unary(
538 "/google.pubsub.v1.Subscriber/Acknowledge",
539 request_serializer=pubsub.AcknowledgeRequest.serialize,
540 response_deserializer=empty_pb2.Empty.FromString,
541 )
542 return self._stubs["acknowledge"]
543
544 @property
545 def pull(self) -> Callable[[pubsub.PullRequest], pubsub.PullResponse]:
546 r"""Return a callable for the pull method over gRPC.
547
548 Pulls messages from the server.
549
550 Returns:
551 Callable[[~.PullRequest],
552 ~.PullResponse]:
553 A function that, when called, will call the underlying RPC
554 on the server.
555 """
556 # Generate a "stub function" on-the-fly which will actually make
557 # the request.
558 # gRPC handles serialization and deserialization, so we just need
559 # to pass in the functions for each.
560 if "pull" not in self._stubs:
561 self._stubs["pull"] = self._logged_channel.unary_unary(
562 "/google.pubsub.v1.Subscriber/Pull",
563 request_serializer=pubsub.PullRequest.serialize,
564 response_deserializer=pubsub.PullResponse.deserialize,
565 )
566 return self._stubs["pull"]
567
568 @property
569 def streaming_pull(
570 self,
571 ) -> Callable[[pubsub.StreamingPullRequest], pubsub.StreamingPullResponse]:
572 r"""Return a callable for the streaming pull method over gRPC.
573
574 Establishes a stream with the server, which sends messages down
575 to the client. The client streams acknowledgements and ack
576 deadline modifications back to the server. The server will close
577 the stream and return the status on any error. The server may
578 close the stream with status ``UNAVAILABLE`` to reassign
579 server-side resources, in which case, the client should
580 re-establish the stream. Flow control can be achieved by
581 configuring the underlying RPC channel.
582
583 Returns:
584 Callable[[~.StreamingPullRequest],
585 ~.StreamingPullResponse]:
586 A function that, when called, will call the underlying RPC
587 on the server.
588 """
589 # Generate a "stub function" on-the-fly which will actually make
590 # the request.
591 # gRPC handles serialization and deserialization, so we just need
592 # to pass in the functions for each.
593 if "streaming_pull" not in self._stubs:
594 self._stubs["streaming_pull"] = self._logged_channel.stream_stream(
595 "/google.pubsub.v1.Subscriber/StreamingPull",
596 request_serializer=pubsub.StreamingPullRequest.serialize,
597 response_deserializer=pubsub.StreamingPullResponse.deserialize,
598 )
599 return self._stubs["streaming_pull"]
600
601 @property
602 def modify_push_config(
603 self,
604 ) -> Callable[[pubsub.ModifyPushConfigRequest], empty_pb2.Empty]:
605 r"""Return a callable for the modify push config method over gRPC.
606
607 Modifies the ``PushConfig`` for a specified subscription.
608
609 This may be used to change a push subscription to a pull one
610 (signified by an empty ``PushConfig``) or vice versa, or change
611 the endpoint URL and other attributes of a push subscription.
612 Messages will accumulate for delivery continuously through the
613 call regardless of changes to the ``PushConfig``.
614
615 Returns:
616 Callable[[~.ModifyPushConfigRequest],
617 ~.Empty]:
618 A function that, when called, will call the underlying RPC
619 on the server.
620 """
621 # Generate a "stub function" on-the-fly which will actually make
622 # the request.
623 # gRPC handles serialization and deserialization, so we just need
624 # to pass in the functions for each.
625 if "modify_push_config" not in self._stubs:
626 self._stubs["modify_push_config"] = self._logged_channel.unary_unary(
627 "/google.pubsub.v1.Subscriber/ModifyPushConfig",
628 request_serializer=pubsub.ModifyPushConfigRequest.serialize,
629 response_deserializer=empty_pb2.Empty.FromString,
630 )
631 return self._stubs["modify_push_config"]
632
633 @property
634 def get_snapshot(self) -> Callable[[pubsub.GetSnapshotRequest], pubsub.Snapshot]:
635 r"""Return a callable for the get snapshot method over gRPC.
636
637 Gets the configuration details of a snapshot. Snapshots are used
638 in
639 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
640 operations, which allow you to manage message acknowledgments in
641 bulk. That is, you can set the acknowledgment state of messages
642 in an existing subscription to the state captured by a snapshot.
643
644 Returns:
645 Callable[[~.GetSnapshotRequest],
646 ~.Snapshot]:
647 A function that, when called, will call the underlying RPC
648 on the server.
649 """
650 # Generate a "stub function" on-the-fly which will actually make
651 # the request.
652 # gRPC handles serialization and deserialization, so we just need
653 # to pass in the functions for each.
654 if "get_snapshot" not in self._stubs:
655 self._stubs["get_snapshot"] = self._logged_channel.unary_unary(
656 "/google.pubsub.v1.Subscriber/GetSnapshot",
657 request_serializer=pubsub.GetSnapshotRequest.serialize,
658 response_deserializer=pubsub.Snapshot.deserialize,
659 )
660 return self._stubs["get_snapshot"]
661
662 @property
663 def list_snapshots(
664 self,
665 ) -> Callable[[pubsub.ListSnapshotsRequest], pubsub.ListSnapshotsResponse]:
666 r"""Return a callable for the list snapshots method over gRPC.
667
668 Lists the existing snapshots. Snapshots are used in
669 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
670 operations, which allow you to manage message acknowledgments in
671 bulk. That is, you can set the acknowledgment state of messages
672 in an existing subscription to the state captured by a snapshot.
673
674 Returns:
675 Callable[[~.ListSnapshotsRequest],
676 ~.ListSnapshotsResponse]:
677 A function that, when called, will call the underlying RPC
678 on the server.
679 """
680 # Generate a "stub function" on-the-fly which will actually make
681 # the request.
682 # gRPC handles serialization and deserialization, so we just need
683 # to pass in the functions for each.
684 if "list_snapshots" not in self._stubs:
685 self._stubs["list_snapshots"] = self._logged_channel.unary_unary(
686 "/google.pubsub.v1.Subscriber/ListSnapshots",
687 request_serializer=pubsub.ListSnapshotsRequest.serialize,
688 response_deserializer=pubsub.ListSnapshotsResponse.deserialize,
689 )
690 return self._stubs["list_snapshots"]
691
692 @property
693 def create_snapshot(
694 self,
695 ) -> Callable[[pubsub.CreateSnapshotRequest], pubsub.Snapshot]:
696 r"""Return a callable for the create snapshot method over gRPC.
697
698 Creates a snapshot from the requested subscription. Snapshots
699 are used in
700 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
701 operations, which allow you to manage message acknowledgments in
702 bulk. That is, you can set the acknowledgment state of messages
703 in an existing subscription to the state captured by a snapshot.
704 If the snapshot already exists, returns ``ALREADY_EXISTS``. If
705 the requested subscription doesn't exist, returns ``NOT_FOUND``.
706 If the backlog in the subscription is too old -- and the
707 resulting snapshot would expire in less than 1 hour -- then
708 ``FAILED_PRECONDITION`` is returned. See also the
709 ``Snapshot.expire_time`` field. If the name is not provided in
710 the request, the server will assign a random name for this
711 snapshot on the same project as the subscription, conforming to
712 the [resource name format]
713 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
714 The generated name is populated in the returned Snapshot object.
715 Note that for REST API requests, you must specify a name in the
716 request.
717
718 Returns:
719 Callable[[~.CreateSnapshotRequest],
720 ~.Snapshot]:
721 A function that, when called, will call the underlying RPC
722 on the server.
723 """
724 # Generate a "stub function" on-the-fly which will actually make
725 # the request.
726 # gRPC handles serialization and deserialization, so we just need
727 # to pass in the functions for each.
728 if "create_snapshot" not in self._stubs:
729 self._stubs["create_snapshot"] = self._logged_channel.unary_unary(
730 "/google.pubsub.v1.Subscriber/CreateSnapshot",
731 request_serializer=pubsub.CreateSnapshotRequest.serialize,
732 response_deserializer=pubsub.Snapshot.deserialize,
733 )
734 return self._stubs["create_snapshot"]
735
736 @property
737 def update_snapshot(
738 self,
739 ) -> Callable[[pubsub.UpdateSnapshotRequest], pubsub.Snapshot]:
740 r"""Return a callable for the update snapshot method over gRPC.
741
742 Updates an existing snapshot by updating the fields specified in
743 the update mask. Snapshots are used in
744 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
745 operations, which allow you to manage message acknowledgments in
746 bulk. That is, you can set the acknowledgment state of messages
747 in an existing subscription to the state captured by a snapshot.
748
749 Returns:
750 Callable[[~.UpdateSnapshotRequest],
751 ~.Snapshot]:
752 A function that, when called, will call the underlying RPC
753 on the server.
754 """
755 # Generate a "stub function" on-the-fly which will actually make
756 # the request.
757 # gRPC handles serialization and deserialization, so we just need
758 # to pass in the functions for each.
759 if "update_snapshot" not in self._stubs:
760 self._stubs["update_snapshot"] = self._logged_channel.unary_unary(
761 "/google.pubsub.v1.Subscriber/UpdateSnapshot",
762 request_serializer=pubsub.UpdateSnapshotRequest.serialize,
763 response_deserializer=pubsub.Snapshot.deserialize,
764 )
765 return self._stubs["update_snapshot"]
766
767 @property
768 def delete_snapshot(
769 self,
770 ) -> Callable[[pubsub.DeleteSnapshotRequest], empty_pb2.Empty]:
771 r"""Return a callable for the delete snapshot method over gRPC.
772
773 Removes an existing snapshot. Snapshots are used in [Seek]
774 (https://cloud.google.com/pubsub/docs/replay-overview)
775 operations, which allow you to manage message acknowledgments in
776 bulk. That is, you can set the acknowledgment state of messages
777 in an existing subscription to the state captured by a snapshot.
778 When the snapshot is deleted, all messages retained in the
779 snapshot are immediately dropped. After a snapshot is deleted, a
780 new one may be created with the same name, but the new one has
781 no association with the old snapshot or its subscription, unless
782 the same subscription is specified.
783
784 Returns:
785 Callable[[~.DeleteSnapshotRequest],
786 ~.Empty]:
787 A function that, when called, will call the underlying RPC
788 on the server.
789 """
790 # Generate a "stub function" on-the-fly which will actually make
791 # the request.
792 # gRPC handles serialization and deserialization, so we just need
793 # to pass in the functions for each.
794 if "delete_snapshot" not in self._stubs:
795 self._stubs["delete_snapshot"] = self._logged_channel.unary_unary(
796 "/google.pubsub.v1.Subscriber/DeleteSnapshot",
797 request_serializer=pubsub.DeleteSnapshotRequest.serialize,
798 response_deserializer=empty_pb2.Empty.FromString,
799 )
800 return self._stubs["delete_snapshot"]
801
802 @property
803 def seek(self) -> Callable[[pubsub.SeekRequest], pubsub.SeekResponse]:
804 r"""Return a callable for the seek method over gRPC.
805
806 Seeks an existing subscription to a point in time or to a given
807 snapshot, whichever is provided in the request. Snapshots are
808 used in [Seek]
809 (https://cloud.google.com/pubsub/docs/replay-overview)
810 operations, which allow you to manage message acknowledgments in
811 bulk. That is, you can set the acknowledgment state of messages
812 in an existing subscription to the state captured by a snapshot.
813 Note that both the subscription and the snapshot must be on the
814 same topic.
815
816 Returns:
817 Callable[[~.SeekRequest],
818 ~.SeekResponse]:
819 A function that, when called, will call the underlying RPC
820 on the server.
821 """
822 # Generate a "stub function" on-the-fly which will actually make
823 # the request.
824 # gRPC handles serialization and deserialization, so we just need
825 # to pass in the functions for each.
826 if "seek" not in self._stubs:
827 self._stubs["seek"] = self._logged_channel.unary_unary(
828 "/google.pubsub.v1.Subscriber/Seek",
829 request_serializer=pubsub.SeekRequest.serialize,
830 response_deserializer=pubsub.SeekResponse.deserialize,
831 )
832 return self._stubs["seek"]
833
834 def close(self):
835 self._logged_channel.close()
836
837 @property
838 def set_iam_policy(
839 self,
840 ) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], policy_pb2.Policy]:
841 r"""Return a callable for the set iam policy method over gRPC.
842 Sets the IAM access control policy on the specified
843 function. Replaces any existing policy.
844 Returns:
845 Callable[[~.SetIamPolicyRequest],
846 ~.Policy]:
847 A function that, when called, will call the underlying RPC
848 on the server.
849 """
850 # Generate a "stub function" on-the-fly which will actually make
851 # the request.
852 # gRPC handles serialization and deserialization, so we just need
853 # to pass in the functions for each.
854 if "set_iam_policy" not in self._stubs:
855 self._stubs["set_iam_policy"] = self._logged_channel.unary_unary(
856 "/google.iam.v1.IAMPolicy/SetIamPolicy",
857 request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
858 response_deserializer=policy_pb2.Policy.FromString,
859 )
860 return self._stubs["set_iam_policy"]
861
862 @property
863 def get_iam_policy(
864 self,
865 ) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], policy_pb2.Policy]:
866 r"""Return a callable for the get iam policy method over gRPC.
867 Gets the IAM access control policy for a function.
868 Returns an empty policy if the function exists and does
869 not have a policy set.
870 Returns:
871 Callable[[~.GetIamPolicyRequest],
872 ~.Policy]:
873 A function that, when called, will call the underlying RPC
874 on the server.
875 """
876 # Generate a "stub function" on-the-fly which will actually make
877 # the request.
878 # gRPC handles serialization and deserialization, so we just need
879 # to pass in the functions for each.
880 if "get_iam_policy" not in self._stubs:
881 self._stubs["get_iam_policy"] = self._logged_channel.unary_unary(
882 "/google.iam.v1.IAMPolicy/GetIamPolicy",
883 request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
884 response_deserializer=policy_pb2.Policy.FromString,
885 )
886 return self._stubs["get_iam_policy"]
887
888 @property
889 def test_iam_permissions(
890 self,
891 ) -> Callable[
892 [iam_policy_pb2.TestIamPermissionsRequest],
893 iam_policy_pb2.TestIamPermissionsResponse,
894 ]:
895 r"""Return a callable for the test iam permissions method over gRPC.
896 Tests the specified permissions against the IAM access control
897 policy for a function. If the function does not exist, this will
898 return an empty set of permissions, not a NOT_FOUND error.
899 Returns:
900 Callable[[~.TestIamPermissionsRequest],
901 ~.TestIamPermissionsResponse]:
902 A function that, when called, will call the underlying RPC
903 on the server.
904 """
905 # Generate a "stub function" on-the-fly which will actually make
906 # the request.
907 # gRPC handles serialization and deserialization, so we just need
908 # to pass in the functions for each.
909 if "test_iam_permissions" not in self._stubs:
910 self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary(
911 "/google.iam.v1.IAMPolicy/TestIamPermissions",
912 request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
913 response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
914 )
915 return self._stubs["test_iam_permissions"]
916
917 @property
918 def kind(self) -> str:
919 return "grpc"
920
921
922__all__ = ("SubscriberGrpcTransport",)