1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import logging as std_logging
17from collections import OrderedDict
18import re
19from typing import (
20 Dict,
21 Callable,
22 Mapping,
23 MutableMapping,
24 MutableSequence,
25 Optional,
26 AsyncIterable,
27 Awaitable,
28 AsyncIterator,
29 Sequence,
30 Tuple,
31 Type,
32 Union,
33)
34
35import warnings
36from google.pubsub_v1 import gapic_version as package_version
37
38from google.api_core.client_options import ClientOptions
39from google.api_core import exceptions as core_exceptions
40from google.api_core import gapic_v1
41from google.api_core import retry_async as retries
42from google.auth import credentials as ga_credentials # type: ignore
43from google.oauth2 import service_account # type: ignore
44import google.protobuf
45
46
47try:
48 OptionalRetry = Union[retries.AsyncRetry, gapic_v1.method._MethodDefault, None]
49except AttributeError: # pragma: NO COVER
50 OptionalRetry = Union[retries.AsyncRetry, object, None] # type: ignore
51
52from google.iam.v1 import iam_policy_pb2 # type: ignore
53from google.iam.v1 import policy_pb2 # type: ignore
54from google.protobuf import duration_pb2 # type: ignore
55from google.protobuf import field_mask_pb2 # type: ignore
56from google.protobuf import timestamp_pb2 # type: ignore
57from google.pubsub_v1.services.subscriber import pagers
58from google.pubsub_v1.types import pubsub
59from .transports.base import SubscriberTransport, DEFAULT_CLIENT_INFO
60from .transports.grpc_asyncio import SubscriberGrpcAsyncIOTransport
61from .client import SubscriberClient
62
63try:
64 from google.api_core import client_logging # type: ignore
65
66 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
67except ImportError: # pragma: NO COVER
68 CLIENT_LOGGING_SUPPORTED = False
69
70_LOGGER = std_logging.getLogger(__name__)
71
72
73class SubscriberAsyncClient:
74 """The service that an application uses to manipulate subscriptions and
75 to consume messages from a subscription via the ``Pull`` method or
76 by establishing a bi-directional stream using the ``StreamingPull``
77 method.
78 """
79
80 _client: SubscriberClient
81
82 # Copy defaults from the synchronous client for use here.
83 # Note: DEFAULT_ENDPOINT is deprecated. Use _DEFAULT_ENDPOINT_TEMPLATE instead.
84 DEFAULT_ENDPOINT = SubscriberClient.DEFAULT_ENDPOINT
85 DEFAULT_MTLS_ENDPOINT = SubscriberClient.DEFAULT_MTLS_ENDPOINT
86 _DEFAULT_ENDPOINT_TEMPLATE = SubscriberClient._DEFAULT_ENDPOINT_TEMPLATE
87 _DEFAULT_UNIVERSE = SubscriberClient._DEFAULT_UNIVERSE
88
89 snapshot_path = staticmethod(SubscriberClient.snapshot_path)
90 parse_snapshot_path = staticmethod(SubscriberClient.parse_snapshot_path)
91 subscription_path = staticmethod(SubscriberClient.subscription_path)
92 parse_subscription_path = staticmethod(SubscriberClient.parse_subscription_path)
93 topic_path = staticmethod(SubscriberClient.topic_path)
94 parse_topic_path = staticmethod(SubscriberClient.parse_topic_path)
95 common_billing_account_path = staticmethod(
96 SubscriberClient.common_billing_account_path
97 )
98 parse_common_billing_account_path = staticmethod(
99 SubscriberClient.parse_common_billing_account_path
100 )
101 common_folder_path = staticmethod(SubscriberClient.common_folder_path)
102 parse_common_folder_path = staticmethod(SubscriberClient.parse_common_folder_path)
103 common_organization_path = staticmethod(SubscriberClient.common_organization_path)
104 parse_common_organization_path = staticmethod(
105 SubscriberClient.parse_common_organization_path
106 )
107 common_project_path = staticmethod(SubscriberClient.common_project_path)
108 parse_common_project_path = staticmethod(SubscriberClient.parse_common_project_path)
109 common_location_path = staticmethod(SubscriberClient.common_location_path)
110 parse_common_location_path = staticmethod(
111 SubscriberClient.parse_common_location_path
112 )
113
114 @classmethod
115 def from_service_account_info(cls, info: dict, *args, **kwargs):
116 """Creates an instance of this client using the provided credentials
117 info.
118
119 Args:
120 info (dict): The service account private key info.
121 args: Additional arguments to pass to the constructor.
122 kwargs: Additional arguments to pass to the constructor.
123
124 Returns:
125 SubscriberAsyncClient: The constructed client.
126 """
127 return SubscriberClient.from_service_account_info.__func__(SubscriberAsyncClient, info, *args, **kwargs) # type: ignore
128
129 @classmethod
130 def from_service_account_file(cls, filename: str, *args, **kwargs):
131 """Creates an instance of this client using the provided credentials
132 file.
133
134 Args:
135 filename (str): The path to the service account private key json
136 file.
137 args: Additional arguments to pass to the constructor.
138 kwargs: Additional arguments to pass to the constructor.
139
140 Returns:
141 SubscriberAsyncClient: The constructed client.
142 """
143 return SubscriberClient.from_service_account_file.__func__(SubscriberAsyncClient, filename, *args, **kwargs) # type: ignore
144
145 from_service_account_json = from_service_account_file
146
147 @classmethod
148 def get_mtls_endpoint_and_cert_source(
149 cls, client_options: Optional[ClientOptions] = None
150 ):
151 """Return the API endpoint and client cert source for mutual TLS.
152
153 The client cert source is determined in the following order:
154 (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
155 client cert source is None.
156 (2) if `client_options.client_cert_source` is provided, use the provided one; if the
157 default client cert source exists, use the default one; otherwise the client cert
158 source is None.
159
160 The API endpoint is determined in the following order:
161 (1) if `client_options.api_endpoint` if provided, use the provided one.
162 (2) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is "always", use the
163 default mTLS endpoint; if the environment variable is "never", use the default API
164 endpoint; otherwise if client cert source exists, use the default mTLS endpoint, otherwise
165 use the default API endpoint.
166
167 More details can be found at https://google.aip.dev/auth/4114.
168
169 Args:
170 client_options (google.api_core.client_options.ClientOptions): Custom options for the
171 client. Only the `api_endpoint` and `client_cert_source` properties may be used
172 in this method.
173
174 Returns:
175 Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
176 client cert source to use.
177
178 Raises:
179 google.auth.exceptions.MutualTLSChannelError: If any errors happen.
180 """
181 return SubscriberClient.get_mtls_endpoint_and_cert_source(client_options) # type: ignore
182
183 @property
184 def transport(self) -> SubscriberTransport:
185 """Returns the transport used by the client instance.
186
187 Returns:
188 SubscriberTransport: The transport used by the client instance.
189 """
190 return self._client.transport
191
192 @property
193 def api_endpoint(self):
194 """Return the API endpoint used by the client instance.
195
196 Returns:
197 str: The API endpoint used by the client instance.
198 """
199 return self._client._api_endpoint
200
201 @property
202 def universe_domain(self) -> str:
203 """Return the universe domain used by the client instance.
204
205 Returns:
206 str: The universe domain used
207 by the client instance.
208 """
209 return self._client._universe_domain
210
211 get_transport_class = SubscriberClient.get_transport_class
212
213 def __init__(
214 self,
215 *,
216 credentials: Optional[ga_credentials.Credentials] = None,
217 transport: Optional[
218 Union[str, SubscriberTransport, Callable[..., SubscriberTransport]]
219 ] = "grpc_asyncio",
220 client_options: Optional[ClientOptions] = None,
221 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
222 ) -> None:
223 """Instantiates the subscriber async client.
224
225 Args:
226 credentials (Optional[google.auth.credentials.Credentials]): The
227 authorization credentials to attach to requests. These
228 credentials identify the application to the service; if none
229 are specified, the client will attempt to ascertain the
230 credentials from the environment.
231 transport (Optional[Union[str,SubscriberTransport,Callable[..., SubscriberTransport]]]):
232 The transport to use, or a Callable that constructs and returns a new transport to use.
233 If a Callable is given, it will be called with the same set of initialization
234 arguments as used in the SubscriberTransport constructor.
235 If set to None, a transport is chosen automatically.
236 client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]):
237 Custom options for the client.
238
239 1. The ``api_endpoint`` property can be used to override the
240 default endpoint provided by the client when ``transport`` is
241 not explicitly provided. Only if this property is not set and
242 ``transport`` was not explicitly provided, the endpoint is
243 determined by the GOOGLE_API_USE_MTLS_ENDPOINT environment
244 variable, which have one of the following values:
245 "always" (always use the default mTLS endpoint), "never" (always
246 use the default regular endpoint) and "auto" (auto-switch to the
247 default mTLS endpoint if client certificate is present; this is
248 the default value).
249
250 2. If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
251 is "true", then the ``client_cert_source`` property can be used
252 to provide a client certificate for mTLS transport. If
253 not provided, the default SSL client certificate will be used if
254 present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
255 set, no client certificate will be used.
256
257 3. The ``universe_domain`` property can be used to override the
258 default "googleapis.com" universe. Note that ``api_endpoint``
259 property still takes precedence; and ``universe_domain`` is
260 currently not supported for mTLS.
261
262 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
263 The client info used to send a user-agent string along with
264 API requests. If ``None``, then default info will be used.
265 Generally, you only need to set this if you're developing
266 your own client library.
267
268 Raises:
269 google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
270 creation failed for any reason.
271 """
272 self._client = SubscriberClient(
273 credentials=credentials,
274 transport=transport,
275 client_options=client_options,
276 client_info=client_info,
277 )
278
279 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
280 std_logging.DEBUG
281 ): # pragma: NO COVER
282 _LOGGER.debug(
283 "Created client `google.pubsub_v1.SubscriberAsyncClient`.",
284 extra={
285 "serviceName": "google.pubsub.v1.Subscriber",
286 "universeDomain": getattr(
287 self._client._transport._credentials, "universe_domain", ""
288 ),
289 "credentialsType": f"{type(self._client._transport._credentials).__module__}.{type(self._client._transport._credentials).__qualname__}",
290 "credentialsInfo": getattr(
291 self.transport._credentials, "get_cred_info", lambda: None
292 )(),
293 }
294 if hasattr(self._client._transport, "_credentials")
295 else {
296 "serviceName": "google.pubsub.v1.Subscriber",
297 "credentialsType": None,
298 },
299 )
300
301 async def create_subscription(
302 self,
303 request: Optional[Union[pubsub.Subscription, dict]] = None,
304 *,
305 name: Optional[str] = None,
306 topic: Optional[str] = None,
307 push_config: Optional[pubsub.PushConfig] = None,
308 ack_deadline_seconds: Optional[int] = None,
309 retry: OptionalRetry = gapic_v1.method.DEFAULT,
310 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
311 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
312 ) -> pubsub.Subscription:
313 r"""Creates a subscription to a given topic. See the [resource name
314 rules]
315 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
316 If the subscription already exists, returns ``ALREADY_EXISTS``.
317 If the corresponding topic doesn't exist, returns ``NOT_FOUND``.
318
319 If the name is not provided in the request, the server will
320 assign a random name for this subscription on the same project
321 as the topic, conforming to the [resource name format]
322 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
323 The generated name is populated in the returned Subscription
324 object. Note that for REST API requests, you must specify a name
325 in the request.
326
327 .. code-block:: python
328
329 # This snippet has been automatically generated and should be regarded as a
330 # code template only.
331 # It will require modifications to work:
332 # - It may require correct/in-range values for request initialization.
333 # - It may require specifying regional endpoints when creating the service
334 # client as shown in:
335 # https://googleapis.dev/python/google-api-core/latest/client_options.html
336 from google import pubsub_v1
337
338 async def sample_create_subscription():
339 # Create a client
340 client = pubsub_v1.SubscriberAsyncClient()
341
342 # Initialize request argument(s)
343 request = pubsub_v1.Subscription(
344 name="name_value",
345 topic="topic_value",
346 )
347
348 # Make the request
349 response = await client.create_subscription(request=request)
350
351 # Handle the response
352 print(response)
353
354 Args:
355 request (Optional[Union[google.pubsub_v1.types.Subscription, dict]]):
356 The request object. A subscription resource. If none of ``push_config``,
357 ``bigquery_config``, or ``cloud_storage_config`` is set,
358 then the subscriber will pull and ack messages using API
359 methods. At most one of these fields may be set.
360 name (:class:`str`):
361 Required. The name of the subscription. It must have the
362 format
363 ``"projects/{project}/subscriptions/{subscription}"``.
364 ``{subscription}`` must start with a letter, and contain
365 only letters (``[A-Za-z]``), numbers (``[0-9]``), dashes
366 (``-``), underscores (``_``), periods (``.``), tildes
367 (``~``), plus (``+``) or percent signs (``%``). It must
368 be between 3 and 255 characters in length, and it must
369 not start with ``"goog"``.
370
371 This corresponds to the ``name`` field
372 on the ``request`` instance; if ``request`` is provided, this
373 should not be set.
374 topic (:class:`str`):
375 Required. The name of the topic from which this
376 subscription is receiving messages. Format is
377 ``projects/{project}/topics/{topic}``. The value of this
378 field will be ``_deleted-topic_`` if the topic has been
379 deleted.
380
381 This corresponds to the ``topic`` field
382 on the ``request`` instance; if ``request`` is provided, this
383 should not be set.
384 push_config (:class:`google.pubsub_v1.types.PushConfig`):
385 Optional. If push delivery is used
386 with this subscription, this field is
387 used to configure it.
388
389 This corresponds to the ``push_config`` field
390 on the ``request`` instance; if ``request`` is provided, this
391 should not be set.
392 ack_deadline_seconds (:class:`int`):
393 Optional. The approximate amount of time (on a
394 best-effort basis) Pub/Sub waits for the subscriber to
395 acknowledge receipt before resending the message. In the
396 interval after the message is delivered and before it is
397 acknowledged, it is considered to be *outstanding*.
398 During that time period, the message will not be
399 redelivered (on a best-effort basis).
400
401 For pull subscriptions, this value is used as the
402 initial value for the ack deadline. To override this
403 value for a given message, call ``ModifyAckDeadline``
404 with the corresponding ``ack_id`` if using non-streaming
405 pull or send the ``ack_id`` in a
406 ``StreamingModifyAckDeadlineRequest`` if using streaming
407 pull. The minimum custom deadline you can specify is 10
408 seconds. The maximum custom deadline you can specify is
409 600 seconds (10 minutes). If this parameter is 0, a
410 default value of 10 seconds is used.
411
412 For push delivery, this value is also used to set the
413 request timeout for the call to the push endpoint.
414
415 If the subscriber never acknowledges the message, the
416 Pub/Sub system will eventually redeliver the message.
417
418 This corresponds to the ``ack_deadline_seconds`` field
419 on the ``request`` instance; if ``request`` is provided, this
420 should not be set.
421 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
422 should be retried.
423 timeout (float): The timeout for this request.
424 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
425 sent along with the request as metadata. Normally, each value must be of type `str`,
426 but for metadata keys ending with the suffix `-bin`, the corresponding values must
427 be of type `bytes`.
428
429 Returns:
430 google.pubsub_v1.types.Subscription:
431 A subscription resource. If none of push_config, bigquery_config, or
432 cloud_storage_config is set, then the subscriber will
433 pull and ack messages using API methods. At most one
434 of these fields may be set.
435
436 """
437 # Create or coerce a protobuf request object.
438 # - Quick check: If we got a request object, we should *not* have
439 # gotten any keyword arguments that map to the request.
440 flattened_params = [name, topic, push_config, ack_deadline_seconds]
441 has_flattened_params = (
442 len([param for param in flattened_params if param is not None]) > 0
443 )
444 if request is not None and has_flattened_params:
445 raise ValueError(
446 "If the `request` argument is set, then none of "
447 "the individual field arguments should be set."
448 )
449
450 # - Use the request object if provided (there's no risk of modifying the input as
451 # there are no flattened fields), or create one.
452 if not isinstance(request, pubsub.Subscription):
453 request = pubsub.Subscription(request)
454
455 # If we have keyword arguments corresponding to fields on the
456 # request, apply these.
457 if name is not None:
458 request.name = name
459 if topic is not None:
460 request.topic = topic
461 if push_config is not None:
462 request.push_config = push_config
463 if ack_deadline_seconds is not None:
464 request.ack_deadline_seconds = ack_deadline_seconds
465
466 # Wrap the RPC method; this adds retry and timeout information,
467 # and friendly error handling.
468 rpc = self._client._transport._wrapped_methods[
469 self._client._transport.create_subscription
470 ]
471
472 # Certain fields should be provided within the metadata header;
473 # add these here.
474 metadata = tuple(metadata) + (
475 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
476 )
477
478 # Validate the universe domain.
479 self._client._validate_universe_domain()
480
481 # Send the request.
482 response = await rpc(
483 request,
484 retry=retry,
485 timeout=timeout,
486 metadata=metadata,
487 )
488
489 # Done; return the response.
490 return response
491
492 async def get_subscription(
493 self,
494 request: Optional[Union[pubsub.GetSubscriptionRequest, dict]] = None,
495 *,
496 subscription: Optional[str] = None,
497 retry: OptionalRetry = gapic_v1.method.DEFAULT,
498 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
499 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
500 ) -> pubsub.Subscription:
501 r"""Gets the configuration details of a subscription.
502
503 .. code-block:: python
504
505 # This snippet has been automatically generated and should be regarded as a
506 # code template only.
507 # It will require modifications to work:
508 # - It may require correct/in-range values for request initialization.
509 # - It may require specifying regional endpoints when creating the service
510 # client as shown in:
511 # https://googleapis.dev/python/google-api-core/latest/client_options.html
512 from google import pubsub_v1
513
514 async def sample_get_subscription():
515 # Create a client
516 client = pubsub_v1.SubscriberAsyncClient()
517
518 # Initialize request argument(s)
519 request = pubsub_v1.GetSubscriptionRequest(
520 subscription="subscription_value",
521 )
522
523 # Make the request
524 response = await client.get_subscription(request=request)
525
526 # Handle the response
527 print(response)
528
529 Args:
530 request (Optional[Union[google.pubsub_v1.types.GetSubscriptionRequest, dict]]):
531 The request object. Request for the GetSubscription
532 method.
533 subscription (:class:`str`):
534 Required. The name of the subscription to get. Format is
535 ``projects/{project}/subscriptions/{sub}``.
536
537 This corresponds to the ``subscription`` field
538 on the ``request`` instance; if ``request`` is provided, this
539 should not be set.
540 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
541 should be retried.
542 timeout (float): The timeout for this request.
543 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
544 sent along with the request as metadata. Normally, each value must be of type `str`,
545 but for metadata keys ending with the suffix `-bin`, the corresponding values must
546 be of type `bytes`.
547
548 Returns:
549 google.pubsub_v1.types.Subscription:
550 A subscription resource. If none of push_config, bigquery_config, or
551 cloud_storage_config is set, then the subscriber will
552 pull and ack messages using API methods. At most one
553 of these fields may be set.
554
555 """
556 # Create or coerce a protobuf request object.
557 # - Quick check: If we got a request object, we should *not* have
558 # gotten any keyword arguments that map to the request.
559 flattened_params = [subscription]
560 has_flattened_params = (
561 len([param for param in flattened_params if param is not None]) > 0
562 )
563 if request is not None and has_flattened_params:
564 raise ValueError(
565 "If the `request` argument is set, then none of "
566 "the individual field arguments should be set."
567 )
568
569 # - Use the request object if provided (there's no risk of modifying the input as
570 # there are no flattened fields), or create one.
571 if not isinstance(request, pubsub.GetSubscriptionRequest):
572 request = pubsub.GetSubscriptionRequest(request)
573
574 # If we have keyword arguments corresponding to fields on the
575 # request, apply these.
576 if subscription is not None:
577 request.subscription = subscription
578
579 # Wrap the RPC method; this adds retry and timeout information,
580 # and friendly error handling.
581 rpc = self._client._transport._wrapped_methods[
582 self._client._transport.get_subscription
583 ]
584
585 # Certain fields should be provided within the metadata header;
586 # add these here.
587 metadata = tuple(metadata) + (
588 gapic_v1.routing_header.to_grpc_metadata(
589 (("subscription", request.subscription),)
590 ),
591 )
592
593 # Validate the universe domain.
594 self._client._validate_universe_domain()
595
596 # Send the request.
597 response = await rpc(
598 request,
599 retry=retry,
600 timeout=timeout,
601 metadata=metadata,
602 )
603
604 # Done; return the response.
605 return response
606
607 async def update_subscription(
608 self,
609 request: Optional[Union[pubsub.UpdateSubscriptionRequest, dict]] = None,
610 *,
611 subscription: Optional[pubsub.Subscription] = None,
612 update_mask: Optional[field_mask_pb2.FieldMask] = None,
613 retry: OptionalRetry = gapic_v1.method.DEFAULT,
614 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
615 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
616 ) -> pubsub.Subscription:
617 r"""Updates an existing subscription by updating the
618 fields specified in the update mask. Note that certain
619 properties of a subscription, such as its topic, are not
620 modifiable.
621
622 .. code-block:: python
623
624 # This snippet has been automatically generated and should be regarded as a
625 # code template only.
626 # It will require modifications to work:
627 # - It may require correct/in-range values for request initialization.
628 # - It may require specifying regional endpoints when creating the service
629 # client as shown in:
630 # https://googleapis.dev/python/google-api-core/latest/client_options.html
631 from google import pubsub_v1
632
633 async def sample_update_subscription():
634 # Create a client
635 client = pubsub_v1.SubscriberAsyncClient()
636
637 # Initialize request argument(s)
638 subscription = pubsub_v1.Subscription()
639 subscription.name = "name_value"
640 subscription.topic = "topic_value"
641
642 request = pubsub_v1.UpdateSubscriptionRequest(
643 subscription=subscription,
644 )
645
646 # Make the request
647 response = await client.update_subscription(request=request)
648
649 # Handle the response
650 print(response)
651
652 Args:
653 request (Optional[Union[google.pubsub_v1.types.UpdateSubscriptionRequest, dict]]):
654 The request object. Request for the UpdateSubscription
655 method.
656 subscription (:class:`google.pubsub_v1.types.Subscription`):
657 Required. The updated subscription
658 object.
659
660 This corresponds to the ``subscription`` field
661 on the ``request`` instance; if ``request`` is provided, this
662 should not be set.
663 update_mask (:class:`google.protobuf.field_mask_pb2.FieldMask`):
664 Required. Indicates which fields in
665 the provided subscription to update.
666 Must be specified and non-empty.
667
668 This corresponds to the ``update_mask`` field
669 on the ``request`` instance; if ``request`` is provided, this
670 should not be set.
671 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
672 should be retried.
673 timeout (float): The timeout for this request.
674 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
675 sent along with the request as metadata. Normally, each value must be of type `str`,
676 but for metadata keys ending with the suffix `-bin`, the corresponding values must
677 be of type `bytes`.
678
679 Returns:
680 google.pubsub_v1.types.Subscription:
681 A subscription resource. If none of push_config, bigquery_config, or
682 cloud_storage_config is set, then the subscriber will
683 pull and ack messages using API methods. At most one
684 of these fields may be set.
685
686 """
687 # Create or coerce a protobuf request object.
688 # - Quick check: If we got a request object, we should *not* have
689 # gotten any keyword arguments that map to the request.
690 flattened_params = [subscription, update_mask]
691 has_flattened_params = (
692 len([param for param in flattened_params if param is not None]) > 0
693 )
694 if request is not None and has_flattened_params:
695 raise ValueError(
696 "If the `request` argument is set, then none of "
697 "the individual field arguments should be set."
698 )
699
700 # - Use the request object if provided (there's no risk of modifying the input as
701 # there are no flattened fields), or create one.
702 if not isinstance(request, pubsub.UpdateSubscriptionRequest):
703 request = pubsub.UpdateSubscriptionRequest(request)
704
705 # If we have keyword arguments corresponding to fields on the
706 # request, apply these.
707 if subscription is not None:
708 request.subscription = subscription
709 if update_mask is not None:
710 request.update_mask = update_mask
711
712 # Wrap the RPC method; this adds retry and timeout information,
713 # and friendly error handling.
714 rpc = self._client._transport._wrapped_methods[
715 self._client._transport.update_subscription
716 ]
717
718 # Certain fields should be provided within the metadata header;
719 # add these here.
720 metadata = tuple(metadata) + (
721 gapic_v1.routing_header.to_grpc_metadata(
722 (("subscription.name", request.subscription.name),)
723 ),
724 )
725
726 # Validate the universe domain.
727 self._client._validate_universe_domain()
728
729 # Send the request.
730 response = await rpc(
731 request,
732 retry=retry,
733 timeout=timeout,
734 metadata=metadata,
735 )
736
737 # Done; return the response.
738 return response
739
740 async def list_subscriptions(
741 self,
742 request: Optional[Union[pubsub.ListSubscriptionsRequest, dict]] = None,
743 *,
744 project: Optional[str] = None,
745 retry: OptionalRetry = gapic_v1.method.DEFAULT,
746 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
747 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
748 ) -> pagers.ListSubscriptionsAsyncPager:
749 r"""Lists matching subscriptions.
750
751 .. code-block:: python
752
753 # This snippet has been automatically generated and should be regarded as a
754 # code template only.
755 # It will require modifications to work:
756 # - It may require correct/in-range values for request initialization.
757 # - It may require specifying regional endpoints when creating the service
758 # client as shown in:
759 # https://googleapis.dev/python/google-api-core/latest/client_options.html
760 from google import pubsub_v1
761
762 async def sample_list_subscriptions():
763 # Create a client
764 client = pubsub_v1.SubscriberAsyncClient()
765
766 # Initialize request argument(s)
767 request = pubsub_v1.ListSubscriptionsRequest(
768 project="project_value",
769 )
770
771 # Make the request
772 page_result = client.list_subscriptions(request=request)
773
774 # Handle the response
775 async for response in page_result:
776 print(response)
777
778 Args:
779 request (Optional[Union[google.pubsub_v1.types.ListSubscriptionsRequest, dict]]):
780 The request object. Request for the ``ListSubscriptions`` method.
781 project (:class:`str`):
782 Required. The name of the project in which to list
783 subscriptions. Format is ``projects/{project-id}``.
784
785 This corresponds to the ``project`` field
786 on the ``request`` instance; if ``request`` is provided, this
787 should not be set.
788 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
789 should be retried.
790 timeout (float): The timeout for this request.
791 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
792 sent along with the request as metadata. Normally, each value must be of type `str`,
793 but for metadata keys ending with the suffix `-bin`, the corresponding values must
794 be of type `bytes`.
795
796 Returns:
797 google.pubsub_v1.services.subscriber.pagers.ListSubscriptionsAsyncPager:
798 Response for the ListSubscriptions method.
799
800 Iterating over this object will yield results and
801 resolve additional pages automatically.
802
803 """
804 # Create or coerce a protobuf request object.
805 # - Quick check: If we got a request object, we should *not* have
806 # gotten any keyword arguments that map to the request.
807 flattened_params = [project]
808 has_flattened_params = (
809 len([param for param in flattened_params if param is not None]) > 0
810 )
811 if request is not None and has_flattened_params:
812 raise ValueError(
813 "If the `request` argument is set, then none of "
814 "the individual field arguments should be set."
815 )
816
817 # - Use the request object if provided (there's no risk of modifying the input as
818 # there are no flattened fields), or create one.
819 if not isinstance(request, pubsub.ListSubscriptionsRequest):
820 request = pubsub.ListSubscriptionsRequest(request)
821
822 # If we have keyword arguments corresponding to fields on the
823 # request, apply these.
824 if project is not None:
825 request.project = project
826
827 # Wrap the RPC method; this adds retry and timeout information,
828 # and friendly error handling.
829 rpc = self._client._transport._wrapped_methods[
830 self._client._transport.list_subscriptions
831 ]
832
833 # Certain fields should be provided within the metadata header;
834 # add these here.
835 metadata = tuple(metadata) + (
836 gapic_v1.routing_header.to_grpc_metadata((("project", request.project),)),
837 )
838
839 # Validate the universe domain.
840 self._client._validate_universe_domain()
841
842 # Send the request.
843 response = await rpc(
844 request,
845 retry=retry,
846 timeout=timeout,
847 metadata=metadata,
848 )
849
850 # This method is paged; wrap the response in a pager, which provides
851 # an `__aiter__` convenience method.
852 response = pagers.ListSubscriptionsAsyncPager(
853 method=rpc,
854 request=request,
855 response=response,
856 retry=retry,
857 timeout=timeout,
858 metadata=metadata,
859 )
860
861 # Done; return the response.
862 return response
863
864 async def delete_subscription(
865 self,
866 request: Optional[Union[pubsub.DeleteSubscriptionRequest, dict]] = None,
867 *,
868 subscription: Optional[str] = None,
869 retry: OptionalRetry = gapic_v1.method.DEFAULT,
870 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
871 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
872 ) -> None:
873 r"""Deletes an existing subscription. All messages retained in the
874 subscription are immediately dropped. Calls to ``Pull`` after
875 deletion will return ``NOT_FOUND``. After a subscription is
876 deleted, a new one may be created with the same name, but the
877 new one has no association with the old subscription or its
878 topic unless the same topic is specified.
879
880 .. code-block:: python
881
882 # This snippet has been automatically generated and should be regarded as a
883 # code template only.
884 # It will require modifications to work:
885 # - It may require correct/in-range values for request initialization.
886 # - It may require specifying regional endpoints when creating the service
887 # client as shown in:
888 # https://googleapis.dev/python/google-api-core/latest/client_options.html
889 from google import pubsub_v1
890
891 async def sample_delete_subscription():
892 # Create a client
893 client = pubsub_v1.SubscriberAsyncClient()
894
895 # Initialize request argument(s)
896 request = pubsub_v1.DeleteSubscriptionRequest(
897 subscription="subscription_value",
898 )
899
900 # Make the request
901 await client.delete_subscription(request=request)
902
903 Args:
904 request (Optional[Union[google.pubsub_v1.types.DeleteSubscriptionRequest, dict]]):
905 The request object. Request for the DeleteSubscription
906 method.
907 subscription (:class:`str`):
908 Required. The subscription to delete. Format is
909 ``projects/{project}/subscriptions/{sub}``.
910
911 This corresponds to the ``subscription`` field
912 on the ``request`` instance; if ``request`` is provided, this
913 should not be set.
914 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
915 should be retried.
916 timeout (float): The timeout for this request.
917 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
918 sent along with the request as metadata. Normally, each value must be of type `str`,
919 but for metadata keys ending with the suffix `-bin`, the corresponding values must
920 be of type `bytes`.
921 """
922 # Create or coerce a protobuf request object.
923 # - Quick check: If we got a request object, we should *not* have
924 # gotten any keyword arguments that map to the request.
925 flattened_params = [subscription]
926 has_flattened_params = (
927 len([param for param in flattened_params if param is not None]) > 0
928 )
929 if request is not None and has_flattened_params:
930 raise ValueError(
931 "If the `request` argument is set, then none of "
932 "the individual field arguments should be set."
933 )
934
935 # - Use the request object if provided (there's no risk of modifying the input as
936 # there are no flattened fields), or create one.
937 if not isinstance(request, pubsub.DeleteSubscriptionRequest):
938 request = pubsub.DeleteSubscriptionRequest(request)
939
940 # If we have keyword arguments corresponding to fields on the
941 # request, apply these.
942 if subscription is not None:
943 request.subscription = subscription
944
945 # Wrap the RPC method; this adds retry and timeout information,
946 # and friendly error handling.
947 rpc = self._client._transport._wrapped_methods[
948 self._client._transport.delete_subscription
949 ]
950
951 # Certain fields should be provided within the metadata header;
952 # add these here.
953 metadata = tuple(metadata) + (
954 gapic_v1.routing_header.to_grpc_metadata(
955 (("subscription", request.subscription),)
956 ),
957 )
958
959 # Validate the universe domain.
960 self._client._validate_universe_domain()
961
962 # Send the request.
963 await rpc(
964 request,
965 retry=retry,
966 timeout=timeout,
967 metadata=metadata,
968 )
969
970 async def modify_ack_deadline(
971 self,
972 request: Optional[Union[pubsub.ModifyAckDeadlineRequest, dict]] = None,
973 *,
974 subscription: Optional[str] = None,
975 ack_ids: Optional[MutableSequence[str]] = None,
976 ack_deadline_seconds: Optional[int] = None,
977 retry: OptionalRetry = gapic_v1.method.DEFAULT,
978 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
979 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
980 ) -> None:
981 r"""Modifies the ack deadline for a specific message. This method is
982 useful to indicate that more time is needed to process a message
983 by the subscriber, or to make the message available for
984 redelivery if the processing was interrupted. Note that this
985 does not modify the subscription-level ``ackDeadlineSeconds``
986 used for subsequent messages.
987
988 .. code-block:: python
989
990 # This snippet has been automatically generated and should be regarded as a
991 # code template only.
992 # It will require modifications to work:
993 # - It may require correct/in-range values for request initialization.
994 # - It may require specifying regional endpoints when creating the service
995 # client as shown in:
996 # https://googleapis.dev/python/google-api-core/latest/client_options.html
997 from google import pubsub_v1
998
999 async def sample_modify_ack_deadline():
1000 # Create a client
1001 client = pubsub_v1.SubscriberAsyncClient()
1002
1003 # Initialize request argument(s)
1004 request = pubsub_v1.ModifyAckDeadlineRequest(
1005 subscription="subscription_value",
1006 ack_ids=['ack_ids_value1', 'ack_ids_value2'],
1007 ack_deadline_seconds=2066,
1008 )
1009
1010 # Make the request
1011 await client.modify_ack_deadline(request=request)
1012
1013 Args:
1014 request (Optional[Union[google.pubsub_v1.types.ModifyAckDeadlineRequest, dict]]):
1015 The request object. Request for the ModifyAckDeadline
1016 method.
1017 subscription (:class:`str`):
1018 Required. The name of the subscription. Format is
1019 ``projects/{project}/subscriptions/{sub}``.
1020
1021 This corresponds to the ``subscription`` field
1022 on the ``request`` instance; if ``request`` is provided, this
1023 should not be set.
1024 ack_ids (:class:`MutableSequence[str]`):
1025 Required. List of acknowledgment IDs.
1026 This corresponds to the ``ack_ids`` field
1027 on the ``request`` instance; if ``request`` is provided, this
1028 should not be set.
1029 ack_deadline_seconds (:class:`int`):
1030 Required. The new ack deadline with respect to the time
1031 this request was sent to the Pub/Sub system. For
1032 example, if the value is 10, the new ack deadline will
1033 expire 10 seconds after the ``ModifyAckDeadline`` call
1034 was made. Specifying zero might immediately make the
1035 message available for delivery to another subscriber
1036 client. This typically results in an increase in the
1037 rate of message redeliveries (that is, duplicates). The
1038 minimum deadline you can specify is 0 seconds. The
1039 maximum deadline you can specify in a single request is
1040 600 seconds (10 minutes).
1041
1042 This corresponds to the ``ack_deadline_seconds`` field
1043 on the ``request`` instance; if ``request`` is provided, this
1044 should not be set.
1045 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
1046 should be retried.
1047 timeout (float): The timeout for this request.
1048 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1049 sent along with the request as metadata. Normally, each value must be of type `str`,
1050 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1051 be of type `bytes`.
1052 """
1053 # Create or coerce a protobuf request object.
1054 # - Quick check: If we got a request object, we should *not* have
1055 # gotten any keyword arguments that map to the request.
1056 flattened_params = [subscription, ack_ids, ack_deadline_seconds]
1057 has_flattened_params = (
1058 len([param for param in flattened_params if param is not None]) > 0
1059 )
1060 if request is not None and has_flattened_params:
1061 raise ValueError(
1062 "If the `request` argument is set, then none of "
1063 "the individual field arguments should be set."
1064 )
1065
1066 # - Use the request object if provided (there's no risk of modifying the input as
1067 # there are no flattened fields), or create one.
1068 if not isinstance(request, pubsub.ModifyAckDeadlineRequest):
1069 request = pubsub.ModifyAckDeadlineRequest(request)
1070
1071 # If we have keyword arguments corresponding to fields on the
1072 # request, apply these.
1073 if subscription is not None:
1074 request.subscription = subscription
1075 if ack_deadline_seconds is not None:
1076 request.ack_deadline_seconds = ack_deadline_seconds
1077 if ack_ids:
1078 request.ack_ids.extend(ack_ids)
1079
1080 # Wrap the RPC method; this adds retry and timeout information,
1081 # and friendly error handling.
1082 rpc = self._client._transport._wrapped_methods[
1083 self._client._transport.modify_ack_deadline
1084 ]
1085
1086 # Certain fields should be provided within the metadata header;
1087 # add these here.
1088 metadata = tuple(metadata) + (
1089 gapic_v1.routing_header.to_grpc_metadata(
1090 (("subscription", request.subscription),)
1091 ),
1092 )
1093
1094 # Validate the universe domain.
1095 self._client._validate_universe_domain()
1096
1097 # Send the request.
1098 await rpc(
1099 request,
1100 retry=retry,
1101 timeout=timeout,
1102 metadata=metadata,
1103 )
1104
1105 async def acknowledge(
1106 self,
1107 request: Optional[Union[pubsub.AcknowledgeRequest, dict]] = None,
1108 *,
1109 subscription: Optional[str] = None,
1110 ack_ids: Optional[MutableSequence[str]] = None,
1111 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1112 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1113 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1114 ) -> None:
1115 r"""Acknowledges the messages associated with the ``ack_ids`` in the
1116 ``AcknowledgeRequest``. The Pub/Sub system can remove the
1117 relevant messages from the subscription.
1118
1119 Acknowledging a message whose ack deadline has expired may
1120 succeed, but such a message may be redelivered later.
1121 Acknowledging a message more than once will not result in an
1122 error.
1123
1124 .. code-block:: python
1125
1126 # This snippet has been automatically generated and should be regarded as a
1127 # code template only.
1128 # It will require modifications to work:
1129 # - It may require correct/in-range values for request initialization.
1130 # - It may require specifying regional endpoints when creating the service
1131 # client as shown in:
1132 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1133 from google import pubsub_v1
1134
1135 async def sample_acknowledge():
1136 # Create a client
1137 client = pubsub_v1.SubscriberAsyncClient()
1138
1139 # Initialize request argument(s)
1140 request = pubsub_v1.AcknowledgeRequest(
1141 subscription="subscription_value",
1142 ack_ids=['ack_ids_value1', 'ack_ids_value2'],
1143 )
1144
1145 # Make the request
1146 await client.acknowledge(request=request)
1147
1148 Args:
1149 request (Optional[Union[google.pubsub_v1.types.AcknowledgeRequest, dict]]):
1150 The request object. Request for the Acknowledge method.
1151 subscription (:class:`str`):
1152 Required. The subscription whose message is being
1153 acknowledged. Format is
1154 ``projects/{project}/subscriptions/{sub}``.
1155
1156 This corresponds to the ``subscription`` field
1157 on the ``request`` instance; if ``request`` is provided, this
1158 should not be set.
1159 ack_ids (:class:`MutableSequence[str]`):
1160 Required. The acknowledgment ID for the messages being
1161 acknowledged that was returned by the Pub/Sub system in
1162 the ``Pull`` response. Must not be empty.
1163
1164 This corresponds to the ``ack_ids`` field
1165 on the ``request`` instance; if ``request`` is provided, this
1166 should not be set.
1167 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
1168 should be retried.
1169 timeout (float): The timeout for this request.
1170 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1171 sent along with the request as metadata. Normally, each value must be of type `str`,
1172 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1173 be of type `bytes`.
1174 """
1175 # Create or coerce a protobuf request object.
1176 # - Quick check: If we got a request object, we should *not* have
1177 # gotten any keyword arguments that map to the request.
1178 flattened_params = [subscription, ack_ids]
1179 has_flattened_params = (
1180 len([param for param in flattened_params if param is not None]) > 0
1181 )
1182 if request is not None and has_flattened_params:
1183 raise ValueError(
1184 "If the `request` argument is set, then none of "
1185 "the individual field arguments should be set."
1186 )
1187
1188 # - Use the request object if provided (there's no risk of modifying the input as
1189 # there are no flattened fields), or create one.
1190 if not isinstance(request, pubsub.AcknowledgeRequest):
1191 request = pubsub.AcknowledgeRequest(request)
1192
1193 # If we have keyword arguments corresponding to fields on the
1194 # request, apply these.
1195 if subscription is not None:
1196 request.subscription = subscription
1197 if ack_ids:
1198 request.ack_ids.extend(ack_ids)
1199
1200 # Wrap the RPC method; this adds retry and timeout information,
1201 # and friendly error handling.
1202 rpc = self._client._transport._wrapped_methods[
1203 self._client._transport.acknowledge
1204 ]
1205
1206 # Certain fields should be provided within the metadata header;
1207 # add these here.
1208 metadata = tuple(metadata) + (
1209 gapic_v1.routing_header.to_grpc_metadata(
1210 (("subscription", request.subscription),)
1211 ),
1212 )
1213
1214 # Validate the universe domain.
1215 self._client._validate_universe_domain()
1216
1217 # Send the request.
1218 await rpc(
1219 request,
1220 retry=retry,
1221 timeout=timeout,
1222 metadata=metadata,
1223 )
1224
1225 async def pull(
1226 self,
1227 request: Optional[Union[pubsub.PullRequest, dict]] = None,
1228 *,
1229 subscription: Optional[str] = None,
1230 return_immediately: Optional[bool] = None,
1231 max_messages: Optional[int] = None,
1232 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1233 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1234 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1235 ) -> pubsub.PullResponse:
1236 r"""Pulls messages from the server.
1237
1238 .. code-block:: python
1239
1240 # This snippet has been automatically generated and should be regarded as a
1241 # code template only.
1242 # It will require modifications to work:
1243 # - It may require correct/in-range values for request initialization.
1244 # - It may require specifying regional endpoints when creating the service
1245 # client as shown in:
1246 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1247 from google import pubsub_v1
1248
1249 async def sample_pull():
1250 # Create a client
1251 client = pubsub_v1.SubscriberAsyncClient()
1252
1253 # Initialize request argument(s)
1254 request = pubsub_v1.PullRequest(
1255 subscription="subscription_value",
1256 max_messages=1277,
1257 )
1258
1259 # Make the request
1260 response = await client.pull(request=request)
1261
1262 # Handle the response
1263 print(response)
1264
1265 Args:
1266 request (Optional[Union[google.pubsub_v1.types.PullRequest, dict]]):
1267 The request object. Request for the ``Pull`` method.
1268 subscription (:class:`str`):
1269 Required. The subscription from which messages should be
1270 pulled. Format is
1271 ``projects/{project}/subscriptions/{sub}``.
1272
1273 This corresponds to the ``subscription`` field
1274 on the ``request`` instance; if ``request`` is provided, this
1275 should not be set.
1276 return_immediately (:class:`bool`):
1277 Optional. If this field set to true, the system will
1278 respond immediately even if it there are no messages
1279 available to return in the ``Pull`` response. Otherwise,
1280 the system may wait (for a bounded amount of time) until
1281 at least one message is available, rather than returning
1282 no messages. Warning: setting this field to ``true`` is
1283 discouraged because it adversely impacts the performance
1284 of ``Pull`` operations. We recommend that users do not
1285 set this field.
1286
1287 This corresponds to the ``return_immediately`` field
1288 on the ``request`` instance; if ``request`` is provided, this
1289 should not be set.
1290 max_messages (:class:`int`):
1291 Required. The maximum number of
1292 messages to return for this request.
1293 Must be a positive integer. The Pub/Sub
1294 system may return fewer than the number
1295 specified.
1296
1297 This corresponds to the ``max_messages`` field
1298 on the ``request`` instance; if ``request`` is provided, this
1299 should not be set.
1300 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
1301 should be retried.
1302 timeout (float): The timeout for this request.
1303 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1304 sent along with the request as metadata. Normally, each value must be of type `str`,
1305 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1306 be of type `bytes`.
1307
1308 Returns:
1309 google.pubsub_v1.types.PullResponse:
1310 Response for the Pull method.
1311 """
1312 # Create or coerce a protobuf request object.
1313 # - Quick check: If we got a request object, we should *not* have
1314 # gotten any keyword arguments that map to the request.
1315 flattened_params = [subscription, return_immediately, max_messages]
1316 has_flattened_params = (
1317 len([param for param in flattened_params if param is not None]) > 0
1318 )
1319 if request is not None and has_flattened_params:
1320 raise ValueError(
1321 "If the `request` argument is set, then none of "
1322 "the individual field arguments should be set."
1323 )
1324
1325 # - Use the request object if provided (there's no risk of modifying the input as
1326 # there are no flattened fields), or create one.
1327 if not isinstance(request, pubsub.PullRequest):
1328 request = pubsub.PullRequest(request)
1329
1330 # If we have keyword arguments corresponding to fields on the
1331 # request, apply these.
1332 if subscription is not None:
1333 request.subscription = subscription
1334 if return_immediately is not None:
1335 request.return_immediately = return_immediately
1336 if max_messages is not None:
1337 request.max_messages = max_messages
1338
1339 if request.return_immediately:
1340 warnings.warn(
1341 "The return_immediately flag is deprecated and should be set to False.",
1342 category=DeprecationWarning,
1343 )
1344
1345 # Wrap the RPC method; this adds retry and timeout information,
1346 # and friendly error handling.
1347 rpc = self._client._transport._wrapped_methods[self._client._transport.pull]
1348
1349 # Certain fields should be provided within the metadata header;
1350 # add these here.
1351 metadata = tuple(metadata) + (
1352 gapic_v1.routing_header.to_grpc_metadata(
1353 (("subscription", request.subscription),)
1354 ),
1355 )
1356
1357 # Validate the universe domain.
1358 self._client._validate_universe_domain()
1359
1360 # Send the request.
1361 response = await rpc(
1362 request,
1363 retry=retry,
1364 timeout=timeout,
1365 metadata=metadata,
1366 )
1367
1368 # Done; return the response.
1369 return response
1370
1371 def streaming_pull(
1372 self,
1373 requests: Optional[AsyncIterator[pubsub.StreamingPullRequest]] = None,
1374 *,
1375 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1376 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1377 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1378 ) -> Awaitable[AsyncIterable[pubsub.StreamingPullResponse]]:
1379 r"""Establishes a stream with the server, which sends messages down
1380 to the client. The client streams acknowledgments and ack
1381 deadline modifications back to the server. The server will close
1382 the stream and return the status on any error. The server may
1383 close the stream with status ``UNAVAILABLE`` to reassign
1384 server-side resources, in which case, the client should
1385 re-establish the stream. Flow control can be achieved by
1386 configuring the underlying RPC channel.
1387
1388 .. code-block:: python
1389
1390 # This snippet has been automatically generated and should be regarded as a
1391 # code template only.
1392 # It will require modifications to work:
1393 # - It may require correct/in-range values for request initialization.
1394 # - It may require specifying regional endpoints when creating the service
1395 # client as shown in:
1396 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1397 from google import pubsub_v1
1398
1399 async def sample_streaming_pull():
1400 # Create a client
1401 client = pubsub_v1.SubscriberAsyncClient()
1402
1403 # Initialize request argument(s)
1404 request = pubsub_v1.StreamingPullRequest(
1405 subscription="subscription_value",
1406 stream_ack_deadline_seconds=2813,
1407 )
1408
1409 # This method expects an iterator which contains
1410 # 'pubsub_v1.StreamingPullRequest' objects
1411 # Here we create a generator that yields a single `request` for
1412 # demonstrative purposes.
1413 requests = [request]
1414
1415 def request_generator():
1416 for request in requests:
1417 yield request
1418
1419 # Make the request
1420 stream = await client.streaming_pull(requests=request_generator())
1421
1422 # Handle the response
1423 async for response in stream:
1424 print(response)
1425
1426 Args:
1427 requests (AsyncIterator[`google.pubsub_v1.types.StreamingPullRequest`]):
1428 The request object AsyncIterator. Request for the ``StreamingPull`` streaming RPC method.
1429 This request is used to establish the initial stream as
1430 well as to stream acknowledgments and ack deadline
1431 modifications from the client to the server.
1432 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
1433 should be retried.
1434 timeout (float): The timeout for this request.
1435 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1436 sent along with the request as metadata. Normally, each value must be of type `str`,
1437 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1438 be of type `bytes`.
1439
1440 Returns:
1441 AsyncIterable[google.pubsub_v1.types.StreamingPullResponse]:
1442 Response for the StreamingPull method. This response is used to stream
1443 messages from the server to the client.
1444
1445 """
1446
1447 # Wrap the RPC method; this adds retry and timeout information,
1448 # and friendly error handling.
1449 rpc = self._client._transport._wrapped_methods[
1450 self._client._transport.streaming_pull
1451 ]
1452
1453 # Validate the universe domain.
1454 self._client._validate_universe_domain()
1455
1456 # Send the request.
1457 response = rpc(
1458 requests,
1459 retry=retry,
1460 timeout=timeout,
1461 metadata=metadata,
1462 )
1463
1464 # Done; return the response.
1465 return response
1466
1467 async def modify_push_config(
1468 self,
1469 request: Optional[Union[pubsub.ModifyPushConfigRequest, dict]] = None,
1470 *,
1471 subscription: Optional[str] = None,
1472 push_config: Optional[pubsub.PushConfig] = None,
1473 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1474 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1475 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1476 ) -> None:
1477 r"""Modifies the ``PushConfig`` for a specified subscription.
1478
1479 This may be used to change a push subscription to a pull one
1480 (signified by an empty ``PushConfig``) or vice versa, or change
1481 the endpoint URL and other attributes of a push subscription.
1482 Messages will accumulate for delivery continuously through the
1483 call regardless of changes to the ``PushConfig``.
1484
1485 .. code-block:: python
1486
1487 # This snippet has been automatically generated and should be regarded as a
1488 # code template only.
1489 # It will require modifications to work:
1490 # - It may require correct/in-range values for request initialization.
1491 # - It may require specifying regional endpoints when creating the service
1492 # client as shown in:
1493 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1494 from google import pubsub_v1
1495
1496 async def sample_modify_push_config():
1497 # Create a client
1498 client = pubsub_v1.SubscriberAsyncClient()
1499
1500 # Initialize request argument(s)
1501 request = pubsub_v1.ModifyPushConfigRequest(
1502 subscription="subscription_value",
1503 )
1504
1505 # Make the request
1506 await client.modify_push_config(request=request)
1507
1508 Args:
1509 request (Optional[Union[google.pubsub_v1.types.ModifyPushConfigRequest, dict]]):
1510 The request object. Request for the ModifyPushConfig
1511 method.
1512 subscription (:class:`str`):
1513 Required. The name of the subscription. Format is
1514 ``projects/{project}/subscriptions/{sub}``.
1515
1516 This corresponds to the ``subscription`` field
1517 on the ``request`` instance; if ``request`` is provided, this
1518 should not be set.
1519 push_config (:class:`google.pubsub_v1.types.PushConfig`):
1520 Required. The push configuration for future deliveries.
1521
1522 An empty ``pushConfig`` indicates that the Pub/Sub
1523 system should stop pushing messages from the given
1524 subscription and allow messages to be pulled and
1525 acknowledged - effectively pausing the subscription if
1526 ``Pull`` or ``StreamingPull`` is not called.
1527
1528 This corresponds to the ``push_config`` field
1529 on the ``request`` instance; if ``request`` is provided, this
1530 should not be set.
1531 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
1532 should be retried.
1533 timeout (float): The timeout for this request.
1534 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1535 sent along with the request as metadata. Normally, each value must be of type `str`,
1536 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1537 be of type `bytes`.
1538 """
1539 # Create or coerce a protobuf request object.
1540 # - Quick check: If we got a request object, we should *not* have
1541 # gotten any keyword arguments that map to the request.
1542 flattened_params = [subscription, push_config]
1543 has_flattened_params = (
1544 len([param for param in flattened_params if param is not None]) > 0
1545 )
1546 if request is not None and has_flattened_params:
1547 raise ValueError(
1548 "If the `request` argument is set, then none of "
1549 "the individual field arguments should be set."
1550 )
1551
1552 # - Use the request object if provided (there's no risk of modifying the input as
1553 # there are no flattened fields), or create one.
1554 if not isinstance(request, pubsub.ModifyPushConfigRequest):
1555 request = pubsub.ModifyPushConfigRequest(request)
1556
1557 # If we have keyword arguments corresponding to fields on the
1558 # request, apply these.
1559 if subscription is not None:
1560 request.subscription = subscription
1561 if push_config is not None:
1562 request.push_config = push_config
1563
1564 # Wrap the RPC method; this adds retry and timeout information,
1565 # and friendly error handling.
1566 rpc = self._client._transport._wrapped_methods[
1567 self._client._transport.modify_push_config
1568 ]
1569
1570 # Certain fields should be provided within the metadata header;
1571 # add these here.
1572 metadata = tuple(metadata) + (
1573 gapic_v1.routing_header.to_grpc_metadata(
1574 (("subscription", request.subscription),)
1575 ),
1576 )
1577
1578 # Validate the universe domain.
1579 self._client._validate_universe_domain()
1580
1581 # Send the request.
1582 await rpc(
1583 request,
1584 retry=retry,
1585 timeout=timeout,
1586 metadata=metadata,
1587 )
1588
1589 async def get_snapshot(
1590 self,
1591 request: Optional[Union[pubsub.GetSnapshotRequest, dict]] = None,
1592 *,
1593 snapshot: Optional[str] = None,
1594 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1595 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1596 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1597 ) -> pubsub.Snapshot:
1598 r"""Gets the configuration details of a snapshot. Snapshots are used
1599 in
1600 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
1601 operations, which allow you to manage message acknowledgments in
1602 bulk. That is, you can set the acknowledgment state of messages
1603 in an existing subscription to the state captured by a snapshot.
1604
1605 .. code-block:: python
1606
1607 # This snippet has been automatically generated and should be regarded as a
1608 # code template only.
1609 # It will require modifications to work:
1610 # - It may require correct/in-range values for request initialization.
1611 # - It may require specifying regional endpoints when creating the service
1612 # client as shown in:
1613 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1614 from google import pubsub_v1
1615
1616 async def sample_get_snapshot():
1617 # Create a client
1618 client = pubsub_v1.SubscriberAsyncClient()
1619
1620 # Initialize request argument(s)
1621 request = pubsub_v1.GetSnapshotRequest(
1622 snapshot="snapshot_value",
1623 )
1624
1625 # Make the request
1626 response = await client.get_snapshot(request=request)
1627
1628 # Handle the response
1629 print(response)
1630
1631 Args:
1632 request (Optional[Union[google.pubsub_v1.types.GetSnapshotRequest, dict]]):
1633 The request object. Request for the GetSnapshot method.
1634 snapshot (:class:`str`):
1635 Required. The name of the snapshot to get. Format is
1636 ``projects/{project}/snapshots/{snap}``.
1637
1638 This corresponds to the ``snapshot`` field
1639 on the ``request`` instance; if ``request`` is provided, this
1640 should not be set.
1641 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
1642 should be retried.
1643 timeout (float): The timeout for this request.
1644 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1645 sent along with the request as metadata. Normally, each value must be of type `str`,
1646 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1647 be of type `bytes`.
1648
1649 Returns:
1650 google.pubsub_v1.types.Snapshot:
1651 A snapshot resource. Snapshots are used in
1652 [Seek](https://cloud.google.com/pubsub/docs/replay-overview)
1653 operations, which allow you to manage message
1654 acknowledgments in bulk. That is, you can set the
1655 acknowledgment state of messages in an existing
1656 subscription to the state captured by a snapshot.
1657
1658 """
1659 # Create or coerce a protobuf request object.
1660 # - Quick check: If we got a request object, we should *not* have
1661 # gotten any keyword arguments that map to the request.
1662 flattened_params = [snapshot]
1663 has_flattened_params = (
1664 len([param for param in flattened_params if param is not None]) > 0
1665 )
1666 if request is not None and has_flattened_params:
1667 raise ValueError(
1668 "If the `request` argument is set, then none of "
1669 "the individual field arguments should be set."
1670 )
1671
1672 # - Use the request object if provided (there's no risk of modifying the input as
1673 # there are no flattened fields), or create one.
1674 if not isinstance(request, pubsub.GetSnapshotRequest):
1675 request = pubsub.GetSnapshotRequest(request)
1676
1677 # If we have keyword arguments corresponding to fields on the
1678 # request, apply these.
1679 if snapshot is not None:
1680 request.snapshot = snapshot
1681
1682 # Wrap the RPC method; this adds retry and timeout information,
1683 # and friendly error handling.
1684 rpc = self._client._transport._wrapped_methods[
1685 self._client._transport.get_snapshot
1686 ]
1687
1688 # Certain fields should be provided within the metadata header;
1689 # add these here.
1690 metadata = tuple(metadata) + (
1691 gapic_v1.routing_header.to_grpc_metadata((("snapshot", request.snapshot),)),
1692 )
1693
1694 # Validate the universe domain.
1695 self._client._validate_universe_domain()
1696
1697 # Send the request.
1698 response = await rpc(
1699 request,
1700 retry=retry,
1701 timeout=timeout,
1702 metadata=metadata,
1703 )
1704
1705 # Done; return the response.
1706 return response
1707
1708 async def list_snapshots(
1709 self,
1710 request: Optional[Union[pubsub.ListSnapshotsRequest, dict]] = None,
1711 *,
1712 project: Optional[str] = None,
1713 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1714 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1715 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1716 ) -> pagers.ListSnapshotsAsyncPager:
1717 r"""Lists the existing snapshots. Snapshots are used in
1718 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
1719 operations, which allow you to manage message acknowledgments in
1720 bulk. That is, you can set the acknowledgment state of messages
1721 in an existing subscription to the state captured by a snapshot.
1722
1723 .. code-block:: python
1724
1725 # This snippet has been automatically generated and should be regarded as a
1726 # code template only.
1727 # It will require modifications to work:
1728 # - It may require correct/in-range values for request initialization.
1729 # - It may require specifying regional endpoints when creating the service
1730 # client as shown in:
1731 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1732 from google import pubsub_v1
1733
1734 async def sample_list_snapshots():
1735 # Create a client
1736 client = pubsub_v1.SubscriberAsyncClient()
1737
1738 # Initialize request argument(s)
1739 request = pubsub_v1.ListSnapshotsRequest(
1740 project="project_value",
1741 )
1742
1743 # Make the request
1744 page_result = client.list_snapshots(request=request)
1745
1746 # Handle the response
1747 async for response in page_result:
1748 print(response)
1749
1750 Args:
1751 request (Optional[Union[google.pubsub_v1.types.ListSnapshotsRequest, dict]]):
1752 The request object. Request for the ``ListSnapshots`` method.
1753 project (:class:`str`):
1754 Required. The name of the project in which to list
1755 snapshots. Format is ``projects/{project-id}``.
1756
1757 This corresponds to the ``project`` field
1758 on the ``request`` instance; if ``request`` is provided, this
1759 should not be set.
1760 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
1761 should be retried.
1762 timeout (float): The timeout for this request.
1763 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1764 sent along with the request as metadata. Normally, each value must be of type `str`,
1765 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1766 be of type `bytes`.
1767
1768 Returns:
1769 google.pubsub_v1.services.subscriber.pagers.ListSnapshotsAsyncPager:
1770 Response for the ListSnapshots method.
1771
1772 Iterating over this object will yield results and
1773 resolve additional pages automatically.
1774
1775 """
1776 # Create or coerce a protobuf request object.
1777 # - Quick check: If we got a request object, we should *not* have
1778 # gotten any keyword arguments that map to the request.
1779 flattened_params = [project]
1780 has_flattened_params = (
1781 len([param for param in flattened_params if param is not None]) > 0
1782 )
1783 if request is not None and has_flattened_params:
1784 raise ValueError(
1785 "If the `request` argument is set, then none of "
1786 "the individual field arguments should be set."
1787 )
1788
1789 # - Use the request object if provided (there's no risk of modifying the input as
1790 # there are no flattened fields), or create one.
1791 if not isinstance(request, pubsub.ListSnapshotsRequest):
1792 request = pubsub.ListSnapshotsRequest(request)
1793
1794 # If we have keyword arguments corresponding to fields on the
1795 # request, apply these.
1796 if project is not None:
1797 request.project = project
1798
1799 # Wrap the RPC method; this adds retry and timeout information,
1800 # and friendly error handling.
1801 rpc = self._client._transport._wrapped_methods[
1802 self._client._transport.list_snapshots
1803 ]
1804
1805 # Certain fields should be provided within the metadata header;
1806 # add these here.
1807 metadata = tuple(metadata) + (
1808 gapic_v1.routing_header.to_grpc_metadata((("project", request.project),)),
1809 )
1810
1811 # Validate the universe domain.
1812 self._client._validate_universe_domain()
1813
1814 # Send the request.
1815 response = await rpc(
1816 request,
1817 retry=retry,
1818 timeout=timeout,
1819 metadata=metadata,
1820 )
1821
1822 # This method is paged; wrap the response in a pager, which provides
1823 # an `__aiter__` convenience method.
1824 response = pagers.ListSnapshotsAsyncPager(
1825 method=rpc,
1826 request=request,
1827 response=response,
1828 retry=retry,
1829 timeout=timeout,
1830 metadata=metadata,
1831 )
1832
1833 # Done; return the response.
1834 return response
1835
1836 async def create_snapshot(
1837 self,
1838 request: Optional[Union[pubsub.CreateSnapshotRequest, dict]] = None,
1839 *,
1840 name: Optional[str] = None,
1841 subscription: Optional[str] = None,
1842 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1843 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1844 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1845 ) -> pubsub.Snapshot:
1846 r"""Creates a snapshot from the requested subscription. Snapshots
1847 are used in
1848 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
1849 operations, which allow you to manage message acknowledgments in
1850 bulk. That is, you can set the acknowledgment state of messages
1851 in an existing subscription to the state captured by a snapshot.
1852 If the snapshot already exists, returns ``ALREADY_EXISTS``. If
1853 the requested subscription doesn't exist, returns ``NOT_FOUND``.
1854 If the backlog in the subscription is too old -- and the
1855 resulting snapshot would expire in less than 1 hour -- then
1856 ``FAILED_PRECONDITION`` is returned. See also the
1857 ``Snapshot.expire_time`` field. If the name is not provided in
1858 the request, the server will assign a random name for this
1859 snapshot on the same project as the subscription, conforming to
1860 the [resource name format]
1861 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
1862 The generated name is populated in the returned Snapshot object.
1863 Note that for REST API requests, you must specify a name in the
1864 request.
1865
1866 .. code-block:: python
1867
1868 # This snippet has been automatically generated and should be regarded as a
1869 # code template only.
1870 # It will require modifications to work:
1871 # - It may require correct/in-range values for request initialization.
1872 # - It may require specifying regional endpoints when creating the service
1873 # client as shown in:
1874 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1875 from google import pubsub_v1
1876
1877 async def sample_create_snapshot():
1878 # Create a client
1879 client = pubsub_v1.SubscriberAsyncClient()
1880
1881 # Initialize request argument(s)
1882 request = pubsub_v1.CreateSnapshotRequest(
1883 name="name_value",
1884 subscription="subscription_value",
1885 )
1886
1887 # Make the request
1888 response = await client.create_snapshot(request=request)
1889
1890 # Handle the response
1891 print(response)
1892
1893 Args:
1894 request (Optional[Union[google.pubsub_v1.types.CreateSnapshotRequest, dict]]):
1895 The request object. Request for the ``CreateSnapshot`` method.
1896 name (:class:`str`):
1897 Required. User-provided name for this snapshot. If the
1898 name is not provided in the request, the server will
1899 assign a random name for this snapshot on the same
1900 project as the subscription. Note that for REST API
1901 requests, you must specify a name. See the `resource
1902 name
1903 rules <https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names>`__.
1904 Format is ``projects/{project}/snapshots/{snap}``.
1905
1906 This corresponds to the ``name`` field
1907 on the ``request`` instance; if ``request`` is provided, this
1908 should not be set.
1909 subscription (:class:`str`):
1910 Required. The subscription whose backlog the snapshot
1911 retains. Specifically, the created snapshot is
1912 guaranteed to retain: (a) The existing backlog on the
1913 subscription. More precisely, this is defined as the
1914 messages in the subscription's backlog that are
1915 unacknowledged upon the successful completion of the
1916 ``CreateSnapshot`` request; as well as: (b) Any messages
1917 published to the subscription's topic following the
1918 successful completion of the CreateSnapshot request.
1919 Format is ``projects/{project}/subscriptions/{sub}``.
1920
1921 This corresponds to the ``subscription`` field
1922 on the ``request`` instance; if ``request`` is provided, this
1923 should not be set.
1924 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
1925 should be retried.
1926 timeout (float): The timeout for this request.
1927 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1928 sent along with the request as metadata. Normally, each value must be of type `str`,
1929 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1930 be of type `bytes`.
1931
1932 Returns:
1933 google.pubsub_v1.types.Snapshot:
1934 A snapshot resource. Snapshots are used in
1935 [Seek](https://cloud.google.com/pubsub/docs/replay-overview)
1936 operations, which allow you to manage message
1937 acknowledgments in bulk. That is, you can set the
1938 acknowledgment state of messages in an existing
1939 subscription to the state captured by a snapshot.
1940
1941 """
1942 # Create or coerce a protobuf request object.
1943 # - Quick check: If we got a request object, we should *not* have
1944 # gotten any keyword arguments that map to the request.
1945 flattened_params = [name, subscription]
1946 has_flattened_params = (
1947 len([param for param in flattened_params if param is not None]) > 0
1948 )
1949 if request is not None and has_flattened_params:
1950 raise ValueError(
1951 "If the `request` argument is set, then none of "
1952 "the individual field arguments should be set."
1953 )
1954
1955 # - Use the request object if provided (there's no risk of modifying the input as
1956 # there are no flattened fields), or create one.
1957 if not isinstance(request, pubsub.CreateSnapshotRequest):
1958 request = pubsub.CreateSnapshotRequest(request)
1959
1960 # If we have keyword arguments corresponding to fields on the
1961 # request, apply these.
1962 if name is not None:
1963 request.name = name
1964 if subscription is not None:
1965 request.subscription = subscription
1966
1967 # Wrap the RPC method; this adds retry and timeout information,
1968 # and friendly error handling.
1969 rpc = self._client._transport._wrapped_methods[
1970 self._client._transport.create_snapshot
1971 ]
1972
1973 # Certain fields should be provided within the metadata header;
1974 # add these here.
1975 metadata = tuple(metadata) + (
1976 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
1977 )
1978
1979 # Validate the universe domain.
1980 self._client._validate_universe_domain()
1981
1982 # Send the request.
1983 response = await rpc(
1984 request,
1985 retry=retry,
1986 timeout=timeout,
1987 metadata=metadata,
1988 )
1989
1990 # Done; return the response.
1991 return response
1992
1993 async def update_snapshot(
1994 self,
1995 request: Optional[Union[pubsub.UpdateSnapshotRequest, dict]] = None,
1996 *,
1997 snapshot: Optional[pubsub.Snapshot] = None,
1998 update_mask: Optional[field_mask_pb2.FieldMask] = None,
1999 retry: OptionalRetry = gapic_v1.method.DEFAULT,
2000 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
2001 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
2002 ) -> pubsub.Snapshot:
2003 r"""Updates an existing snapshot by updating the fields specified in
2004 the update mask. Snapshots are used in
2005 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
2006 operations, which allow you to manage message acknowledgments in
2007 bulk. That is, you can set the acknowledgment state of messages
2008 in an existing subscription to the state captured by a snapshot.
2009
2010 .. code-block:: python
2011
2012 # This snippet has been automatically generated and should be regarded as a
2013 # code template only.
2014 # It will require modifications to work:
2015 # - It may require correct/in-range values for request initialization.
2016 # - It may require specifying regional endpoints when creating the service
2017 # client as shown in:
2018 # https://googleapis.dev/python/google-api-core/latest/client_options.html
2019 from google import pubsub_v1
2020
2021 async def sample_update_snapshot():
2022 # Create a client
2023 client = pubsub_v1.SubscriberAsyncClient()
2024
2025 # Initialize request argument(s)
2026 request = pubsub_v1.UpdateSnapshotRequest(
2027 )
2028
2029 # Make the request
2030 response = await client.update_snapshot(request=request)
2031
2032 # Handle the response
2033 print(response)
2034
2035 Args:
2036 request (Optional[Union[google.pubsub_v1.types.UpdateSnapshotRequest, dict]]):
2037 The request object. Request for the UpdateSnapshot
2038 method.
2039 snapshot (:class:`google.pubsub_v1.types.Snapshot`):
2040 Required. The updated snapshot
2041 object.
2042
2043 This corresponds to the ``snapshot`` field
2044 on the ``request`` instance; if ``request`` is provided, this
2045 should not be set.
2046 update_mask (:class:`google.protobuf.field_mask_pb2.FieldMask`):
2047 Required. Indicates which fields in
2048 the provided snapshot to update. Must be
2049 specified and non-empty.
2050
2051 This corresponds to the ``update_mask`` field
2052 on the ``request`` instance; if ``request`` is provided, this
2053 should not be set.
2054 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
2055 should be retried.
2056 timeout (float): The timeout for this request.
2057 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
2058 sent along with the request as metadata. Normally, each value must be of type `str`,
2059 but for metadata keys ending with the suffix `-bin`, the corresponding values must
2060 be of type `bytes`.
2061
2062 Returns:
2063 google.pubsub_v1.types.Snapshot:
2064 A snapshot resource. Snapshots are used in
2065 [Seek](https://cloud.google.com/pubsub/docs/replay-overview)
2066 operations, which allow you to manage message
2067 acknowledgments in bulk. That is, you can set the
2068 acknowledgment state of messages in an existing
2069 subscription to the state captured by a snapshot.
2070
2071 """
2072 # Create or coerce a protobuf request object.
2073 # - Quick check: If we got a request object, we should *not* have
2074 # gotten any keyword arguments that map to the request.
2075 flattened_params = [snapshot, update_mask]
2076 has_flattened_params = (
2077 len([param for param in flattened_params if param is not None]) > 0
2078 )
2079 if request is not None and has_flattened_params:
2080 raise ValueError(
2081 "If the `request` argument is set, then none of "
2082 "the individual field arguments should be set."
2083 )
2084
2085 # - Use the request object if provided (there's no risk of modifying the input as
2086 # there are no flattened fields), or create one.
2087 if not isinstance(request, pubsub.UpdateSnapshotRequest):
2088 request = pubsub.UpdateSnapshotRequest(request)
2089
2090 # If we have keyword arguments corresponding to fields on the
2091 # request, apply these.
2092 if snapshot is not None:
2093 request.snapshot = snapshot
2094 if update_mask is not None:
2095 request.update_mask = update_mask
2096
2097 # Wrap the RPC method; this adds retry and timeout information,
2098 # and friendly error handling.
2099 rpc = self._client._transport._wrapped_methods[
2100 self._client._transport.update_snapshot
2101 ]
2102
2103 # Certain fields should be provided within the metadata header;
2104 # add these here.
2105 metadata = tuple(metadata) + (
2106 gapic_v1.routing_header.to_grpc_metadata(
2107 (("snapshot.name", request.snapshot.name),)
2108 ),
2109 )
2110
2111 # Validate the universe domain.
2112 self._client._validate_universe_domain()
2113
2114 # Send the request.
2115 response = await rpc(
2116 request,
2117 retry=retry,
2118 timeout=timeout,
2119 metadata=metadata,
2120 )
2121
2122 # Done; return the response.
2123 return response
2124
2125 async def delete_snapshot(
2126 self,
2127 request: Optional[Union[pubsub.DeleteSnapshotRequest, dict]] = None,
2128 *,
2129 snapshot: Optional[str] = None,
2130 retry: OptionalRetry = gapic_v1.method.DEFAULT,
2131 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
2132 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
2133 ) -> None:
2134 r"""Removes an existing snapshot. Snapshots are used in [Seek]
2135 (https://cloud.google.com/pubsub/docs/replay-overview)
2136 operations, which allow you to manage message acknowledgments in
2137 bulk. That is, you can set the acknowledgment state of messages
2138 in an existing subscription to the state captured by a snapshot.
2139 When the snapshot is deleted, all messages retained in the
2140 snapshot are immediately dropped. After a snapshot is deleted, a
2141 new one may be created with the same name, but the new one has
2142 no association with the old snapshot or its subscription, unless
2143 the same subscription is specified.
2144
2145 .. code-block:: python
2146
2147 # This snippet has been automatically generated and should be regarded as a
2148 # code template only.
2149 # It will require modifications to work:
2150 # - It may require correct/in-range values for request initialization.
2151 # - It may require specifying regional endpoints when creating the service
2152 # client as shown in:
2153 # https://googleapis.dev/python/google-api-core/latest/client_options.html
2154 from google import pubsub_v1
2155
2156 async def sample_delete_snapshot():
2157 # Create a client
2158 client = pubsub_v1.SubscriberAsyncClient()
2159
2160 # Initialize request argument(s)
2161 request = pubsub_v1.DeleteSnapshotRequest(
2162 snapshot="snapshot_value",
2163 )
2164
2165 # Make the request
2166 await client.delete_snapshot(request=request)
2167
2168 Args:
2169 request (Optional[Union[google.pubsub_v1.types.DeleteSnapshotRequest, dict]]):
2170 The request object. Request for the ``DeleteSnapshot`` method.
2171 snapshot (:class:`str`):
2172 Required. The name of the snapshot to delete. Format is
2173 ``projects/{project}/snapshots/{snap}``.
2174
2175 This corresponds to the ``snapshot`` field
2176 on the ``request`` instance; if ``request`` is provided, this
2177 should not be set.
2178 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
2179 should be retried.
2180 timeout (float): The timeout for this request.
2181 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
2182 sent along with the request as metadata. Normally, each value must be of type `str`,
2183 but for metadata keys ending with the suffix `-bin`, the corresponding values must
2184 be of type `bytes`.
2185 """
2186 # Create or coerce a protobuf request object.
2187 # - Quick check: If we got a request object, we should *not* have
2188 # gotten any keyword arguments that map to the request.
2189 flattened_params = [snapshot]
2190 has_flattened_params = (
2191 len([param for param in flattened_params if param is not None]) > 0
2192 )
2193 if request is not None and has_flattened_params:
2194 raise ValueError(
2195 "If the `request` argument is set, then none of "
2196 "the individual field arguments should be set."
2197 )
2198
2199 # - Use the request object if provided (there's no risk of modifying the input as
2200 # there are no flattened fields), or create one.
2201 if not isinstance(request, pubsub.DeleteSnapshotRequest):
2202 request = pubsub.DeleteSnapshotRequest(request)
2203
2204 # If we have keyword arguments corresponding to fields on the
2205 # request, apply these.
2206 if snapshot is not None:
2207 request.snapshot = snapshot
2208
2209 # Wrap the RPC method; this adds retry and timeout information,
2210 # and friendly error handling.
2211 rpc = self._client._transport._wrapped_methods[
2212 self._client._transport.delete_snapshot
2213 ]
2214
2215 # Certain fields should be provided within the metadata header;
2216 # add these here.
2217 metadata = tuple(metadata) + (
2218 gapic_v1.routing_header.to_grpc_metadata((("snapshot", request.snapshot),)),
2219 )
2220
2221 # Validate the universe domain.
2222 self._client._validate_universe_domain()
2223
2224 # Send the request.
2225 await rpc(
2226 request,
2227 retry=retry,
2228 timeout=timeout,
2229 metadata=metadata,
2230 )
2231
2232 async def seek(
2233 self,
2234 request: Optional[Union[pubsub.SeekRequest, dict]] = None,
2235 *,
2236 retry: OptionalRetry = gapic_v1.method.DEFAULT,
2237 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
2238 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
2239 ) -> pubsub.SeekResponse:
2240 r"""Seeks an existing subscription to a point in time or to a given
2241 snapshot, whichever is provided in the request. Snapshots are
2242 used in [Seek]
2243 (https://cloud.google.com/pubsub/docs/replay-overview)
2244 operations, which allow you to manage message acknowledgments in
2245 bulk. That is, you can set the acknowledgment state of messages
2246 in an existing subscription to the state captured by a snapshot.
2247 Note that both the subscription and the snapshot must be on the
2248 same topic.
2249
2250 .. code-block:: python
2251
2252 # This snippet has been automatically generated and should be regarded as a
2253 # code template only.
2254 # It will require modifications to work:
2255 # - It may require correct/in-range values for request initialization.
2256 # - It may require specifying regional endpoints when creating the service
2257 # client as shown in:
2258 # https://googleapis.dev/python/google-api-core/latest/client_options.html
2259 from google import pubsub_v1
2260
2261 async def sample_seek():
2262 # Create a client
2263 client = pubsub_v1.SubscriberAsyncClient()
2264
2265 # Initialize request argument(s)
2266 request = pubsub_v1.SeekRequest(
2267 subscription="subscription_value",
2268 )
2269
2270 # Make the request
2271 response = await client.seek(request=request)
2272
2273 # Handle the response
2274 print(response)
2275
2276 Args:
2277 request (Optional[Union[google.pubsub_v1.types.SeekRequest, dict]]):
2278 The request object. Request for the ``Seek`` method.
2279 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
2280 should be retried.
2281 timeout (float): The timeout for this request.
2282 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
2283 sent along with the request as metadata. Normally, each value must be of type `str`,
2284 but for metadata keys ending with the suffix `-bin`, the corresponding values must
2285 be of type `bytes`.
2286
2287 Returns:
2288 google.pubsub_v1.types.SeekResponse:
2289 Response for the Seek method (this response is empty).
2290 """
2291 # Create or coerce a protobuf request object.
2292 # - Use the request object if provided (there's no risk of modifying the input as
2293 # there are no flattened fields), or create one.
2294 if not isinstance(request, pubsub.SeekRequest):
2295 request = pubsub.SeekRequest(request)
2296
2297 # Wrap the RPC method; this adds retry and timeout information,
2298 # and friendly error handling.
2299 rpc = self._client._transport._wrapped_methods[self._client._transport.seek]
2300
2301 # Certain fields should be provided within the metadata header;
2302 # add these here.
2303 metadata = tuple(metadata) + (
2304 gapic_v1.routing_header.to_grpc_metadata(
2305 (("subscription", request.subscription),)
2306 ),
2307 )
2308
2309 # Validate the universe domain.
2310 self._client._validate_universe_domain()
2311
2312 # Send the request.
2313 response = await rpc(
2314 request,
2315 retry=retry,
2316 timeout=timeout,
2317 metadata=metadata,
2318 )
2319
2320 # Done; return the response.
2321 return response
2322
2323 async def set_iam_policy(
2324 self,
2325 request: Optional[iam_policy_pb2.SetIamPolicyRequest] = None,
2326 *,
2327 retry: OptionalRetry = gapic_v1.method.DEFAULT,
2328 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
2329 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
2330 ) -> policy_pb2.Policy:
2331 r"""Sets the IAM access control policy on the specified function.
2332
2333 Replaces any existing policy.
2334
2335 Args:
2336 request (:class:`~.iam_policy_pb2.SetIamPolicyRequest`):
2337 The request object. Request message for `SetIamPolicy`
2338 method.
2339 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
2340 should be retried.
2341 timeout (float): The timeout for this request.
2342 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
2343 sent along with the request as metadata. Normally, each value must be of type `str`,
2344 but for metadata keys ending with the suffix `-bin`, the corresponding values must
2345 be of type `bytes`.
2346 Returns:
2347 ~.policy_pb2.Policy:
2348 Defines an Identity and Access Management (IAM) policy.
2349 It is used to specify access control policies for Cloud
2350 Platform resources.
2351 A ``Policy`` is a collection of ``bindings``. A
2352 ``binding`` binds one or more ``members`` to a single
2353 ``role``. Members can be user accounts, service
2354 accounts, Google groups, and domains (such as G Suite).
2355 A ``role`` is a named list of permissions (defined by
2356 IAM or configured by users). A ``binding`` can
2357 optionally specify a ``condition``, which is a logic
2358 expression that further constrains the role binding
2359 based on attributes about the request and/or target
2360 resource.
2361
2362 **JSON Example**
2363
2364 ::
2365
2366 {
2367 "bindings": [
2368 {
2369 "role": "roles/resourcemanager.organizationAdmin",
2370 "members": [
2371 "user:mike@example.com",
2372 "group:admins@example.com",
2373 "domain:google.com",
2374 "serviceAccount:my-project-id@appspot.gserviceaccount.com"
2375 ]
2376 },
2377 {
2378 "role": "roles/resourcemanager.organizationViewer",
2379 "members": ["user:eve@example.com"],
2380 "condition": {
2381 "title": "expirable access",
2382 "description": "Does not grant access after Sep 2020",
2383 "expression": "request.time <
2384 timestamp('2020-10-01T00:00:00.000Z')",
2385 }
2386 }
2387 ]
2388 }
2389
2390 **YAML Example**
2391
2392 ::
2393
2394 bindings:
2395 - members:
2396 - user:mike@example.com
2397 - group:admins@example.com
2398 - domain:google.com
2399 - serviceAccount:my-project-id@appspot.gserviceaccount.com
2400 role: roles/resourcemanager.organizationAdmin
2401 - members:
2402 - user:eve@example.com
2403 role: roles/resourcemanager.organizationViewer
2404 condition:
2405 title: expirable access
2406 description: Does not grant access after Sep 2020
2407 expression: request.time < timestamp('2020-10-01T00:00:00.000Z')
2408
2409 For a description of IAM and its features, see the `IAM
2410 developer's
2411 guide <https://cloud.google.com/iam/docs>`__.
2412 """
2413 # Create or coerce a protobuf request object.
2414
2415 # The request isn't a proto-plus wrapped type,
2416 # so it must be constructed via keyword expansion.
2417 if isinstance(request, dict):
2418 request = iam_policy_pb2.SetIamPolicyRequest(**request)
2419
2420 # Wrap the RPC method; this adds retry and timeout information,
2421 # and friendly error handling.
2422 rpc = self.transport._wrapped_methods[self._client._transport.set_iam_policy]
2423
2424 # Certain fields should be provided within the metadata header;
2425 # add these here.
2426 metadata = tuple(metadata) + (
2427 gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)),
2428 )
2429
2430 # Validate the universe domain.
2431 self._client._validate_universe_domain()
2432
2433 # Send the request.
2434 response = await rpc(
2435 request,
2436 retry=retry,
2437 timeout=timeout,
2438 metadata=metadata,
2439 )
2440
2441 # Done; return the response.
2442 return response
2443
2444 async def get_iam_policy(
2445 self,
2446 request: Optional[iam_policy_pb2.GetIamPolicyRequest] = None,
2447 *,
2448 retry: OptionalRetry = gapic_v1.method.DEFAULT,
2449 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
2450 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
2451 ) -> policy_pb2.Policy:
2452 r"""Gets the IAM access control policy for a function.
2453
2454 Returns an empty policy if the function exists and does not have a
2455 policy set.
2456
2457 Args:
2458 request (:class:`~.iam_policy_pb2.GetIamPolicyRequest`):
2459 The request object. Request message for `GetIamPolicy`
2460 method.
2461 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if
2462 any, should be retried.
2463 timeout (float): The timeout for this request.
2464 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
2465 sent along with the request as metadata. Normally, each value must be of type `str`,
2466 but for metadata keys ending with the suffix `-bin`, the corresponding values must
2467 be of type `bytes`.
2468 Returns:
2469 ~.policy_pb2.Policy:
2470 Defines an Identity and Access Management (IAM) policy.
2471 It is used to specify access control policies for Cloud
2472 Platform resources.
2473 A ``Policy`` is a collection of ``bindings``. A
2474 ``binding`` binds one or more ``members`` to a single
2475 ``role``. Members can be user accounts, service
2476 accounts, Google groups, and domains (such as G Suite).
2477 A ``role`` is a named list of permissions (defined by
2478 IAM or configured by users). A ``binding`` can
2479 optionally specify a ``condition``, which is a logic
2480 expression that further constrains the role binding
2481 based on attributes about the request and/or target
2482 resource.
2483
2484 **JSON Example**
2485
2486 ::
2487
2488 {
2489 "bindings": [
2490 {
2491 "role": "roles/resourcemanager.organizationAdmin",
2492 "members": [
2493 "user:mike@example.com",
2494 "group:admins@example.com",
2495 "domain:google.com",
2496 "serviceAccount:my-project-id@appspot.gserviceaccount.com"
2497 ]
2498 },
2499 {
2500 "role": "roles/resourcemanager.organizationViewer",
2501 "members": ["user:eve@example.com"],
2502 "condition": {
2503 "title": "expirable access",
2504 "description": "Does not grant access after Sep 2020",
2505 "expression": "request.time <
2506 timestamp('2020-10-01T00:00:00.000Z')",
2507 }
2508 }
2509 ]
2510 }
2511
2512 **YAML Example**
2513
2514 ::
2515
2516 bindings:
2517 - members:
2518 - user:mike@example.com
2519 - group:admins@example.com
2520 - domain:google.com
2521 - serviceAccount:my-project-id@appspot.gserviceaccount.com
2522 role: roles/resourcemanager.organizationAdmin
2523 - members:
2524 - user:eve@example.com
2525 role: roles/resourcemanager.organizationViewer
2526 condition:
2527 title: expirable access
2528 description: Does not grant access after Sep 2020
2529 expression: request.time < timestamp('2020-10-01T00:00:00.000Z')
2530
2531 For a description of IAM and its features, see the `IAM
2532 developer's
2533 guide <https://cloud.google.com/iam/docs>`__.
2534 """
2535 # Create or coerce a protobuf request object.
2536
2537 # The request isn't a proto-plus wrapped type,
2538 # so it must be constructed via keyword expansion.
2539 if isinstance(request, dict):
2540 request = iam_policy_pb2.GetIamPolicyRequest(**request)
2541
2542 # Wrap the RPC method; this adds retry and timeout information,
2543 # and friendly error handling.
2544 rpc = self.transport._wrapped_methods[self._client._transport.get_iam_policy]
2545
2546 # Certain fields should be provided within the metadata header;
2547 # add these here.
2548 metadata = tuple(metadata) + (
2549 gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)),
2550 )
2551
2552 # Validate the universe domain.
2553 self._client._validate_universe_domain()
2554
2555 # Send the request.
2556 response = await rpc(
2557 request,
2558 retry=retry,
2559 timeout=timeout,
2560 metadata=metadata,
2561 )
2562
2563 # Done; return the response.
2564 return response
2565
2566 async def test_iam_permissions(
2567 self,
2568 request: Optional[iam_policy_pb2.TestIamPermissionsRequest] = None,
2569 *,
2570 retry: OptionalRetry = gapic_v1.method.DEFAULT,
2571 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
2572 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
2573 ) -> iam_policy_pb2.TestIamPermissionsResponse:
2574 r"""Tests the specified IAM permissions against the IAM access control
2575 policy for a function.
2576
2577 If the function does not exist, this will return an empty set
2578 of permissions, not a NOT_FOUND error.
2579
2580 Args:
2581 request (:class:`~.iam_policy_pb2.TestIamPermissionsRequest`):
2582 The request object. Request message for
2583 `TestIamPermissions` method.
2584 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors,
2585 if any, should be retried.
2586 timeout (float): The timeout for this request.
2587 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
2588 sent along with the request as metadata. Normally, each value must be of type `str`,
2589 but for metadata keys ending with the suffix `-bin`, the corresponding values must
2590 be of type `bytes`.
2591 Returns:
2592 ~.iam_policy_pb2.TestIamPermissionsResponse:
2593 Response message for ``TestIamPermissions`` method.
2594 """
2595 # Create or coerce a protobuf request object.
2596
2597 # The request isn't a proto-plus wrapped type,
2598 # so it must be constructed via keyword expansion.
2599 if isinstance(request, dict):
2600 request = iam_policy_pb2.TestIamPermissionsRequest(**request)
2601
2602 # Wrap the RPC method; this adds retry and timeout information,
2603 # and friendly error handling.
2604 rpc = self.transport._wrapped_methods[
2605 self._client._transport.test_iam_permissions
2606 ]
2607
2608 # Certain fields should be provided within the metadata header;
2609 # add these here.
2610 metadata = tuple(metadata) + (
2611 gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)),
2612 )
2613
2614 # Validate the universe domain.
2615 self._client._validate_universe_domain()
2616
2617 # Send the request.
2618 response = await rpc(
2619 request,
2620 retry=retry,
2621 timeout=timeout,
2622 metadata=metadata,
2623 )
2624
2625 # Done; return the response.
2626 return response
2627
2628 async def __aenter__(self) -> "SubscriberAsyncClient":
2629 return self
2630
2631 async def __aexit__(self, exc_type, exc, tb):
2632 await self.transport.close()
2633
2634
2635DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
2636 client_library_version=package_version.__version__
2637)
2638
2639if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"): # pragma: NO COVER
2640 DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
2641
2642
2643__all__ = ("SubscriberAsyncClient",)