1# -*- coding: utf-8 -*-
2# Copyright 2022 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import warnings
17from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union
18
19from google.api_core import gapic_v1
20from google.api_core import grpc_helpers_async
21from google.auth import credentials as ga_credentials # type: ignore
22from google.auth.transport.grpc import SslCredentials # type: ignore
23
24import grpc # type: ignore
25from grpc.experimental import aio # type: ignore
26
27from google.iam.v1 import iam_policy_pb2 # type: ignore
28from google.iam.v1 import policy_pb2 # type: ignore
29from google.protobuf import empty_pb2 # type: ignore
30from google.pubsub_v1.types import pubsub
31from .base import SubscriberTransport, DEFAULT_CLIENT_INFO
32from .grpc import SubscriberGrpcTransport
33
34
35class SubscriberGrpcAsyncIOTransport(SubscriberTransport):
36 """gRPC AsyncIO backend transport for Subscriber.
37
38 The service that an application uses to manipulate subscriptions and
39 to consume messages from a subscription via the ``Pull`` method or
40 by establishing a bi-directional stream using the ``StreamingPull``
41 method.
42
43 This class defines the same methods as the primary client, so the
44 primary client can load the underlying transport implementation
45 and call it.
46
47 It sends protocol buffers over the wire using gRPC (which is built on
48 top of HTTP/2); the ``grpcio`` package must be installed.
49 """
50
51 _grpc_channel: aio.Channel
52 _stubs: Dict[str, Callable] = {}
53
54 @classmethod
55 def create_channel(
56 cls,
57 host: str = "pubsub.googleapis.com",
58 credentials: Optional[ga_credentials.Credentials] = None,
59 credentials_file: Optional[str] = None,
60 scopes: Optional[Sequence[str]] = None,
61 quota_project_id: Optional[str] = None,
62 **kwargs,
63 ) -> aio.Channel:
64 """Create and return a gRPC AsyncIO channel object.
65 Args:
66 host (Optional[str]): The host for the channel to use.
67 credentials (Optional[~.Credentials]): The
68 authorization credentials to attach to requests. These
69 credentials identify this application to the service. If
70 none are specified, the client will attempt to ascertain
71 the credentials from the environment.
72 credentials_file (Optional[str]): A file with credentials that can
73 be loaded with :func:`google.auth.load_credentials_from_file`.
74 This argument is ignored if ``channel`` is provided.
75 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this
76 service. These are only used when credentials are not specified and
77 are passed to :func:`google.auth.default`.
78 quota_project_id (Optional[str]): An optional project to use for billing
79 and quota.
80 kwargs (Optional[dict]): Keyword arguments, which are passed to the
81 channel creation.
82 Returns:
83 aio.Channel: A gRPC AsyncIO channel object.
84 """
85
86 return grpc_helpers_async.create_channel(
87 host,
88 credentials=credentials,
89 credentials_file=credentials_file,
90 quota_project_id=quota_project_id,
91 default_scopes=cls.AUTH_SCOPES,
92 scopes=scopes,
93 default_host=cls.DEFAULT_HOST,
94 **kwargs,
95 )
96
97 def __init__(
98 self,
99 *,
100 host: str = "pubsub.googleapis.com",
101 credentials: Optional[ga_credentials.Credentials] = None,
102 credentials_file: Optional[str] = None,
103 scopes: Optional[Sequence[str]] = None,
104 channel: Optional[aio.Channel] = None,
105 api_mtls_endpoint: Optional[str] = None,
106 client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
107 ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
108 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
109 quota_project_id: Optional[str] = None,
110 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
111 always_use_jwt_access: Optional[bool] = False,
112 api_audience: Optional[str] = None,
113 ) -> None:
114 """Instantiate the transport.
115
116 Args:
117 host (Optional[str]):
118 The hostname to connect to.
119 credentials (Optional[google.auth.credentials.Credentials]): The
120 authorization credentials to attach to requests. These
121 credentials identify the application to the service; if none
122 are specified, the client will attempt to ascertain the
123 credentials from the environment.
124 This argument is ignored if ``channel`` is provided.
125 credentials_file (Optional[str]): A file with credentials that can
126 be loaded with :func:`google.auth.load_credentials_from_file`.
127 This argument is ignored if ``channel`` is provided.
128 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this
129 service. These are only used when credentials are not specified and
130 are passed to :func:`google.auth.default`.
131 channel (Optional[aio.Channel]): A ``Channel`` instance through
132 which to make calls.
133 api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
134 If provided, it overrides the ``host`` argument and tries to create
135 a mutual TLS channel with client SSL credentials from
136 ``client_cert_source`` or application default SSL credentials.
137 client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
138 Deprecated. A callback to provide client SSL certificate bytes and
139 private key bytes, both in PEM format. It is ignored if
140 ``api_mtls_endpoint`` is None.
141 ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
142 for the grpc channel. It is ignored if ``channel`` is provided.
143 client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
144 A callback to provide client certificate bytes and private key bytes,
145 both in PEM format. It is used to configure a mutual TLS channel. It is
146 ignored if ``channel`` or ``ssl_channel_credentials`` is provided.
147 quota_project_id (Optional[str]): An optional project to use for billing
148 and quota.
149 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
150 The client info used to send a user-agent string along with
151 API requests. If ``None``, then default info will be used.
152 Generally, you only need to set this if you're developing
153 your own client library.
154 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
155 be used for service account credentials.
156
157 Raises:
158 google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
159 creation failed for any reason.
160 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
161 and ``credentials_file`` are passed.
162 """
163 self._grpc_channel = None
164 self._ssl_channel_credentials = ssl_channel_credentials
165 self._stubs: Dict[str, Callable] = {}
166
167 if api_mtls_endpoint:
168 warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
169 if client_cert_source:
170 warnings.warn("client_cert_source is deprecated", DeprecationWarning)
171
172 if channel:
173 # Ignore credentials if a channel was passed.
174 credentials = False
175 # If a channel was explicitly provided, set it.
176 self._grpc_channel = channel
177 self._ssl_channel_credentials = None
178 else:
179 if api_mtls_endpoint:
180 host = api_mtls_endpoint
181
182 # Create SSL credentials with client_cert_source or application
183 # default SSL credentials.
184 if client_cert_source:
185 cert, key = client_cert_source()
186 self._ssl_channel_credentials = grpc.ssl_channel_credentials(
187 certificate_chain=cert, private_key=key
188 )
189 else:
190 self._ssl_channel_credentials = SslCredentials().ssl_credentials
191
192 else:
193 if client_cert_source_for_mtls and not ssl_channel_credentials:
194 cert, key = client_cert_source_for_mtls()
195 self._ssl_channel_credentials = grpc.ssl_channel_credentials(
196 certificate_chain=cert, private_key=key
197 )
198
199 # The base transport sets the host, credentials and scopes
200 super().__init__(
201 host=host,
202 credentials=credentials,
203 credentials_file=credentials_file,
204 scopes=scopes,
205 quota_project_id=quota_project_id,
206 client_info=client_info,
207 always_use_jwt_access=always_use_jwt_access,
208 api_audience=api_audience,
209 )
210
211 if not self._grpc_channel:
212 self._grpc_channel = type(self).create_channel(
213 self._host,
214 # use the credentials which are saved
215 credentials=self._credentials,
216 # Set ``credentials_file`` to ``None`` here as
217 # the credentials that we saved earlier should be used.
218 credentials_file=None,
219 scopes=self._scopes,
220 ssl_credentials=self._ssl_channel_credentials,
221 quota_project_id=quota_project_id,
222 options=[
223 ("grpc.max_send_message_length", -1),
224 ("grpc.max_receive_message_length", -1),
225 ("grpc.max_metadata_size", 4 * 1024 * 1024),
226 ("grpc.keepalive_time_ms", 30000),
227 ],
228 )
229
230 # Wrap messages. This must be done after self._grpc_channel exists
231 self._prep_wrapped_messages(client_info)
232
233 @property
234 def grpc_channel(self) -> aio.Channel:
235 """Create the channel designed to connect to this service.
236
237 This property caches on the instance; repeated calls return
238 the same channel.
239 """
240 # Return the channel from cache.
241 return self._grpc_channel
242
243 @property
244 def create_subscription(
245 self,
246 ) -> Callable[[pubsub.Subscription], Awaitable[pubsub.Subscription]]:
247 r"""Return a callable for the create subscription method over gRPC.
248
249 Creates a subscription to a given topic. See the [resource name
250 rules]
251 (https://cloud.google.com/pubsub/docs/admin#resource_names). If
252 the subscription already exists, returns ``ALREADY_EXISTS``. If
253 the corresponding topic doesn't exist, returns ``NOT_FOUND``.
254
255 If the name is not provided in the request, the server will
256 assign a random name for this subscription on the same project
257 as the topic, conforming to the [resource name format]
258 (https://cloud.google.com/pubsub/docs/admin#resource_names). The
259 generated name is populated in the returned Subscription object.
260 Note that for REST API requests, you must specify a name in the
261 request.
262
263 Returns:
264 Callable[[~.Subscription],
265 Awaitable[~.Subscription]]:
266 A function that, when called, will call the underlying RPC
267 on the server.
268 """
269 # Generate a "stub function" on-the-fly which will actually make
270 # the request.
271 # gRPC handles serialization and deserialization, so we just need
272 # to pass in the functions for each.
273 if "create_subscription" not in self._stubs:
274 self._stubs["create_subscription"] = self.grpc_channel.unary_unary(
275 "/google.pubsub.v1.Subscriber/CreateSubscription",
276 request_serializer=pubsub.Subscription.serialize,
277 response_deserializer=pubsub.Subscription.deserialize,
278 )
279 return self._stubs["create_subscription"]
280
281 @property
282 def get_subscription(
283 self,
284 ) -> Callable[[pubsub.GetSubscriptionRequest], Awaitable[pubsub.Subscription]]:
285 r"""Return a callable for the get subscription method over gRPC.
286
287 Gets the configuration details of a subscription.
288
289 Returns:
290 Callable[[~.GetSubscriptionRequest],
291 Awaitable[~.Subscription]]:
292 A function that, when called, will call the underlying RPC
293 on the server.
294 """
295 # Generate a "stub function" on-the-fly which will actually make
296 # the request.
297 # gRPC handles serialization and deserialization, so we just need
298 # to pass in the functions for each.
299 if "get_subscription" not in self._stubs:
300 self._stubs["get_subscription"] = self.grpc_channel.unary_unary(
301 "/google.pubsub.v1.Subscriber/GetSubscription",
302 request_serializer=pubsub.GetSubscriptionRequest.serialize,
303 response_deserializer=pubsub.Subscription.deserialize,
304 )
305 return self._stubs["get_subscription"]
306
307 @property
308 def update_subscription(
309 self,
310 ) -> Callable[[pubsub.UpdateSubscriptionRequest], Awaitable[pubsub.Subscription]]:
311 r"""Return a callable for the update subscription method over gRPC.
312
313 Updates an existing subscription. Note that certain
314 properties of a subscription, such as its topic, are not
315 modifiable.
316
317 Returns:
318 Callable[[~.UpdateSubscriptionRequest],
319 Awaitable[~.Subscription]]:
320 A function that, when called, will call the underlying RPC
321 on the server.
322 """
323 # Generate a "stub function" on-the-fly which will actually make
324 # the request.
325 # gRPC handles serialization and deserialization, so we just need
326 # to pass in the functions for each.
327 if "update_subscription" not in self._stubs:
328 self._stubs["update_subscription"] = self.grpc_channel.unary_unary(
329 "/google.pubsub.v1.Subscriber/UpdateSubscription",
330 request_serializer=pubsub.UpdateSubscriptionRequest.serialize,
331 response_deserializer=pubsub.Subscription.deserialize,
332 )
333 return self._stubs["update_subscription"]
334
335 @property
336 def list_subscriptions(
337 self,
338 ) -> Callable[
339 [pubsub.ListSubscriptionsRequest], Awaitable[pubsub.ListSubscriptionsResponse]
340 ]:
341 r"""Return a callable for the list subscriptions method over gRPC.
342
343 Lists matching subscriptions.
344
345 Returns:
346 Callable[[~.ListSubscriptionsRequest],
347 Awaitable[~.ListSubscriptionsResponse]]:
348 A function that, when called, will call the underlying RPC
349 on the server.
350 """
351 # Generate a "stub function" on-the-fly which will actually make
352 # the request.
353 # gRPC handles serialization and deserialization, so we just need
354 # to pass in the functions for each.
355 if "list_subscriptions" not in self._stubs:
356 self._stubs["list_subscriptions"] = self.grpc_channel.unary_unary(
357 "/google.pubsub.v1.Subscriber/ListSubscriptions",
358 request_serializer=pubsub.ListSubscriptionsRequest.serialize,
359 response_deserializer=pubsub.ListSubscriptionsResponse.deserialize,
360 )
361 return self._stubs["list_subscriptions"]
362
363 @property
364 def delete_subscription(
365 self,
366 ) -> Callable[[pubsub.DeleteSubscriptionRequest], Awaitable[empty_pb2.Empty]]:
367 r"""Return a callable for the delete subscription method over gRPC.
368
369 Deletes an existing subscription. All messages retained in the
370 subscription are immediately dropped. Calls to ``Pull`` after
371 deletion will return ``NOT_FOUND``. After a subscription is
372 deleted, a new one may be created with the same name, but the
373 new one has no association with the old subscription or its
374 topic unless the same topic is specified.
375
376 Returns:
377 Callable[[~.DeleteSubscriptionRequest],
378 Awaitable[~.Empty]]:
379 A function that, when called, will call the underlying RPC
380 on the server.
381 """
382 # Generate a "stub function" on-the-fly which will actually make
383 # the request.
384 # gRPC handles serialization and deserialization, so we just need
385 # to pass in the functions for each.
386 if "delete_subscription" not in self._stubs:
387 self._stubs["delete_subscription"] = self.grpc_channel.unary_unary(
388 "/google.pubsub.v1.Subscriber/DeleteSubscription",
389 request_serializer=pubsub.DeleteSubscriptionRequest.serialize,
390 response_deserializer=empty_pb2.Empty.FromString,
391 )
392 return self._stubs["delete_subscription"]
393
394 @property
395 def modify_ack_deadline(
396 self,
397 ) -> Callable[[pubsub.ModifyAckDeadlineRequest], Awaitable[empty_pb2.Empty]]:
398 r"""Return a callable for the modify ack deadline method over gRPC.
399
400 Modifies the ack deadline for a specific message. This method is
401 useful to indicate that more time is needed to process a message
402 by the subscriber, or to make the message available for
403 redelivery if the processing was interrupted. Note that this
404 does not modify the subscription-level ``ackDeadlineSeconds``
405 used for subsequent messages.
406
407 Returns:
408 Callable[[~.ModifyAckDeadlineRequest],
409 Awaitable[~.Empty]]:
410 A function that, when called, will call the underlying RPC
411 on the server.
412 """
413 # Generate a "stub function" on-the-fly which will actually make
414 # the request.
415 # gRPC handles serialization and deserialization, so we just need
416 # to pass in the functions for each.
417 if "modify_ack_deadline" not in self._stubs:
418 self._stubs["modify_ack_deadline"] = self.grpc_channel.unary_unary(
419 "/google.pubsub.v1.Subscriber/ModifyAckDeadline",
420 request_serializer=pubsub.ModifyAckDeadlineRequest.serialize,
421 response_deserializer=empty_pb2.Empty.FromString,
422 )
423 return self._stubs["modify_ack_deadline"]
424
425 @property
426 def acknowledge(
427 self,
428 ) -> Callable[[pubsub.AcknowledgeRequest], Awaitable[empty_pb2.Empty]]:
429 r"""Return a callable for the acknowledge method over gRPC.
430
431 Acknowledges the messages associated with the ``ack_ids`` in the
432 ``AcknowledgeRequest``. The Pub/Sub system can remove the
433 relevant messages from the subscription.
434
435 Acknowledging a message whose ack deadline has expired may
436 succeed, but such a message may be redelivered later.
437 Acknowledging a message more than once will not result in an
438 error.
439
440 Returns:
441 Callable[[~.AcknowledgeRequest],
442 Awaitable[~.Empty]]:
443 A function that, when called, will call the underlying RPC
444 on the server.
445 """
446 # Generate a "stub function" on-the-fly which will actually make
447 # the request.
448 # gRPC handles serialization and deserialization, so we just need
449 # to pass in the functions for each.
450 if "acknowledge" not in self._stubs:
451 self._stubs["acknowledge"] = self.grpc_channel.unary_unary(
452 "/google.pubsub.v1.Subscriber/Acknowledge",
453 request_serializer=pubsub.AcknowledgeRequest.serialize,
454 response_deserializer=empty_pb2.Empty.FromString,
455 )
456 return self._stubs["acknowledge"]
457
458 @property
459 def pull(self) -> Callable[[pubsub.PullRequest], Awaitable[pubsub.PullResponse]]:
460 r"""Return a callable for the pull method over gRPC.
461
462 Pulls messages from the server.
463
464 Returns:
465 Callable[[~.PullRequest],
466 Awaitable[~.PullResponse]]:
467 A function that, when called, will call the underlying RPC
468 on the server.
469 """
470 # Generate a "stub function" on-the-fly which will actually make
471 # the request.
472 # gRPC handles serialization and deserialization, so we just need
473 # to pass in the functions for each.
474 if "pull" not in self._stubs:
475 self._stubs["pull"] = self.grpc_channel.unary_unary(
476 "/google.pubsub.v1.Subscriber/Pull",
477 request_serializer=pubsub.PullRequest.serialize,
478 response_deserializer=pubsub.PullResponse.deserialize,
479 )
480 return self._stubs["pull"]
481
482 @property
483 def streaming_pull(
484 self,
485 ) -> Callable[
486 [pubsub.StreamingPullRequest], Awaitable[pubsub.StreamingPullResponse]
487 ]:
488 r"""Return a callable for the streaming pull method over gRPC.
489
490 Establishes a stream with the server, which sends messages down
491 to the client. The client streams acknowledgements and ack
492 deadline modifications back to the server. The server will close
493 the stream and return the status on any error. The server may
494 close the stream with status ``UNAVAILABLE`` to reassign
495 server-side resources, in which case, the client should
496 re-establish the stream. Flow control can be achieved by
497 configuring the underlying RPC channel.
498
499 Returns:
500 Callable[[~.StreamingPullRequest],
501 Awaitable[~.StreamingPullResponse]]:
502 A function that, when called, will call the underlying RPC
503 on the server.
504 """
505 # Generate a "stub function" on-the-fly which will actually make
506 # the request.
507 # gRPC handles serialization and deserialization, so we just need
508 # to pass in the functions for each.
509 if "streaming_pull" not in self._stubs:
510 self._stubs["streaming_pull"] = self.grpc_channel.stream_stream(
511 "/google.pubsub.v1.Subscriber/StreamingPull",
512 request_serializer=pubsub.StreamingPullRequest.serialize,
513 response_deserializer=pubsub.StreamingPullResponse.deserialize,
514 )
515 return self._stubs["streaming_pull"]
516
517 @property
518 def modify_push_config(
519 self,
520 ) -> Callable[[pubsub.ModifyPushConfigRequest], Awaitable[empty_pb2.Empty]]:
521 r"""Return a callable for the modify push config method over gRPC.
522
523 Modifies the ``PushConfig`` for a specified subscription.
524
525 This may be used to change a push subscription to a pull one
526 (signified by an empty ``PushConfig``) or vice versa, or change
527 the endpoint URL and other attributes of a push subscription.
528 Messages will accumulate for delivery continuously through the
529 call regardless of changes to the ``PushConfig``.
530
531 Returns:
532 Callable[[~.ModifyPushConfigRequest],
533 Awaitable[~.Empty]]:
534 A function that, when called, will call the underlying RPC
535 on the server.
536 """
537 # Generate a "stub function" on-the-fly which will actually make
538 # the request.
539 # gRPC handles serialization and deserialization, so we just need
540 # to pass in the functions for each.
541 if "modify_push_config" not in self._stubs:
542 self._stubs["modify_push_config"] = self.grpc_channel.unary_unary(
543 "/google.pubsub.v1.Subscriber/ModifyPushConfig",
544 request_serializer=pubsub.ModifyPushConfigRequest.serialize,
545 response_deserializer=empty_pb2.Empty.FromString,
546 )
547 return self._stubs["modify_push_config"]
548
549 @property
550 def get_snapshot(
551 self,
552 ) -> Callable[[pubsub.GetSnapshotRequest], Awaitable[pubsub.Snapshot]]:
553 r"""Return a callable for the get snapshot method over gRPC.
554
555 Gets the configuration details of a snapshot. Snapshots are used
556 in
557 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
558 operations, which allow you to manage message acknowledgments in
559 bulk. That is, you can set the acknowledgment state of messages
560 in an existing subscription to the state captured by a snapshot.
561
562 Returns:
563 Callable[[~.GetSnapshotRequest],
564 Awaitable[~.Snapshot]]:
565 A function that, when called, will call the underlying RPC
566 on the server.
567 """
568 # Generate a "stub function" on-the-fly which will actually make
569 # the request.
570 # gRPC handles serialization and deserialization, so we just need
571 # to pass in the functions for each.
572 if "get_snapshot" not in self._stubs:
573 self._stubs["get_snapshot"] = self.grpc_channel.unary_unary(
574 "/google.pubsub.v1.Subscriber/GetSnapshot",
575 request_serializer=pubsub.GetSnapshotRequest.serialize,
576 response_deserializer=pubsub.Snapshot.deserialize,
577 )
578 return self._stubs["get_snapshot"]
579
580 @property
581 def list_snapshots(
582 self,
583 ) -> Callable[
584 [pubsub.ListSnapshotsRequest], Awaitable[pubsub.ListSnapshotsResponse]
585 ]:
586 r"""Return a callable for the list snapshots method over gRPC.
587
588 Lists the existing snapshots. Snapshots are used in
589 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
590 operations, which allow you to manage message acknowledgments in
591 bulk. That is, you can set the acknowledgment state of messages
592 in an existing subscription to the state captured by a snapshot.
593
594 Returns:
595 Callable[[~.ListSnapshotsRequest],
596 Awaitable[~.ListSnapshotsResponse]]:
597 A function that, when called, will call the underlying RPC
598 on the server.
599 """
600 # Generate a "stub function" on-the-fly which will actually make
601 # the request.
602 # gRPC handles serialization and deserialization, so we just need
603 # to pass in the functions for each.
604 if "list_snapshots" not in self._stubs:
605 self._stubs["list_snapshots"] = self.grpc_channel.unary_unary(
606 "/google.pubsub.v1.Subscriber/ListSnapshots",
607 request_serializer=pubsub.ListSnapshotsRequest.serialize,
608 response_deserializer=pubsub.ListSnapshotsResponse.deserialize,
609 )
610 return self._stubs["list_snapshots"]
611
612 @property
613 def create_snapshot(
614 self,
615 ) -> Callable[[pubsub.CreateSnapshotRequest], Awaitable[pubsub.Snapshot]]:
616 r"""Return a callable for the create snapshot method over gRPC.
617
618 Creates a snapshot from the requested subscription. Snapshots
619 are used in
620 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
621 operations, which allow you to manage message acknowledgments in
622 bulk. That is, you can set the acknowledgment state of messages
623 in an existing subscription to the state captured by a snapshot.
624 If the snapshot already exists, returns ``ALREADY_EXISTS``. If
625 the requested subscription doesn't exist, returns ``NOT_FOUND``.
626 If the backlog in the subscription is too old -- and the
627 resulting snapshot would expire in less than 1 hour -- then
628 ``FAILED_PRECONDITION`` is returned. See also the
629 ``Snapshot.expire_time`` field. If the name is not provided in
630 the request, the server will assign a random name for this
631 snapshot on the same project as the subscription, conforming to
632 the [resource name format]
633 (https://cloud.google.com/pubsub/docs/admin#resource_names). The
634 generated name is populated in the returned Snapshot object.
635 Note that for REST API requests, you must specify a name in the
636 request.
637
638 Returns:
639 Callable[[~.CreateSnapshotRequest],
640 Awaitable[~.Snapshot]]:
641 A function that, when called, will call the underlying RPC
642 on the server.
643 """
644 # Generate a "stub function" on-the-fly which will actually make
645 # the request.
646 # gRPC handles serialization and deserialization, so we just need
647 # to pass in the functions for each.
648 if "create_snapshot" not in self._stubs:
649 self._stubs["create_snapshot"] = self.grpc_channel.unary_unary(
650 "/google.pubsub.v1.Subscriber/CreateSnapshot",
651 request_serializer=pubsub.CreateSnapshotRequest.serialize,
652 response_deserializer=pubsub.Snapshot.deserialize,
653 )
654 return self._stubs["create_snapshot"]
655
656 @property
657 def update_snapshot(
658 self,
659 ) -> Callable[[pubsub.UpdateSnapshotRequest], Awaitable[pubsub.Snapshot]]:
660 r"""Return a callable for the update snapshot method over gRPC.
661
662 Updates an existing snapshot. Snapshots are used in
663 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
664 operations, which allow you to manage message acknowledgments in
665 bulk. That is, you can set the acknowledgment state of messages
666 in an existing subscription to the state captured by a snapshot.
667
668 Returns:
669 Callable[[~.UpdateSnapshotRequest],
670 Awaitable[~.Snapshot]]:
671 A function that, when called, will call the underlying RPC
672 on the server.
673 """
674 # Generate a "stub function" on-the-fly which will actually make
675 # the request.
676 # gRPC handles serialization and deserialization, so we just need
677 # to pass in the functions for each.
678 if "update_snapshot" not in self._stubs:
679 self._stubs["update_snapshot"] = self.grpc_channel.unary_unary(
680 "/google.pubsub.v1.Subscriber/UpdateSnapshot",
681 request_serializer=pubsub.UpdateSnapshotRequest.serialize,
682 response_deserializer=pubsub.Snapshot.deserialize,
683 )
684 return self._stubs["update_snapshot"]
685
686 @property
687 def delete_snapshot(
688 self,
689 ) -> Callable[[pubsub.DeleteSnapshotRequest], Awaitable[empty_pb2.Empty]]:
690 r"""Return a callable for the delete snapshot method over gRPC.
691
692 Removes an existing snapshot. Snapshots are used in [Seek]
693 (https://cloud.google.com/pubsub/docs/replay-overview)
694 operations, which allow you to manage message acknowledgments in
695 bulk. That is, you can set the acknowledgment state of messages
696 in an existing subscription to the state captured by a snapshot.
697 When the snapshot is deleted, all messages retained in the
698 snapshot are immediately dropped. After a snapshot is deleted, a
699 new one may be created with the same name, but the new one has
700 no association with the old snapshot or its subscription, unless
701 the same subscription is specified.
702
703 Returns:
704 Callable[[~.DeleteSnapshotRequest],
705 Awaitable[~.Empty]]:
706 A function that, when called, will call the underlying RPC
707 on the server.
708 """
709 # Generate a "stub function" on-the-fly which will actually make
710 # the request.
711 # gRPC handles serialization and deserialization, so we just need
712 # to pass in the functions for each.
713 if "delete_snapshot" not in self._stubs:
714 self._stubs["delete_snapshot"] = self.grpc_channel.unary_unary(
715 "/google.pubsub.v1.Subscriber/DeleteSnapshot",
716 request_serializer=pubsub.DeleteSnapshotRequest.serialize,
717 response_deserializer=empty_pb2.Empty.FromString,
718 )
719 return self._stubs["delete_snapshot"]
720
721 @property
722 def seek(self) -> Callable[[pubsub.SeekRequest], Awaitable[pubsub.SeekResponse]]:
723 r"""Return a callable for the seek method over gRPC.
724
725 Seeks an existing subscription to a point in time or to a given
726 snapshot, whichever is provided in the request. Snapshots are
727 used in [Seek]
728 (https://cloud.google.com/pubsub/docs/replay-overview)
729 operations, which allow you to manage message acknowledgments in
730 bulk. That is, you can set the acknowledgment state of messages
731 in an existing subscription to the state captured by a snapshot.
732 Note that both the subscription and the snapshot must be on the
733 same topic.
734
735 Returns:
736 Callable[[~.SeekRequest],
737 Awaitable[~.SeekResponse]]:
738 A function that, when called, will call the underlying RPC
739 on the server.
740 """
741 # Generate a "stub function" on-the-fly which will actually make
742 # the request.
743 # gRPC handles serialization and deserialization, so we just need
744 # to pass in the functions for each.
745 if "seek" not in self._stubs:
746 self._stubs["seek"] = self.grpc_channel.unary_unary(
747 "/google.pubsub.v1.Subscriber/Seek",
748 request_serializer=pubsub.SeekRequest.serialize,
749 response_deserializer=pubsub.SeekResponse.deserialize,
750 )
751 return self._stubs["seek"]
752
753 @property
754 def set_iam_policy(
755 self,
756 ) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], Awaitable[policy_pb2.Policy]]:
757 r"""Return a callable for the set iam policy method over gRPC.
758 Sets the IAM access control policy on the specified
759 function. Replaces any existing policy.
760 Returns:
761 Callable[[~.SetIamPolicyRequest],
762 Awaitable[~.Policy]]:
763 A function that, when called, will call the underlying RPC
764 on the server.
765 """
766 # Generate a "stub function" on-the-fly which will actually make
767 # the request.
768 # gRPC handles serialization and deserialization, so we just need
769 # to pass in the functions for each.
770 if "set_iam_policy" not in self._stubs:
771 self._stubs["set_iam_policy"] = self.grpc_channel.unary_unary(
772 "/google.iam.v1.IAMPolicy/SetIamPolicy",
773 request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
774 response_deserializer=policy_pb2.Policy.FromString,
775 )
776 return self._stubs["set_iam_policy"]
777
778 @property
779 def get_iam_policy(
780 self,
781 ) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], Awaitable[policy_pb2.Policy]]:
782 r"""Return a callable for the get iam policy method over gRPC.
783 Gets the IAM access control policy for a function.
784 Returns an empty policy if the function exists and does
785 not have a policy set.
786 Returns:
787 Callable[[~.GetIamPolicyRequest],
788 Awaitable[~.Policy]]:
789 A function that, when called, will call the underlying RPC
790 on the server.
791 """
792 # Generate a "stub function" on-the-fly which will actually make
793 # the request.
794 # gRPC handles serialization and deserialization, so we just need
795 # to pass in the functions for each.
796 if "get_iam_policy" not in self._stubs:
797 self._stubs["get_iam_policy"] = self.grpc_channel.unary_unary(
798 "/google.iam.v1.IAMPolicy/GetIamPolicy",
799 request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
800 response_deserializer=policy_pb2.Policy.FromString,
801 )
802 return self._stubs["get_iam_policy"]
803
804 @property
805 def test_iam_permissions(
806 self,
807 ) -> Callable[
808 [iam_policy_pb2.TestIamPermissionsRequest],
809 Awaitable[iam_policy_pb2.TestIamPermissionsResponse],
810 ]:
811 r"""Return a callable for the test iam permissions method over gRPC.
812 Tests the specified permissions against the IAM access control
813 policy for a function. If the function does not exist, this will
814 return an empty set of permissions, not a NOT_FOUND error.
815 Returns:
816 Callable[[~.TestIamPermissionsRequest],
817 Awaitable[~.TestIamPermissionsResponse]]:
818 A function that, when called, will call the underlying RPC
819 on the server.
820 """
821 # Generate a "stub function" on-the-fly which will actually make
822 # the request.
823 # gRPC handles serialization and deserialization, so we just need
824 # to pass in the functions for each.
825 if "test_iam_permissions" not in self._stubs:
826 self._stubs["test_iam_permissions"] = self.grpc_channel.unary_unary(
827 "/google.iam.v1.IAMPolicy/TestIamPermissions",
828 request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
829 response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
830 )
831 return self._stubs["test_iam_permissions"]
832
833 def close(self):
834 return self.grpc_channel.close()
835
836
837__all__ = ("SubscriberGrpcAsyncIOTransport",)