1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16from collections import OrderedDict
17from http import HTTPStatus
18import json
19import logging as std_logging
20import functools
21import os
22import re
23from typing import (
24 Dict,
25 Callable,
26 Mapping,
27 MutableMapping,
28 MutableSequence,
29 Optional,
30 Iterable,
31 Iterator,
32 Sequence,
33 Tuple,
34 Type,
35 Union,
36 cast,
37)
38import warnings
39
40import warnings
41from google.pubsub_v1 import gapic_version as package_version
42
43from google.api_core import client_options as client_options_lib
44from google.api_core import exceptions as core_exceptions
45from google.api_core import gapic_v1
46from google.api_core import retry as retries
47from google.auth import credentials as ga_credentials # type: ignore
48from google.auth.transport import mtls # type: ignore
49from google.auth.transport.grpc import SslCredentials # type: ignore
50from google.auth.exceptions import MutualTLSChannelError # type: ignore
51from google.oauth2 import service_account # type: ignore
52import google.protobuf
53
54try:
55 OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
56except AttributeError: # pragma: NO COVER
57 OptionalRetry = Union[retries.Retry, object, None] # type: ignore
58
59try:
60 from google.api_core import client_logging # type: ignore
61
62 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
63except ImportError: # pragma: NO COVER
64 CLIENT_LOGGING_SUPPORTED = False
65
66_LOGGER = std_logging.getLogger(__name__)
67
68from google.iam.v1 import iam_policy_pb2 # type: ignore
69from google.iam.v1 import policy_pb2 # type: ignore
70from google.protobuf import duration_pb2 # type: ignore
71from google.protobuf import field_mask_pb2 # type: ignore
72from google.protobuf import timestamp_pb2 # type: ignore
73from google.pubsub_v1.services.subscriber import pagers
74from google.pubsub_v1.types import pubsub
75
76import grpc
77from .transports.base import SubscriberTransport, DEFAULT_CLIENT_INFO
78from .transports.grpc import SubscriberGrpcTransport
79from .transports.grpc_asyncio import SubscriberGrpcAsyncIOTransport
80from .transports.rest import SubscriberRestTransport
81
82
83class SubscriberClientMeta(type):
84 """Metaclass for the Subscriber client.
85
86 This provides class-level methods for building and retrieving
87 support objects (e.g. transport) without polluting the client instance
88 objects.
89 """
90
91 _transport_registry = OrderedDict() # type: Dict[str, Type[SubscriberTransport]]
92 _transport_registry["grpc"] = SubscriberGrpcTransport
93 _transport_registry["grpc_asyncio"] = SubscriberGrpcAsyncIOTransport
94 _transport_registry["rest"] = SubscriberRestTransport
95
96 def get_transport_class(
97 cls,
98 label: Optional[str] = None,
99 ) -> Type[SubscriberTransport]:
100 """Returns an appropriate transport class.
101
102 Args:
103 label: The name of the desired transport. If none is
104 provided, then the first transport in the registry is used.
105
106 Returns:
107 The transport class to use.
108 """
109 # If a specific transport is requested, return that one.
110 if label:
111 return cls._transport_registry[label]
112
113 # No transport is requested; return the default (that is, the first one
114 # in the dictionary).
115 return next(iter(cls._transport_registry.values()))
116
117
118class SubscriberClient(metaclass=SubscriberClientMeta):
119 """The service that an application uses to manipulate subscriptions and
120 to consume messages from a subscription via the ``Pull`` method or
121 by establishing a bi-directional stream using the ``StreamingPull``
122 method.
123 """
124
125 @staticmethod
126 def _get_default_mtls_endpoint(api_endpoint):
127 """Converts api endpoint to mTLS endpoint.
128
129 Convert "*.sandbox.googleapis.com" and "*.googleapis.com" to
130 "*.mtls.sandbox.googleapis.com" and "*.mtls.googleapis.com" respectively.
131 Args:
132 api_endpoint (Optional[str]): the api endpoint to convert.
133 Returns:
134 str: converted mTLS api endpoint.
135 """
136 if not api_endpoint:
137 return api_endpoint
138
139 mtls_endpoint_re = re.compile(
140 r"(?P<name>[^.]+)(?P<mtls>\.mtls)?(?P<sandbox>\.sandbox)?(?P<googledomain>\.googleapis\.com)?"
141 )
142
143 m = mtls_endpoint_re.match(api_endpoint)
144 name, mtls, sandbox, googledomain = m.groups()
145 if mtls or not googledomain:
146 return api_endpoint
147
148 if sandbox:
149 return api_endpoint.replace(
150 "sandbox.googleapis.com", "mtls.sandbox.googleapis.com"
151 )
152
153 return api_endpoint.replace(".googleapis.com", ".mtls.googleapis.com")
154
155 # Note: DEFAULT_ENDPOINT is deprecated. Use _DEFAULT_ENDPOINT_TEMPLATE instead.
156
157 # The scopes needed to make gRPC calls to all of the methods defined in
158 # this service
159 _DEFAULT_SCOPES = (
160 "https://www.googleapis.com/auth/cloud-platform",
161 "https://www.googleapis.com/auth/pubsub",
162 )
163
164 SERVICE_ADDRESS = "pubsub.googleapis.com:443"
165 """The default address of the service."""
166
167 DEFAULT_ENDPOINT = "pubsub.googleapis.com"
168 DEFAULT_MTLS_ENDPOINT = _get_default_mtls_endpoint.__func__( # type: ignore
169 DEFAULT_ENDPOINT
170 )
171
172 _DEFAULT_ENDPOINT_TEMPLATE = "pubsub.{UNIVERSE_DOMAIN}"
173 _DEFAULT_UNIVERSE = "googleapis.com"
174
175 @classmethod
176 def from_service_account_info(cls, info: dict, *args, **kwargs):
177 """Creates an instance of this client using the provided credentials
178 info.
179
180 Args:
181 info (dict): The service account private key info.
182 args: Additional arguments to pass to the constructor.
183 kwargs: Additional arguments to pass to the constructor.
184
185 Returns:
186 SubscriberClient: The constructed client.
187 """
188 credentials = service_account.Credentials.from_service_account_info(info)
189 kwargs["credentials"] = credentials
190 return cls(*args, **kwargs)
191
192 @classmethod
193 def from_service_account_file(cls, filename: str, *args, **kwargs):
194 """Creates an instance of this client using the provided credentials
195 file.
196
197 Args:
198 filename (str): The path to the service account private key json
199 file.
200 args: Additional arguments to pass to the constructor.
201 kwargs: Additional arguments to pass to the constructor.
202
203 Returns:
204 SubscriberClient: The constructed client.
205 """
206 credentials = service_account.Credentials.from_service_account_file(filename)
207 kwargs["credentials"] = credentials
208 return cls(*args, **kwargs)
209
210 from_service_account_json = from_service_account_file
211
212 @property
213 def transport(self) -> SubscriberTransport:
214 """Returns the transport used by the client instance.
215
216 Returns:
217 SubscriberTransport: The transport used by the client
218 instance.
219 """
220 return self._transport
221
222 @staticmethod
223 def listing_path(
224 project: str,
225 location: str,
226 data_exchange: str,
227 listing: str,
228 ) -> str:
229 """Returns a fully-qualified listing string."""
230 return "projects/{project}/locations/{location}/dataExchanges/{data_exchange}/listings/{listing}".format(
231 project=project,
232 location=location,
233 data_exchange=data_exchange,
234 listing=listing,
235 )
236
237 @staticmethod
238 def parse_listing_path(path: str) -> Dict[str, str]:
239 """Parses a listing path into its component segments."""
240 m = re.match(
241 r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)/dataExchanges/(?P<data_exchange>.+?)/listings/(?P<listing>.+?)$",
242 path,
243 )
244 return m.groupdict() if m else {}
245
246 @staticmethod
247 def snapshot_path(
248 project: str,
249 snapshot: str,
250 ) -> str:
251 """Returns a fully-qualified snapshot string."""
252 return "projects/{project}/snapshots/{snapshot}".format(
253 project=project,
254 snapshot=snapshot,
255 )
256
257 @staticmethod
258 def parse_snapshot_path(path: str) -> Dict[str, str]:
259 """Parses a snapshot path into its component segments."""
260 m = re.match(r"^projects/(?P<project>.+?)/snapshots/(?P<snapshot>.+?)$", path)
261 return m.groupdict() if m else {}
262
263 @staticmethod
264 def subscription_path(
265 project: str,
266 subscription: str,
267 ) -> str:
268 """Returns a fully-qualified subscription string."""
269 return "projects/{project}/subscriptions/{subscription}".format(
270 project=project,
271 subscription=subscription,
272 )
273
274 @staticmethod
275 def parse_subscription_path(path: str) -> Dict[str, str]:
276 """Parses a subscription path into its component segments."""
277 m = re.match(
278 r"^projects/(?P<project>.+?)/subscriptions/(?P<subscription>.+?)$", path
279 )
280 return m.groupdict() if m else {}
281
282 @staticmethod
283 def topic_path(
284 project: str,
285 topic: str,
286 ) -> str:
287 """Returns a fully-qualified topic string."""
288 return "projects/{project}/topics/{topic}".format(
289 project=project,
290 topic=topic,
291 )
292
293 @staticmethod
294 def parse_topic_path(path: str) -> Dict[str, str]:
295 """Parses a topic path into its component segments."""
296 m = re.match(r"^projects/(?P<project>.+?)/topics/(?P<topic>.+?)$", path)
297 return m.groupdict() if m else {}
298
299 @staticmethod
300 def common_billing_account_path(
301 billing_account: str,
302 ) -> str:
303 """Returns a fully-qualified billing_account string."""
304 return "billingAccounts/{billing_account}".format(
305 billing_account=billing_account,
306 )
307
308 @staticmethod
309 def parse_common_billing_account_path(path: str) -> Dict[str, str]:
310 """Parse a billing_account path into its component segments."""
311 m = re.match(r"^billingAccounts/(?P<billing_account>.+?)$", path)
312 return m.groupdict() if m else {}
313
314 @staticmethod
315 def common_folder_path(
316 folder: str,
317 ) -> str:
318 """Returns a fully-qualified folder string."""
319 return "folders/{folder}".format(
320 folder=folder,
321 )
322
323 @staticmethod
324 def parse_common_folder_path(path: str) -> Dict[str, str]:
325 """Parse a folder path into its component segments."""
326 m = re.match(r"^folders/(?P<folder>.+?)$", path)
327 return m.groupdict() if m else {}
328
329 @staticmethod
330 def common_organization_path(
331 organization: str,
332 ) -> str:
333 """Returns a fully-qualified organization string."""
334 return "organizations/{organization}".format(
335 organization=organization,
336 )
337
338 @staticmethod
339 def parse_common_organization_path(path: str) -> Dict[str, str]:
340 """Parse a organization path into its component segments."""
341 m = re.match(r"^organizations/(?P<organization>.+?)$", path)
342 return m.groupdict() if m else {}
343
344 @staticmethod
345 def common_project_path(
346 project: str,
347 ) -> str:
348 """Returns a fully-qualified project string."""
349 return "projects/{project}".format(
350 project=project,
351 )
352
353 @staticmethod
354 def parse_common_project_path(path: str) -> Dict[str, str]:
355 """Parse a project path into its component segments."""
356 m = re.match(r"^projects/(?P<project>.+?)$", path)
357 return m.groupdict() if m else {}
358
359 @staticmethod
360 def common_location_path(
361 project: str,
362 location: str,
363 ) -> str:
364 """Returns a fully-qualified location string."""
365 return "projects/{project}/locations/{location}".format(
366 project=project,
367 location=location,
368 )
369
370 @staticmethod
371 def parse_common_location_path(path: str) -> Dict[str, str]:
372 """Parse a location path into its component segments."""
373 m = re.match(r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)$", path)
374 return m.groupdict() if m else {}
375
376 @classmethod
377 def get_mtls_endpoint_and_cert_source(
378 cls, client_options: Optional[client_options_lib.ClientOptions] = None
379 ):
380 """Deprecated. Return the API endpoint and client cert source for mutual TLS.
381
382 The client cert source is determined in the following order:
383 (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
384 client cert source is None.
385 (2) if `client_options.client_cert_source` is provided, use the provided one; if the
386 default client cert source exists, use the default one; otherwise the client cert
387 source is None.
388
389 The API endpoint is determined in the following order:
390 (1) if `client_options.api_endpoint` if provided, use the provided one.
391 (2) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is "always", use the
392 default mTLS endpoint; if the environment variable is "never", use the default API
393 endpoint; otherwise if client cert source exists, use the default mTLS endpoint, otherwise
394 use the default API endpoint.
395
396 More details can be found at https://google.aip.dev/auth/4114.
397
398 Args:
399 client_options (google.api_core.client_options.ClientOptions): Custom options for the
400 client. Only the `api_endpoint` and `client_cert_source` properties may be used
401 in this method.
402
403 Returns:
404 Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
405 client cert source to use.
406
407 Raises:
408 google.auth.exceptions.MutualTLSChannelError: If any errors happen.
409 """
410
411 warnings.warn(
412 "get_mtls_endpoint_and_cert_source is deprecated. Use the api_endpoint property instead.",
413 DeprecationWarning,
414 )
415 if client_options is None:
416 client_options = client_options_lib.ClientOptions()
417 use_client_cert = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false")
418 use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto")
419 if use_client_cert not in ("true", "false"):
420 raise ValueError(
421 "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
422 )
423 if use_mtls_endpoint not in ("auto", "never", "always"):
424 raise MutualTLSChannelError(
425 "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
426 )
427
428 # Figure out the client cert source to use.
429 client_cert_source = None
430 if use_client_cert == "true":
431 if client_options.client_cert_source:
432 client_cert_source = client_options.client_cert_source
433 elif mtls.has_default_client_cert_source():
434 client_cert_source = mtls.default_client_cert_source()
435
436 # Figure out which api endpoint to use.
437 if client_options.api_endpoint is not None:
438 api_endpoint = client_options.api_endpoint
439 elif use_mtls_endpoint == "always" or (
440 use_mtls_endpoint == "auto" and client_cert_source
441 ):
442 api_endpoint = cls.DEFAULT_MTLS_ENDPOINT
443 else:
444 api_endpoint = cls.DEFAULT_ENDPOINT
445
446 return api_endpoint, client_cert_source
447
448 @staticmethod
449 def _read_environment_variables():
450 """Returns the environment variables used by the client.
451
452 Returns:
453 Tuple[bool, str, str]: returns the GOOGLE_API_USE_CLIENT_CERTIFICATE,
454 GOOGLE_API_USE_MTLS_ENDPOINT, and GOOGLE_CLOUD_UNIVERSE_DOMAIN environment variables.
455
456 Raises:
457 ValueError: If GOOGLE_API_USE_CLIENT_CERTIFICATE is not
458 any of ["true", "false"].
459 google.auth.exceptions.MutualTLSChannelError: If GOOGLE_API_USE_MTLS_ENDPOINT
460 is not any of ["auto", "never", "always"].
461 """
462 use_client_cert = os.getenv(
463 "GOOGLE_API_USE_CLIENT_CERTIFICATE", "false"
464 ).lower()
465 use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto").lower()
466 universe_domain_env = os.getenv("GOOGLE_CLOUD_UNIVERSE_DOMAIN")
467 if use_client_cert not in ("true", "false"):
468 raise ValueError(
469 "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
470 )
471 if use_mtls_endpoint not in ("auto", "never", "always"):
472 raise MutualTLSChannelError(
473 "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
474 )
475 return use_client_cert == "true", use_mtls_endpoint, universe_domain_env
476
477 @staticmethod
478 def _get_client_cert_source(provided_cert_source, use_cert_flag):
479 """Return the client cert source to be used by the client.
480
481 Args:
482 provided_cert_source (bytes): The client certificate source provided.
483 use_cert_flag (bool): A flag indicating whether to use the client certificate.
484
485 Returns:
486 bytes or None: The client cert source to be used by the client.
487 """
488 client_cert_source = None
489 if use_cert_flag:
490 if provided_cert_source:
491 client_cert_source = provided_cert_source
492 elif mtls.has_default_client_cert_source():
493 client_cert_source = mtls.default_client_cert_source()
494 return client_cert_source
495
496 @staticmethod
497 def _get_api_endpoint(
498 api_override, client_cert_source, universe_domain, use_mtls_endpoint
499 ):
500 """Return the API endpoint used by the client.
501
502 Args:
503 api_override (str): The API endpoint override. If specified, this is always
504 the return value of this function and the other arguments are not used.
505 client_cert_source (bytes): The client certificate source used by the client.
506 universe_domain (str): The universe domain used by the client.
507 use_mtls_endpoint (str): How to use the mTLS endpoint, which depends also on the other parameters.
508 Possible values are "always", "auto", or "never".
509
510 Returns:
511 str: The API endpoint to be used by the client.
512 """
513 if api_override is not None:
514 api_endpoint = api_override
515 elif use_mtls_endpoint == "always" or (
516 use_mtls_endpoint == "auto" and client_cert_source
517 ):
518 _default_universe = SubscriberClient._DEFAULT_UNIVERSE
519 if universe_domain != _default_universe:
520 raise MutualTLSChannelError(
521 f"mTLS is not supported in any universe other than {_default_universe}."
522 )
523 api_endpoint = SubscriberClient.DEFAULT_MTLS_ENDPOINT
524 else:
525 api_endpoint = SubscriberClient._DEFAULT_ENDPOINT_TEMPLATE.format(
526 UNIVERSE_DOMAIN=universe_domain
527 )
528 return api_endpoint
529
530 @staticmethod
531 def _get_universe_domain(
532 client_universe_domain: Optional[str], universe_domain_env: Optional[str]
533 ) -> str:
534 """Return the universe domain used by the client.
535
536 Args:
537 client_universe_domain (Optional[str]): The universe domain configured via the client options.
538 universe_domain_env (Optional[str]): The universe domain configured via the "GOOGLE_CLOUD_UNIVERSE_DOMAIN" environment variable.
539
540 Returns:
541 str: The universe domain to be used by the client.
542
543 Raises:
544 ValueError: If the universe domain is an empty string.
545 """
546 universe_domain = SubscriberClient._DEFAULT_UNIVERSE
547 if client_universe_domain is not None:
548 universe_domain = client_universe_domain
549 elif universe_domain_env is not None:
550 universe_domain = universe_domain_env
551 if len(universe_domain.strip()) == 0:
552 raise ValueError("Universe Domain cannot be an empty string.")
553 return universe_domain
554
555 def _validate_universe_domain(self):
556 """Validates client's and credentials' universe domains are consistent.
557
558 Returns:
559 bool: True iff the configured universe domain is valid.
560
561 Raises:
562 ValueError: If the configured universe domain is not valid.
563 """
564
565 # NOTE (b/349488459): universe validation is disabled until further notice.
566 return True
567
568 def _add_cred_info_for_auth_errors(
569 self, error: core_exceptions.GoogleAPICallError
570 ) -> None:
571 """Adds credential info string to error details for 401/403/404 errors.
572
573 Args:
574 error (google.api_core.exceptions.GoogleAPICallError): The error to add the cred info.
575 """
576 if error.code not in [
577 HTTPStatus.UNAUTHORIZED,
578 HTTPStatus.FORBIDDEN,
579 HTTPStatus.NOT_FOUND,
580 ]:
581 return
582
583 cred = self._transport._credentials
584
585 # get_cred_info is only available in google-auth>=2.35.0
586 if not hasattr(cred, "get_cred_info"):
587 return
588
589 # ignore the type check since pypy test fails when get_cred_info
590 # is not available
591 cred_info = cred.get_cred_info() # type: ignore
592 if cred_info and hasattr(error._details, "append"):
593 error._details.append(json.dumps(cred_info))
594
595 @property
596 def api_endpoint(self):
597 """Return the API endpoint used by the client instance.
598
599 Returns:
600 str: The API endpoint used by the client instance.
601 """
602 return self._api_endpoint
603
604 @property
605 def universe_domain(self) -> str:
606 """Return the universe domain used by the client instance.
607
608 Returns:
609 str: The universe domain used by the client instance.
610 """
611 return self._universe_domain
612
613 def __init__(
614 self,
615 *,
616 credentials: Optional[ga_credentials.Credentials] = None,
617 transport: Optional[
618 Union[str, SubscriberTransport, Callable[..., SubscriberTransport]]
619 ] = None,
620 client_options: Optional[Union[client_options_lib.ClientOptions, dict]] = None,
621 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
622 ) -> None:
623 """Instantiates the subscriber client.
624
625 Args:
626 credentials (Optional[google.auth.credentials.Credentials]): The
627 authorization credentials to attach to requests. These
628 credentials identify the application to the service; if none
629 are specified, the client will attempt to ascertain the
630 credentials from the environment.
631 transport (Optional[Union[str,SubscriberTransport,Callable[..., SubscriberTransport]]]):
632 The transport to use, or a Callable that constructs and returns a new transport.
633 If a Callable is given, it will be called with the same set of initialization
634 arguments as used in the SubscriberTransport constructor.
635 If set to None, a transport is chosen automatically.
636 client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]):
637 Custom options for the client.
638
639 1. The ``api_endpoint`` property can be used to override the
640 default endpoint provided by the client when ``transport`` is
641 not explicitly provided. Only if this property is not set and
642 ``transport`` was not explicitly provided, the endpoint is
643 determined by the GOOGLE_API_USE_MTLS_ENDPOINT environment
644 variable, which have one of the following values:
645 "always" (always use the default mTLS endpoint), "never" (always
646 use the default regular endpoint) and "auto" (auto-switch to the
647 default mTLS endpoint if client certificate is present; this is
648 the default value).
649
650 2. If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
651 is "true", then the ``client_cert_source`` property can be used
652 to provide a client certificate for mTLS transport. If
653 not provided, the default SSL client certificate will be used if
654 present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
655 set, no client certificate will be used.
656
657 3. The ``universe_domain`` property can be used to override the
658 default "googleapis.com" universe. Note that the ``api_endpoint``
659 property still takes precedence; and ``universe_domain`` is
660 currently not supported for mTLS.
661
662 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
663 The client info used to send a user-agent string along with
664 API requests. If ``None``, then default info will be used.
665 Generally, you only need to set this if you're developing
666 your own client library.
667
668 Raises:
669 google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
670 creation failed for any reason.
671 """
672 self._client_options = client_options
673 if isinstance(self._client_options, dict):
674 self._client_options = client_options_lib.from_dict(self._client_options)
675 if self._client_options is None:
676 self._client_options = client_options_lib.ClientOptions()
677 self._client_options = cast(
678 client_options_lib.ClientOptions, self._client_options
679 )
680
681 universe_domain_opt = getattr(self._client_options, "universe_domain", None)
682
683 (
684 self._use_client_cert,
685 self._use_mtls_endpoint,
686 self._universe_domain_env,
687 ) = SubscriberClient._read_environment_variables()
688 self._client_cert_source = SubscriberClient._get_client_cert_source(
689 self._client_options.client_cert_source, self._use_client_cert
690 )
691 self._universe_domain = SubscriberClient._get_universe_domain(
692 universe_domain_opt, self._universe_domain_env
693 )
694 self._api_endpoint = None # updated below, depending on `transport`
695
696 # Initialize the universe domain validation.
697 self._is_universe_domain_valid = False
698
699 if CLIENT_LOGGING_SUPPORTED: # pragma: NO COVER
700 # Setup logging.
701 client_logging.initialize_logging()
702
703 api_key_value = getattr(self._client_options, "api_key", None)
704 if api_key_value and credentials:
705 raise ValueError(
706 "client_options.api_key and credentials are mutually exclusive"
707 )
708
709 # Save or instantiate the transport.
710 # Ordinarily, we provide the transport, but allowing a custom transport
711 # instance provides an extensibility point for unusual situations.
712 transport_provided = isinstance(transport, SubscriberTransport)
713 if transport_provided:
714 # transport is a SubscriberTransport instance.
715 if credentials or self._client_options.credentials_file or api_key_value:
716 raise ValueError(
717 "When providing a transport instance, "
718 "provide its credentials directly."
719 )
720 if self._client_options.scopes:
721 raise ValueError(
722 "When providing a transport instance, provide its scopes "
723 "directly."
724 )
725 self._transport = cast(SubscriberTransport, transport)
726 self._api_endpoint = self._transport.host
727
728 self._api_endpoint = self._api_endpoint or SubscriberClient._get_api_endpoint(
729 self._client_options.api_endpoint,
730 self._client_cert_source,
731 self._universe_domain,
732 self._use_mtls_endpoint,
733 )
734
735 if not transport_provided:
736 import google.auth._default # type: ignore
737
738 if api_key_value and hasattr(
739 google.auth._default, "get_api_key_credentials"
740 ):
741 credentials = google.auth._default.get_api_key_credentials(
742 api_key_value
743 )
744
745 transport_init: Union[
746 Type[SubscriberTransport], Callable[..., SubscriberTransport]
747 ] = (
748 SubscriberClient.get_transport_class(transport)
749 if isinstance(transport, str) or transport is None
750 else cast(Callable[..., SubscriberTransport], transport)
751 )
752 # initialize with the provided callable or the passed in class
753
754 emulator_host = os.environ.get("PUBSUB_EMULATOR_HOST")
755 if emulator_host:
756 if issubclass(transport_init, type(self)._transport_registry["grpc"]): # type: ignore
757 channel = grpc.insecure_channel(target=emulator_host)
758 else:
759 channel = grpc.aio.insecure_channel(target=emulator_host)
760 transport_init = functools.partial(transport_init, channel=channel)
761
762 self._transport = transport_init(
763 credentials=credentials,
764 credentials_file=self._client_options.credentials_file,
765 host=self._api_endpoint,
766 scopes=self._client_options.scopes,
767 client_cert_source_for_mtls=self._client_cert_source,
768 quota_project_id=self._client_options.quota_project_id,
769 client_info=client_info,
770 always_use_jwt_access=True,
771 api_audience=self._client_options.api_audience,
772 )
773
774 if "async" not in str(self._transport):
775 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
776 std_logging.DEBUG
777 ): # pragma: NO COVER
778 _LOGGER.debug(
779 "Created client `google.pubsub_v1.SubscriberClient`.",
780 extra={
781 "serviceName": "google.pubsub.v1.Subscriber",
782 "universeDomain": getattr(
783 self._transport._credentials, "universe_domain", ""
784 ),
785 "credentialsType": f"{type(self._transport._credentials).__module__}.{type(self._transport._credentials).__qualname__}",
786 "credentialsInfo": getattr(
787 self.transport._credentials, "get_cred_info", lambda: None
788 )(),
789 }
790 if hasattr(self._transport, "_credentials")
791 else {
792 "serviceName": "google.pubsub.v1.Subscriber",
793 "credentialsType": None,
794 },
795 )
796
797 def create_subscription(
798 self,
799 request: Optional[Union[pubsub.Subscription, dict]] = None,
800 *,
801 name: Optional[str] = None,
802 topic: Optional[str] = None,
803 push_config: Optional[pubsub.PushConfig] = None,
804 ack_deadline_seconds: Optional[int] = None,
805 retry: OptionalRetry = gapic_v1.method.DEFAULT,
806 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
807 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
808 ) -> pubsub.Subscription:
809 r"""Creates a subscription to a given topic. See the [resource name
810 rules]
811 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
812 If the subscription already exists, returns ``ALREADY_EXISTS``.
813 If the corresponding topic doesn't exist, returns ``NOT_FOUND``.
814
815 If the name is not provided in the request, the server will
816 assign a random name for this subscription on the same project
817 as the topic, conforming to the [resource name format]
818 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
819 The generated name is populated in the returned Subscription
820 object. Note that for REST API requests, you must specify a name
821 in the request.
822
823 .. code-block:: python
824
825 # This snippet has been automatically generated and should be regarded as a
826 # code template only.
827 # It will require modifications to work:
828 # - It may require correct/in-range values for request initialization.
829 # - It may require specifying regional endpoints when creating the service
830 # client as shown in:
831 # https://googleapis.dev/python/google-api-core/latest/client_options.html
832 from google import pubsub_v1
833
834 def sample_create_subscription():
835 # Create a client
836 client = pubsub_v1.SubscriberClient()
837
838 # Initialize request argument(s)
839 request = pubsub_v1.Subscription(
840 name="name_value",
841 topic="topic_value",
842 )
843
844 # Make the request
845 response = client.create_subscription(request=request)
846
847 # Handle the response
848 print(response)
849
850 Args:
851 request (Union[google.pubsub_v1.types.Subscription, dict]):
852 The request object. A subscription resource. If none of ``push_config``,
853 ``bigquery_config``, or ``cloud_storage_config`` is set,
854 then the subscriber will pull and ack messages using API
855 methods. At most one of these fields may be set.
856 name (str):
857 Required. The name of the subscription. It must have the
858 format
859 ``"projects/{project}/subscriptions/{subscription}"``.
860 ``{subscription}`` must start with a letter, and contain
861 only letters (``[A-Za-z]``), numbers (``[0-9]``), dashes
862 (``-``), underscores (``_``), periods (``.``), tildes
863 (``~``), plus (``+``) or percent signs (``%``). It must
864 be between 3 and 255 characters in length, and it must
865 not start with ``"goog"``.
866
867 This corresponds to the ``name`` field
868 on the ``request`` instance; if ``request`` is provided, this
869 should not be set.
870 topic (str):
871 Required. The name of the topic from which this
872 subscription is receiving messages. Format is
873 ``projects/{project}/topics/{topic}``. The value of this
874 field will be ``_deleted-topic_`` if the topic has been
875 deleted.
876
877 This corresponds to the ``topic`` field
878 on the ``request`` instance; if ``request`` is provided, this
879 should not be set.
880 push_config (google.pubsub_v1.types.PushConfig):
881 Optional. If push delivery is used
882 with this subscription, this field is
883 used to configure it.
884
885 This corresponds to the ``push_config`` field
886 on the ``request`` instance; if ``request`` is provided, this
887 should not be set.
888 ack_deadline_seconds (int):
889 Optional. The approximate amount of time (on a
890 best-effort basis) Pub/Sub waits for the subscriber to
891 acknowledge receipt before resending the message. In the
892 interval after the message is delivered and before it is
893 acknowledged, it is considered to be *outstanding*.
894 During that time period, the message will not be
895 redelivered (on a best-effort basis).
896
897 For pull subscriptions, this value is used as the
898 initial value for the ack deadline. To override this
899 value for a given message, call ``ModifyAckDeadline``
900 with the corresponding ``ack_id`` if using non-streaming
901 pull or send the ``ack_id`` in a
902 ``StreamingModifyAckDeadlineRequest`` if using streaming
903 pull. The minimum custom deadline you can specify is 10
904 seconds. The maximum custom deadline you can specify is
905 600 seconds (10 minutes). If this parameter is 0, a
906 default value of 10 seconds is used.
907
908 For push delivery, this value is also used to set the
909 request timeout for the call to the push endpoint.
910
911 If the subscriber never acknowledges the message, the
912 Pub/Sub system will eventually redeliver the message.
913
914 This corresponds to the ``ack_deadline_seconds`` field
915 on the ``request`` instance; if ``request`` is provided, this
916 should not be set.
917 retry (google.api_core.retry.Retry): Designation of what errors, if any,
918 should be retried.
919 timeout (float): The timeout for this request.
920 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
921 sent along with the request as metadata. Normally, each value must be of type `str`,
922 but for metadata keys ending with the suffix `-bin`, the corresponding values must
923 be of type `bytes`.
924
925 Returns:
926 google.pubsub_v1.types.Subscription:
927 A subscription resource. If none of push_config, bigquery_config, or
928 cloud_storage_config is set, then the subscriber will
929 pull and ack messages using API methods. At most one
930 of these fields may be set.
931
932 """
933 # Create or coerce a protobuf request object.
934 # - Quick check: If we got a request object, we should *not* have
935 # gotten any keyword arguments that map to the request.
936 flattened_params = [name, topic, push_config, ack_deadline_seconds]
937 has_flattened_params = (
938 len([param for param in flattened_params if param is not None]) > 0
939 )
940 if request is not None and has_flattened_params:
941 raise ValueError(
942 "If the `request` argument is set, then none of "
943 "the individual field arguments should be set."
944 )
945
946 # - Use the request object if provided (there's no risk of modifying the input as
947 # there are no flattened fields), or create one.
948 if not isinstance(request, pubsub.Subscription):
949 request = pubsub.Subscription(request)
950 # If we have keyword arguments corresponding to fields on the
951 # request, apply these.
952 if name is not None:
953 request.name = name
954 if topic is not None:
955 request.topic = topic
956 if push_config is not None:
957 request.push_config = push_config
958 if ack_deadline_seconds is not None:
959 request.ack_deadline_seconds = ack_deadline_seconds
960
961 # Wrap the RPC method; this adds retry and timeout information,
962 # and friendly error handling.
963 rpc = self._transport._wrapped_methods[self._transport.create_subscription]
964
965 # Certain fields should be provided within the metadata header;
966 # add these here.
967 metadata = tuple(metadata) + (
968 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
969 )
970
971 # Validate the universe domain.
972 self._validate_universe_domain()
973
974 # Send the request.
975 response = rpc(
976 request,
977 retry=retry,
978 timeout=timeout,
979 metadata=metadata,
980 )
981
982 # Done; return the response.
983 return response
984
985 def get_subscription(
986 self,
987 request: Optional[Union[pubsub.GetSubscriptionRequest, dict]] = None,
988 *,
989 subscription: Optional[str] = None,
990 retry: OptionalRetry = gapic_v1.method.DEFAULT,
991 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
992 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
993 ) -> pubsub.Subscription:
994 r"""Gets the configuration details of a subscription.
995
996 .. code-block:: python
997
998 # This snippet has been automatically generated and should be regarded as a
999 # code template only.
1000 # It will require modifications to work:
1001 # - It may require correct/in-range values for request initialization.
1002 # - It may require specifying regional endpoints when creating the service
1003 # client as shown in:
1004 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1005 from google import pubsub_v1
1006
1007 def sample_get_subscription():
1008 # Create a client
1009 client = pubsub_v1.SubscriberClient()
1010
1011 # Initialize request argument(s)
1012 request = pubsub_v1.GetSubscriptionRequest(
1013 subscription="subscription_value",
1014 )
1015
1016 # Make the request
1017 response = client.get_subscription(request=request)
1018
1019 # Handle the response
1020 print(response)
1021
1022 Args:
1023 request (Union[google.pubsub_v1.types.GetSubscriptionRequest, dict]):
1024 The request object. Request for the GetSubscription
1025 method.
1026 subscription (str):
1027 Required. The name of the subscription to get. Format is
1028 ``projects/{project}/subscriptions/{sub}``.
1029
1030 This corresponds to the ``subscription`` field
1031 on the ``request`` instance; if ``request`` is provided, this
1032 should not be set.
1033 retry (google.api_core.retry.Retry): Designation of what errors, if any,
1034 should be retried.
1035 timeout (float): The timeout for this request.
1036 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1037 sent along with the request as metadata. Normally, each value must be of type `str`,
1038 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1039 be of type `bytes`.
1040
1041 Returns:
1042 google.pubsub_v1.types.Subscription:
1043 A subscription resource. If none of push_config, bigquery_config, or
1044 cloud_storage_config is set, then the subscriber will
1045 pull and ack messages using API methods. At most one
1046 of these fields may be set.
1047
1048 """
1049 # Create or coerce a protobuf request object.
1050 # - Quick check: If we got a request object, we should *not* have
1051 # gotten any keyword arguments that map to the request.
1052 flattened_params = [subscription]
1053 has_flattened_params = (
1054 len([param for param in flattened_params if param is not None]) > 0
1055 )
1056 if request is not None and has_flattened_params:
1057 raise ValueError(
1058 "If the `request` argument is set, then none of "
1059 "the individual field arguments should be set."
1060 )
1061
1062 # - Use the request object if provided (there's no risk of modifying the input as
1063 # there are no flattened fields), or create one.
1064 if not isinstance(request, pubsub.GetSubscriptionRequest):
1065 request = pubsub.GetSubscriptionRequest(request)
1066 # If we have keyword arguments corresponding to fields on the
1067 # request, apply these.
1068 if subscription is not None:
1069 request.subscription = subscription
1070
1071 # Wrap the RPC method; this adds retry and timeout information,
1072 # and friendly error handling.
1073 rpc = self._transport._wrapped_methods[self._transport.get_subscription]
1074
1075 # Certain fields should be provided within the metadata header;
1076 # add these here.
1077 metadata = tuple(metadata) + (
1078 gapic_v1.routing_header.to_grpc_metadata(
1079 (("subscription", request.subscription),)
1080 ),
1081 )
1082
1083 # Validate the universe domain.
1084 self._validate_universe_domain()
1085
1086 # Send the request.
1087 response = rpc(
1088 request,
1089 retry=retry,
1090 timeout=timeout,
1091 metadata=metadata,
1092 )
1093
1094 # Done; return the response.
1095 return response
1096
1097 def update_subscription(
1098 self,
1099 request: Optional[Union[pubsub.UpdateSubscriptionRequest, dict]] = None,
1100 *,
1101 subscription: Optional[pubsub.Subscription] = None,
1102 update_mask: Optional[field_mask_pb2.FieldMask] = None,
1103 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1104 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1105 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1106 ) -> pubsub.Subscription:
1107 r"""Updates an existing subscription by updating the
1108 fields specified in the update mask. Note that certain
1109 properties of a subscription, such as its topic, are not
1110 modifiable.
1111
1112 .. code-block:: python
1113
1114 # This snippet has been automatically generated and should be regarded as a
1115 # code template only.
1116 # It will require modifications to work:
1117 # - It may require correct/in-range values for request initialization.
1118 # - It may require specifying regional endpoints when creating the service
1119 # client as shown in:
1120 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1121 from google import pubsub_v1
1122
1123 def sample_update_subscription():
1124 # Create a client
1125 client = pubsub_v1.SubscriberClient()
1126
1127 # Initialize request argument(s)
1128 subscription = pubsub_v1.Subscription()
1129 subscription.name = "name_value"
1130 subscription.topic = "topic_value"
1131
1132 request = pubsub_v1.UpdateSubscriptionRequest(
1133 subscription=subscription,
1134 )
1135
1136 # Make the request
1137 response = client.update_subscription(request=request)
1138
1139 # Handle the response
1140 print(response)
1141
1142 Args:
1143 request (Union[google.pubsub_v1.types.UpdateSubscriptionRequest, dict]):
1144 The request object. Request for the UpdateSubscription
1145 method.
1146 subscription (google.pubsub_v1.types.Subscription):
1147 Required. The updated subscription
1148 object.
1149
1150 This corresponds to the ``subscription`` field
1151 on the ``request`` instance; if ``request`` is provided, this
1152 should not be set.
1153 update_mask (google.protobuf.field_mask_pb2.FieldMask):
1154 Required. Indicates which fields in
1155 the provided subscription to update.
1156 Must be specified and non-empty.
1157
1158 This corresponds to the ``update_mask`` field
1159 on the ``request`` instance; if ``request`` is provided, this
1160 should not be set.
1161 retry (google.api_core.retry.Retry): Designation of what errors, if any,
1162 should be retried.
1163 timeout (float): The timeout for this request.
1164 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1165 sent along with the request as metadata. Normally, each value must be of type `str`,
1166 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1167 be of type `bytes`.
1168
1169 Returns:
1170 google.pubsub_v1.types.Subscription:
1171 A subscription resource. If none of push_config, bigquery_config, or
1172 cloud_storage_config is set, then the subscriber will
1173 pull and ack messages using API methods. At most one
1174 of these fields may be set.
1175
1176 """
1177 # Create or coerce a protobuf request object.
1178 # - Quick check: If we got a request object, we should *not* have
1179 # gotten any keyword arguments that map to the request.
1180 flattened_params = [subscription, update_mask]
1181 has_flattened_params = (
1182 len([param for param in flattened_params if param is not None]) > 0
1183 )
1184 if request is not None and has_flattened_params:
1185 raise ValueError(
1186 "If the `request` argument is set, then none of "
1187 "the individual field arguments should be set."
1188 )
1189
1190 # - Use the request object if provided (there's no risk of modifying the input as
1191 # there are no flattened fields), or create one.
1192 if not isinstance(request, pubsub.UpdateSubscriptionRequest):
1193 request = pubsub.UpdateSubscriptionRequest(request)
1194 # If we have keyword arguments corresponding to fields on the
1195 # request, apply these.
1196 if subscription is not None:
1197 request.subscription = subscription
1198 if update_mask is not None:
1199 request.update_mask = update_mask
1200
1201 # Wrap the RPC method; this adds retry and timeout information,
1202 # and friendly error handling.
1203 rpc = self._transport._wrapped_methods[self._transport.update_subscription]
1204
1205 # Certain fields should be provided within the metadata header;
1206 # add these here.
1207 metadata = tuple(metadata) + (
1208 gapic_v1.routing_header.to_grpc_metadata(
1209 (("subscription.name", request.subscription.name),)
1210 ),
1211 )
1212
1213 # Validate the universe domain.
1214 self._validate_universe_domain()
1215
1216 # Send the request.
1217 response = rpc(
1218 request,
1219 retry=retry,
1220 timeout=timeout,
1221 metadata=metadata,
1222 )
1223
1224 # Done; return the response.
1225 return response
1226
1227 def list_subscriptions(
1228 self,
1229 request: Optional[Union[pubsub.ListSubscriptionsRequest, dict]] = None,
1230 *,
1231 project: Optional[str] = None,
1232 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1233 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1234 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1235 ) -> pagers.ListSubscriptionsPager:
1236 r"""Lists matching subscriptions.
1237
1238 .. code-block:: python
1239
1240 # This snippet has been automatically generated and should be regarded as a
1241 # code template only.
1242 # It will require modifications to work:
1243 # - It may require correct/in-range values for request initialization.
1244 # - It may require specifying regional endpoints when creating the service
1245 # client as shown in:
1246 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1247 from google import pubsub_v1
1248
1249 def sample_list_subscriptions():
1250 # Create a client
1251 client = pubsub_v1.SubscriberClient()
1252
1253 # Initialize request argument(s)
1254 request = pubsub_v1.ListSubscriptionsRequest(
1255 project="project_value",
1256 )
1257
1258 # Make the request
1259 page_result = client.list_subscriptions(request=request)
1260
1261 # Handle the response
1262 for response in page_result:
1263 print(response)
1264
1265 Args:
1266 request (Union[google.pubsub_v1.types.ListSubscriptionsRequest, dict]):
1267 The request object. Request for the ``ListSubscriptions`` method.
1268 project (str):
1269 Required. The name of the project in which to list
1270 subscriptions. Format is ``projects/{project-id}``.
1271
1272 This corresponds to the ``project`` field
1273 on the ``request`` instance; if ``request`` is provided, this
1274 should not be set.
1275 retry (google.api_core.retry.Retry): Designation of what errors, if any,
1276 should be retried.
1277 timeout (float): The timeout for this request.
1278 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1279 sent along with the request as metadata. Normally, each value must be of type `str`,
1280 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1281 be of type `bytes`.
1282
1283 Returns:
1284 google.pubsub_v1.services.subscriber.pagers.ListSubscriptionsPager:
1285 Response for the ListSubscriptions method.
1286
1287 Iterating over this object will yield results and
1288 resolve additional pages automatically.
1289
1290 """
1291 # Create or coerce a protobuf request object.
1292 # - Quick check: If we got a request object, we should *not* have
1293 # gotten any keyword arguments that map to the request.
1294 flattened_params = [project]
1295 has_flattened_params = (
1296 len([param for param in flattened_params if param is not None]) > 0
1297 )
1298 if request is not None and has_flattened_params:
1299 raise ValueError(
1300 "If the `request` argument is set, then none of "
1301 "the individual field arguments should be set."
1302 )
1303
1304 # - Use the request object if provided (there's no risk of modifying the input as
1305 # there are no flattened fields), or create one.
1306 if not isinstance(request, pubsub.ListSubscriptionsRequest):
1307 request = pubsub.ListSubscriptionsRequest(request)
1308 # If we have keyword arguments corresponding to fields on the
1309 # request, apply these.
1310 if project is not None:
1311 request.project = project
1312
1313 # Wrap the RPC method; this adds retry and timeout information,
1314 # and friendly error handling.
1315 rpc = self._transport._wrapped_methods[self._transport.list_subscriptions]
1316
1317 # Certain fields should be provided within the metadata header;
1318 # add these here.
1319 metadata = tuple(metadata) + (
1320 gapic_v1.routing_header.to_grpc_metadata((("project", request.project),)),
1321 )
1322
1323 # Validate the universe domain.
1324 self._validate_universe_domain()
1325
1326 # Send the request.
1327 response = rpc(
1328 request,
1329 retry=retry,
1330 timeout=timeout,
1331 metadata=metadata,
1332 )
1333
1334 # This method is paged; wrap the response in a pager, which provides
1335 # an `__iter__` convenience method.
1336 response = pagers.ListSubscriptionsPager(
1337 method=rpc,
1338 request=request,
1339 response=response,
1340 retry=retry,
1341 timeout=timeout,
1342 metadata=metadata,
1343 )
1344
1345 # Done; return the response.
1346 return response
1347
1348 def delete_subscription(
1349 self,
1350 request: Optional[Union[pubsub.DeleteSubscriptionRequest, dict]] = None,
1351 *,
1352 subscription: Optional[str] = None,
1353 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1354 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1355 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1356 ) -> None:
1357 r"""Deletes an existing subscription. All messages retained in the
1358 subscription are immediately dropped. Calls to ``Pull`` after
1359 deletion will return ``NOT_FOUND``. After a subscription is
1360 deleted, a new one may be created with the same name, but the
1361 new one has no association with the old subscription or its
1362 topic unless the same topic is specified.
1363
1364 .. code-block:: python
1365
1366 # This snippet has been automatically generated and should be regarded as a
1367 # code template only.
1368 # It will require modifications to work:
1369 # - It may require correct/in-range values for request initialization.
1370 # - It may require specifying regional endpoints when creating the service
1371 # client as shown in:
1372 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1373 from google import pubsub_v1
1374
1375 def sample_delete_subscription():
1376 # Create a client
1377 client = pubsub_v1.SubscriberClient()
1378
1379 # Initialize request argument(s)
1380 request = pubsub_v1.DeleteSubscriptionRequest(
1381 subscription="subscription_value",
1382 )
1383
1384 # Make the request
1385 client.delete_subscription(request=request)
1386
1387 Args:
1388 request (Union[google.pubsub_v1.types.DeleteSubscriptionRequest, dict]):
1389 The request object. Request for the DeleteSubscription
1390 method.
1391 subscription (str):
1392 Required. The subscription to delete. Format is
1393 ``projects/{project}/subscriptions/{sub}``.
1394
1395 This corresponds to the ``subscription`` field
1396 on the ``request`` instance; if ``request`` is provided, this
1397 should not be set.
1398 retry (google.api_core.retry.Retry): Designation of what errors, if any,
1399 should be retried.
1400 timeout (float): The timeout for this request.
1401 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1402 sent along with the request as metadata. Normally, each value must be of type `str`,
1403 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1404 be of type `bytes`.
1405 """
1406 # Create or coerce a protobuf request object.
1407 # - Quick check: If we got a request object, we should *not* have
1408 # gotten any keyword arguments that map to the request.
1409 flattened_params = [subscription]
1410 has_flattened_params = (
1411 len([param for param in flattened_params if param is not None]) > 0
1412 )
1413 if request is not None and has_flattened_params:
1414 raise ValueError(
1415 "If the `request` argument is set, then none of "
1416 "the individual field arguments should be set."
1417 )
1418
1419 # - Use the request object if provided (there's no risk of modifying the input as
1420 # there are no flattened fields), or create one.
1421 if not isinstance(request, pubsub.DeleteSubscriptionRequest):
1422 request = pubsub.DeleteSubscriptionRequest(request)
1423 # If we have keyword arguments corresponding to fields on the
1424 # request, apply these.
1425 if subscription is not None:
1426 request.subscription = subscription
1427
1428 # Wrap the RPC method; this adds retry and timeout information,
1429 # and friendly error handling.
1430 rpc = self._transport._wrapped_methods[self._transport.delete_subscription]
1431
1432 # Certain fields should be provided within the metadata header;
1433 # add these here.
1434 metadata = tuple(metadata) + (
1435 gapic_v1.routing_header.to_grpc_metadata(
1436 (("subscription", request.subscription),)
1437 ),
1438 )
1439
1440 # Validate the universe domain.
1441 self._validate_universe_domain()
1442
1443 # Send the request.
1444 rpc(
1445 request,
1446 retry=retry,
1447 timeout=timeout,
1448 metadata=metadata,
1449 )
1450
1451 def modify_ack_deadline(
1452 self,
1453 request: Optional[Union[pubsub.ModifyAckDeadlineRequest, dict]] = None,
1454 *,
1455 subscription: Optional[str] = None,
1456 ack_ids: Optional[MutableSequence[str]] = None,
1457 ack_deadline_seconds: Optional[int] = None,
1458 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1459 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1460 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1461 ) -> None:
1462 r"""Modifies the ack deadline for a specific message. This method is
1463 useful to indicate that more time is needed to process a message
1464 by the subscriber, or to make the message available for
1465 redelivery if the processing was interrupted. Note that this
1466 does not modify the subscription-level ``ackDeadlineSeconds``
1467 used for subsequent messages.
1468
1469 .. code-block:: python
1470
1471 # This snippet has been automatically generated and should be regarded as a
1472 # code template only.
1473 # It will require modifications to work:
1474 # - It may require correct/in-range values for request initialization.
1475 # - It may require specifying regional endpoints when creating the service
1476 # client as shown in:
1477 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1478 from google import pubsub_v1
1479
1480 def sample_modify_ack_deadline():
1481 # Create a client
1482 client = pubsub_v1.SubscriberClient()
1483
1484 # Initialize request argument(s)
1485 request = pubsub_v1.ModifyAckDeadlineRequest(
1486 subscription="subscription_value",
1487 ack_ids=['ack_ids_value1', 'ack_ids_value2'],
1488 ack_deadline_seconds=2066,
1489 )
1490
1491 # Make the request
1492 client.modify_ack_deadline(request=request)
1493
1494 Args:
1495 request (Union[google.pubsub_v1.types.ModifyAckDeadlineRequest, dict]):
1496 The request object. Request for the ModifyAckDeadline
1497 method.
1498 subscription (str):
1499 Required. The name of the subscription. Format is
1500 ``projects/{project}/subscriptions/{sub}``.
1501
1502 This corresponds to the ``subscription`` field
1503 on the ``request`` instance; if ``request`` is provided, this
1504 should not be set.
1505 ack_ids (MutableSequence[str]):
1506 Required. List of acknowledgment IDs.
1507 This corresponds to the ``ack_ids`` field
1508 on the ``request`` instance; if ``request`` is provided, this
1509 should not be set.
1510 ack_deadline_seconds (int):
1511 Required. The new ack deadline with respect to the time
1512 this request was sent to the Pub/Sub system. For
1513 example, if the value is 10, the new ack deadline will
1514 expire 10 seconds after the ``ModifyAckDeadline`` call
1515 was made. Specifying zero might immediately make the
1516 message available for delivery to another subscriber
1517 client. This typically results in an increase in the
1518 rate of message redeliveries (that is, duplicates). The
1519 minimum deadline you can specify is 0 seconds. The
1520 maximum deadline you can specify in a single request is
1521 600 seconds (10 minutes).
1522
1523 This corresponds to the ``ack_deadline_seconds`` field
1524 on the ``request`` instance; if ``request`` is provided, this
1525 should not be set.
1526 retry (google.api_core.retry.Retry): Designation of what errors, if any,
1527 should be retried.
1528 timeout (float): The timeout for this request.
1529 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1530 sent along with the request as metadata. Normally, each value must be of type `str`,
1531 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1532 be of type `bytes`.
1533 """
1534 # Create or coerce a protobuf request object.
1535 # - Quick check: If we got a request object, we should *not* have
1536 # gotten any keyword arguments that map to the request.
1537 flattened_params = [subscription, ack_ids, ack_deadline_seconds]
1538 has_flattened_params = (
1539 len([param for param in flattened_params if param is not None]) > 0
1540 )
1541 if request is not None and has_flattened_params:
1542 raise ValueError(
1543 "If the `request` argument is set, then none of "
1544 "the individual field arguments should be set."
1545 )
1546
1547 # - Use the request object if provided (there's no risk of modifying the input as
1548 # there are no flattened fields), or create one.
1549 if not isinstance(request, pubsub.ModifyAckDeadlineRequest):
1550 request = pubsub.ModifyAckDeadlineRequest(request)
1551 # If we have keyword arguments corresponding to fields on the
1552 # request, apply these.
1553 if subscription is not None:
1554 request.subscription = subscription
1555 if ack_ids is not None:
1556 request.ack_ids = ack_ids
1557 if ack_deadline_seconds is not None:
1558 request.ack_deadline_seconds = ack_deadline_seconds
1559
1560 # Wrap the RPC method; this adds retry and timeout information,
1561 # and friendly error handling.
1562 rpc = self._transport._wrapped_methods[self._transport.modify_ack_deadline]
1563
1564 # Certain fields should be provided within the metadata header;
1565 # add these here.
1566 metadata = tuple(metadata) + (
1567 gapic_v1.routing_header.to_grpc_metadata(
1568 (("subscription", request.subscription),)
1569 ),
1570 )
1571
1572 # Validate the universe domain.
1573 self._validate_universe_domain()
1574
1575 # Send the request.
1576 rpc(
1577 request,
1578 retry=retry,
1579 timeout=timeout,
1580 metadata=metadata,
1581 )
1582
1583 def acknowledge(
1584 self,
1585 request: Optional[Union[pubsub.AcknowledgeRequest, dict]] = None,
1586 *,
1587 subscription: Optional[str] = None,
1588 ack_ids: Optional[MutableSequence[str]] = None,
1589 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1590 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1591 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1592 ) -> None:
1593 r"""Acknowledges the messages associated with the ``ack_ids`` in the
1594 ``AcknowledgeRequest``. The Pub/Sub system can remove the
1595 relevant messages from the subscription.
1596
1597 Acknowledging a message whose ack deadline has expired may
1598 succeed, but such a message may be redelivered later.
1599 Acknowledging a message more than once will not result in an
1600 error.
1601
1602 .. code-block:: python
1603
1604 # This snippet has been automatically generated and should be regarded as a
1605 # code template only.
1606 # It will require modifications to work:
1607 # - It may require correct/in-range values for request initialization.
1608 # - It may require specifying regional endpoints when creating the service
1609 # client as shown in:
1610 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1611 from google import pubsub_v1
1612
1613 def sample_acknowledge():
1614 # Create a client
1615 client = pubsub_v1.SubscriberClient()
1616
1617 # Initialize request argument(s)
1618 request = pubsub_v1.AcknowledgeRequest(
1619 subscription="subscription_value",
1620 ack_ids=['ack_ids_value1', 'ack_ids_value2'],
1621 )
1622
1623 # Make the request
1624 client.acknowledge(request=request)
1625
1626 Args:
1627 request (Union[google.pubsub_v1.types.AcknowledgeRequest, dict]):
1628 The request object. Request for the Acknowledge method.
1629 subscription (str):
1630 Required. The subscription whose message is being
1631 acknowledged. Format is
1632 ``projects/{project}/subscriptions/{sub}``.
1633
1634 This corresponds to the ``subscription`` field
1635 on the ``request`` instance; if ``request`` is provided, this
1636 should not be set.
1637 ack_ids (MutableSequence[str]):
1638 Required. The acknowledgment ID for the messages being
1639 acknowledged that was returned by the Pub/Sub system in
1640 the ``Pull`` response. Must not be empty.
1641
1642 This corresponds to the ``ack_ids`` field
1643 on the ``request`` instance; if ``request`` is provided, this
1644 should not be set.
1645 retry (google.api_core.retry.Retry): Designation of what errors, if any,
1646 should be retried.
1647 timeout (float): The timeout for this request.
1648 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1649 sent along with the request as metadata. Normally, each value must be of type `str`,
1650 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1651 be of type `bytes`.
1652 """
1653 # Create or coerce a protobuf request object.
1654 # - Quick check: If we got a request object, we should *not* have
1655 # gotten any keyword arguments that map to the request.
1656 flattened_params = [subscription, ack_ids]
1657 has_flattened_params = (
1658 len([param for param in flattened_params if param is not None]) > 0
1659 )
1660 if request is not None and has_flattened_params:
1661 raise ValueError(
1662 "If the `request` argument is set, then none of "
1663 "the individual field arguments should be set."
1664 )
1665
1666 # - Use the request object if provided (there's no risk of modifying the input as
1667 # there are no flattened fields), or create one.
1668 if not isinstance(request, pubsub.AcknowledgeRequest):
1669 request = pubsub.AcknowledgeRequest(request)
1670 # If we have keyword arguments corresponding to fields on the
1671 # request, apply these.
1672 if subscription is not None:
1673 request.subscription = subscription
1674 if ack_ids is not None:
1675 request.ack_ids = ack_ids
1676
1677 # Wrap the RPC method; this adds retry and timeout information,
1678 # and friendly error handling.
1679 rpc = self._transport._wrapped_methods[self._transport.acknowledge]
1680
1681 # Certain fields should be provided within the metadata header;
1682 # add these here.
1683 metadata = tuple(metadata) + (
1684 gapic_v1.routing_header.to_grpc_metadata(
1685 (("subscription", request.subscription),)
1686 ),
1687 )
1688
1689 # Validate the universe domain.
1690 self._validate_universe_domain()
1691
1692 # Send the request.
1693 rpc(
1694 request,
1695 retry=retry,
1696 timeout=timeout,
1697 metadata=metadata,
1698 )
1699
1700 def pull(
1701 self,
1702 request: Optional[Union[pubsub.PullRequest, dict]] = None,
1703 *,
1704 subscription: Optional[str] = None,
1705 return_immediately: Optional[bool] = None,
1706 max_messages: Optional[int] = None,
1707 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1708 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1709 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1710 ) -> pubsub.PullResponse:
1711 r"""Pulls messages from the server.
1712
1713 .. code-block:: python
1714
1715 # This snippet has been automatically generated and should be regarded as a
1716 # code template only.
1717 # It will require modifications to work:
1718 # - It may require correct/in-range values for request initialization.
1719 # - It may require specifying regional endpoints when creating the service
1720 # client as shown in:
1721 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1722 from google import pubsub_v1
1723
1724 def sample_pull():
1725 # Create a client
1726 client = pubsub_v1.SubscriberClient()
1727
1728 # Initialize request argument(s)
1729 request = pubsub_v1.PullRequest(
1730 subscription="subscription_value",
1731 max_messages=1277,
1732 )
1733
1734 # Make the request
1735 response = client.pull(request=request)
1736
1737 # Handle the response
1738 print(response)
1739
1740 Args:
1741 request (Union[google.pubsub_v1.types.PullRequest, dict]):
1742 The request object. Request for the ``Pull`` method.
1743 subscription (str):
1744 Required. The subscription from which messages should be
1745 pulled. Format is
1746 ``projects/{project}/subscriptions/{sub}``.
1747
1748 This corresponds to the ``subscription`` field
1749 on the ``request`` instance; if ``request`` is provided, this
1750 should not be set.
1751 return_immediately (bool):
1752 Optional. If this field set to true, the system will
1753 respond immediately even if it there are no messages
1754 available to return in the ``Pull`` response. Otherwise,
1755 the system may wait (for a bounded amount of time) until
1756 at least one message is available, rather than returning
1757 no messages. Warning: setting this field to ``true`` is
1758 discouraged because it adversely impacts the performance
1759 of ``Pull`` operations. We recommend that users do not
1760 set this field.
1761
1762 This corresponds to the ``return_immediately`` field
1763 on the ``request`` instance; if ``request`` is provided, this
1764 should not be set.
1765 max_messages (int):
1766 Required. The maximum number of
1767 messages to return for this request.
1768 Must be a positive integer. The Pub/Sub
1769 system may return fewer than the number
1770 specified.
1771
1772 This corresponds to the ``max_messages`` field
1773 on the ``request`` instance; if ``request`` is provided, this
1774 should not be set.
1775 retry (google.api_core.retry.Retry): Designation of what errors, if any,
1776 should be retried.
1777 timeout (float): The timeout for this request.
1778 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1779 sent along with the request as metadata. Normally, each value must be of type `str`,
1780 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1781 be of type `bytes`.
1782
1783 Returns:
1784 google.pubsub_v1.types.PullResponse:
1785 Response for the Pull method.
1786 """
1787 # Create or coerce a protobuf request object.
1788 # - Quick check: If we got a request object, we should *not* have
1789 # gotten any keyword arguments that map to the request.
1790 flattened_params = [subscription, return_immediately, max_messages]
1791 has_flattened_params = (
1792 len([param for param in flattened_params if param is not None]) > 0
1793 )
1794 if request is not None and has_flattened_params:
1795 raise ValueError(
1796 "If the `request` argument is set, then none of "
1797 "the individual field arguments should be set."
1798 )
1799
1800 # - Use the request object if provided (there's no risk of modifying the input as
1801 # there are no flattened fields), or create one.
1802 if not isinstance(request, pubsub.PullRequest):
1803 request = pubsub.PullRequest(request)
1804 # If we have keyword arguments corresponding to fields on the
1805 # request, apply these.
1806 if subscription is not None:
1807 request.subscription = subscription
1808 if return_immediately is not None:
1809 request.return_immediately = return_immediately
1810 if max_messages is not None:
1811 request.max_messages = max_messages
1812
1813 if request.return_immediately:
1814 warnings.warn(
1815 "The return_immediately flag is deprecated and should be set to False.",
1816 category=DeprecationWarning,
1817 )
1818
1819 # Wrap the RPC method; this adds retry and timeout information,
1820 # and friendly error handling.
1821 rpc = self._transport._wrapped_methods[self._transport.pull]
1822
1823 # Certain fields should be provided within the metadata header;
1824 # add these here.
1825 metadata = tuple(metadata) + (
1826 gapic_v1.routing_header.to_grpc_metadata(
1827 (("subscription", request.subscription),)
1828 ),
1829 )
1830
1831 # Validate the universe domain.
1832 self._validate_universe_domain()
1833
1834 # Send the request.
1835 response = rpc(
1836 request,
1837 retry=retry,
1838 timeout=timeout,
1839 metadata=metadata,
1840 )
1841
1842 # Done; return the response.
1843 return response
1844
1845 def streaming_pull(
1846 self,
1847 requests: Optional[Iterator[pubsub.StreamingPullRequest]] = None,
1848 *,
1849 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1850 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1851 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1852 ) -> Iterable[pubsub.StreamingPullResponse]:
1853 r"""Establishes a stream with the server, which sends messages down
1854 to the client. The client streams acknowledgments and ack
1855 deadline modifications back to the server. The server will close
1856 the stream and return the status on any error. The server may
1857 close the stream with status ``UNAVAILABLE`` to reassign
1858 server-side resources, in which case, the client should
1859 re-establish the stream. Flow control can be achieved by
1860 configuring the underlying RPC channel.
1861
1862 .. code-block:: python
1863
1864 # This snippet has been automatically generated and should be regarded as a
1865 # code template only.
1866 # It will require modifications to work:
1867 # - It may require correct/in-range values for request initialization.
1868 # - It may require specifying regional endpoints when creating the service
1869 # client as shown in:
1870 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1871 from google import pubsub_v1
1872
1873 def sample_streaming_pull():
1874 # Create a client
1875 client = pubsub_v1.SubscriberClient()
1876
1877 # Initialize request argument(s)
1878 request = pubsub_v1.StreamingPullRequest(
1879 subscription="subscription_value",
1880 stream_ack_deadline_seconds=2813,
1881 )
1882
1883 # This method expects an iterator which contains
1884 # 'pubsub_v1.StreamingPullRequest' objects
1885 # Here we create a generator that yields a single `request` for
1886 # demonstrative purposes.
1887 requests = [request]
1888
1889 def request_generator():
1890 for request in requests:
1891 yield request
1892
1893 # Make the request
1894 stream = client.streaming_pull(requests=request_generator())
1895
1896 # Handle the response
1897 for response in stream:
1898 print(response)
1899
1900 Args:
1901 requests (Iterator[google.pubsub_v1.types.StreamingPullRequest]):
1902 The request object iterator. Request for the ``StreamingPull`` streaming RPC method.
1903 This request is used to establish the initial stream as
1904 well as to stream acknowledgments and ack deadline
1905 modifications from the client to the server.
1906 retry (google.api_core.retry.Retry): Designation of what errors, if any,
1907 should be retried.
1908 timeout (float): The timeout for this request.
1909 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1910 sent along with the request as metadata. Normally, each value must be of type `str`,
1911 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1912 be of type `bytes`.
1913
1914 Returns:
1915 Iterable[google.pubsub_v1.types.StreamingPullResponse]:
1916 Response for the StreamingPull method. This response is used to stream
1917 messages from the server to the client.
1918
1919 """
1920
1921 # Wrappers in api-core should not automatically pre-fetch the first
1922 # stream result, as this breaks the stream when re-opening it.
1923 # https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257
1924 self._transport.streaming_pull._prefetch_first_result_ = False
1925
1926 # Wrap the RPC method; this adds retry and timeout information,
1927 # and friendly error handling.
1928 rpc = self._transport._wrapped_methods[self._transport.streaming_pull]
1929
1930 # Validate the universe domain.
1931 self._validate_universe_domain()
1932
1933 # Send the request.
1934 response = rpc(
1935 requests,
1936 retry=retry,
1937 timeout=timeout,
1938 metadata=metadata,
1939 )
1940
1941 # Done; return the response.
1942 return response
1943
1944 def modify_push_config(
1945 self,
1946 request: Optional[Union[pubsub.ModifyPushConfigRequest, dict]] = None,
1947 *,
1948 subscription: Optional[str] = None,
1949 push_config: Optional[pubsub.PushConfig] = None,
1950 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1951 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1952 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1953 ) -> None:
1954 r"""Modifies the ``PushConfig`` for a specified subscription.
1955
1956 This may be used to change a push subscription to a pull one
1957 (signified by an empty ``PushConfig``) or vice versa, or change
1958 the endpoint URL and other attributes of a push subscription.
1959 Messages will accumulate for delivery continuously through the
1960 call regardless of changes to the ``PushConfig``.
1961
1962 .. code-block:: python
1963
1964 # This snippet has been automatically generated and should be regarded as a
1965 # code template only.
1966 # It will require modifications to work:
1967 # - It may require correct/in-range values for request initialization.
1968 # - It may require specifying regional endpoints when creating the service
1969 # client as shown in:
1970 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1971 from google import pubsub_v1
1972
1973 def sample_modify_push_config():
1974 # Create a client
1975 client = pubsub_v1.SubscriberClient()
1976
1977 # Initialize request argument(s)
1978 request = pubsub_v1.ModifyPushConfigRequest(
1979 subscription="subscription_value",
1980 )
1981
1982 # Make the request
1983 client.modify_push_config(request=request)
1984
1985 Args:
1986 request (Union[google.pubsub_v1.types.ModifyPushConfigRequest, dict]):
1987 The request object. Request for the ModifyPushConfig
1988 method.
1989 subscription (str):
1990 Required. The name of the subscription. Format is
1991 ``projects/{project}/subscriptions/{sub}``.
1992
1993 This corresponds to the ``subscription`` field
1994 on the ``request`` instance; if ``request`` is provided, this
1995 should not be set.
1996 push_config (google.pubsub_v1.types.PushConfig):
1997 Required. The push configuration for future deliveries.
1998
1999 An empty ``pushConfig`` indicates that the Pub/Sub
2000 system should stop pushing messages from the given
2001 subscription and allow messages to be pulled and
2002 acknowledged - effectively pausing the subscription if
2003 ``Pull`` or ``StreamingPull`` is not called.
2004
2005 This corresponds to the ``push_config`` field
2006 on the ``request`` instance; if ``request`` is provided, this
2007 should not be set.
2008 retry (google.api_core.retry.Retry): Designation of what errors, if any,
2009 should be retried.
2010 timeout (float): The timeout for this request.
2011 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
2012 sent along with the request as metadata. Normally, each value must be of type `str`,
2013 but for metadata keys ending with the suffix `-bin`, the corresponding values must
2014 be of type `bytes`.
2015 """
2016 # Create or coerce a protobuf request object.
2017 # - Quick check: If we got a request object, we should *not* have
2018 # gotten any keyword arguments that map to the request.
2019 flattened_params = [subscription, push_config]
2020 has_flattened_params = (
2021 len([param for param in flattened_params if param is not None]) > 0
2022 )
2023 if request is not None and has_flattened_params:
2024 raise ValueError(
2025 "If the `request` argument is set, then none of "
2026 "the individual field arguments should be set."
2027 )
2028
2029 # - Use the request object if provided (there's no risk of modifying the input as
2030 # there are no flattened fields), or create one.
2031 if not isinstance(request, pubsub.ModifyPushConfigRequest):
2032 request = pubsub.ModifyPushConfigRequest(request)
2033 # If we have keyword arguments corresponding to fields on the
2034 # request, apply these.
2035 if subscription is not None:
2036 request.subscription = subscription
2037 if push_config is not None:
2038 request.push_config = push_config
2039
2040 # Wrap the RPC method; this adds retry and timeout information,
2041 # and friendly error handling.
2042 rpc = self._transport._wrapped_methods[self._transport.modify_push_config]
2043
2044 # Certain fields should be provided within the metadata header;
2045 # add these here.
2046 metadata = tuple(metadata) + (
2047 gapic_v1.routing_header.to_grpc_metadata(
2048 (("subscription", request.subscription),)
2049 ),
2050 )
2051
2052 # Validate the universe domain.
2053 self._validate_universe_domain()
2054
2055 # Send the request.
2056 rpc(
2057 request,
2058 retry=retry,
2059 timeout=timeout,
2060 metadata=metadata,
2061 )
2062
2063 def get_snapshot(
2064 self,
2065 request: Optional[Union[pubsub.GetSnapshotRequest, dict]] = None,
2066 *,
2067 snapshot: Optional[str] = None,
2068 retry: OptionalRetry = gapic_v1.method.DEFAULT,
2069 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
2070 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
2071 ) -> pubsub.Snapshot:
2072 r"""Gets the configuration details of a snapshot. Snapshots are used
2073 in
2074 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
2075 operations, which allow you to manage message acknowledgments in
2076 bulk. That is, you can set the acknowledgment state of messages
2077 in an existing subscription to the state captured by a snapshot.
2078
2079 .. code-block:: python
2080
2081 # This snippet has been automatically generated and should be regarded as a
2082 # code template only.
2083 # It will require modifications to work:
2084 # - It may require correct/in-range values for request initialization.
2085 # - It may require specifying regional endpoints when creating the service
2086 # client as shown in:
2087 # https://googleapis.dev/python/google-api-core/latest/client_options.html
2088 from google import pubsub_v1
2089
2090 def sample_get_snapshot():
2091 # Create a client
2092 client = pubsub_v1.SubscriberClient()
2093
2094 # Initialize request argument(s)
2095 request = pubsub_v1.GetSnapshotRequest(
2096 snapshot="snapshot_value",
2097 )
2098
2099 # Make the request
2100 response = client.get_snapshot(request=request)
2101
2102 # Handle the response
2103 print(response)
2104
2105 Args:
2106 request (Union[google.pubsub_v1.types.GetSnapshotRequest, dict]):
2107 The request object. Request for the GetSnapshot method.
2108 snapshot (str):
2109 Required. The name of the snapshot to get. Format is
2110 ``projects/{project}/snapshots/{snap}``.
2111
2112 This corresponds to the ``snapshot`` field
2113 on the ``request`` instance; if ``request`` is provided, this
2114 should not be set.
2115 retry (google.api_core.retry.Retry): Designation of what errors, if any,
2116 should be retried.
2117 timeout (float): The timeout for this request.
2118 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
2119 sent along with the request as metadata. Normally, each value must be of type `str`,
2120 but for metadata keys ending with the suffix `-bin`, the corresponding values must
2121 be of type `bytes`.
2122
2123 Returns:
2124 google.pubsub_v1.types.Snapshot:
2125 A snapshot resource. Snapshots are used in
2126 [Seek](https://cloud.google.com/pubsub/docs/replay-overview)
2127 operations, which allow you to manage message
2128 acknowledgments in bulk. That is, you can set the
2129 acknowledgment state of messages in an existing
2130 subscription to the state captured by a snapshot.
2131
2132 """
2133 # Create or coerce a protobuf request object.
2134 # - Quick check: If we got a request object, we should *not* have
2135 # gotten any keyword arguments that map to the request.
2136 flattened_params = [snapshot]
2137 has_flattened_params = (
2138 len([param for param in flattened_params if param is not None]) > 0
2139 )
2140 if request is not None and has_flattened_params:
2141 raise ValueError(
2142 "If the `request` argument is set, then none of "
2143 "the individual field arguments should be set."
2144 )
2145
2146 # - Use the request object if provided (there's no risk of modifying the input as
2147 # there are no flattened fields), or create one.
2148 if not isinstance(request, pubsub.GetSnapshotRequest):
2149 request = pubsub.GetSnapshotRequest(request)
2150 # If we have keyword arguments corresponding to fields on the
2151 # request, apply these.
2152 if snapshot is not None:
2153 request.snapshot = snapshot
2154
2155 # Wrap the RPC method; this adds retry and timeout information,
2156 # and friendly error handling.
2157 rpc = self._transport._wrapped_methods[self._transport.get_snapshot]
2158
2159 # Certain fields should be provided within the metadata header;
2160 # add these here.
2161 metadata = tuple(metadata) + (
2162 gapic_v1.routing_header.to_grpc_metadata((("snapshot", request.snapshot),)),
2163 )
2164
2165 # Validate the universe domain.
2166 self._validate_universe_domain()
2167
2168 # Send the request.
2169 response = rpc(
2170 request,
2171 retry=retry,
2172 timeout=timeout,
2173 metadata=metadata,
2174 )
2175
2176 # Done; return the response.
2177 return response
2178
2179 def list_snapshots(
2180 self,
2181 request: Optional[Union[pubsub.ListSnapshotsRequest, dict]] = None,
2182 *,
2183 project: Optional[str] = None,
2184 retry: OptionalRetry = gapic_v1.method.DEFAULT,
2185 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
2186 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
2187 ) -> pagers.ListSnapshotsPager:
2188 r"""Lists the existing snapshots. Snapshots are used in
2189 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
2190 operations, which allow you to manage message acknowledgments in
2191 bulk. That is, you can set the acknowledgment state of messages
2192 in an existing subscription to the state captured by a snapshot.
2193
2194 .. code-block:: python
2195
2196 # This snippet has been automatically generated and should be regarded as a
2197 # code template only.
2198 # It will require modifications to work:
2199 # - It may require correct/in-range values for request initialization.
2200 # - It may require specifying regional endpoints when creating the service
2201 # client as shown in:
2202 # https://googleapis.dev/python/google-api-core/latest/client_options.html
2203 from google import pubsub_v1
2204
2205 def sample_list_snapshots():
2206 # Create a client
2207 client = pubsub_v1.SubscriberClient()
2208
2209 # Initialize request argument(s)
2210 request = pubsub_v1.ListSnapshotsRequest(
2211 project="project_value",
2212 )
2213
2214 # Make the request
2215 page_result = client.list_snapshots(request=request)
2216
2217 # Handle the response
2218 for response in page_result:
2219 print(response)
2220
2221 Args:
2222 request (Union[google.pubsub_v1.types.ListSnapshotsRequest, dict]):
2223 The request object. Request for the ``ListSnapshots`` method.
2224 project (str):
2225 Required. The name of the project in which to list
2226 snapshots. Format is ``projects/{project-id}``.
2227
2228 This corresponds to the ``project`` field
2229 on the ``request`` instance; if ``request`` is provided, this
2230 should not be set.
2231 retry (google.api_core.retry.Retry): Designation of what errors, if any,
2232 should be retried.
2233 timeout (float): The timeout for this request.
2234 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
2235 sent along with the request as metadata. Normally, each value must be of type `str`,
2236 but for metadata keys ending with the suffix `-bin`, the corresponding values must
2237 be of type `bytes`.
2238
2239 Returns:
2240 google.pubsub_v1.services.subscriber.pagers.ListSnapshotsPager:
2241 Response for the ListSnapshots method.
2242
2243 Iterating over this object will yield results and
2244 resolve additional pages automatically.
2245
2246 """
2247 # Create or coerce a protobuf request object.
2248 # - Quick check: If we got a request object, we should *not* have
2249 # gotten any keyword arguments that map to the request.
2250 flattened_params = [project]
2251 has_flattened_params = (
2252 len([param for param in flattened_params if param is not None]) > 0
2253 )
2254 if request is not None and has_flattened_params:
2255 raise ValueError(
2256 "If the `request` argument is set, then none of "
2257 "the individual field arguments should be set."
2258 )
2259
2260 # - Use the request object if provided (there's no risk of modifying the input as
2261 # there are no flattened fields), or create one.
2262 if not isinstance(request, pubsub.ListSnapshotsRequest):
2263 request = pubsub.ListSnapshotsRequest(request)
2264 # If we have keyword arguments corresponding to fields on the
2265 # request, apply these.
2266 if project is not None:
2267 request.project = project
2268
2269 # Wrap the RPC method; this adds retry and timeout information,
2270 # and friendly error handling.
2271 rpc = self._transport._wrapped_methods[self._transport.list_snapshots]
2272
2273 # Certain fields should be provided within the metadata header;
2274 # add these here.
2275 metadata = tuple(metadata) + (
2276 gapic_v1.routing_header.to_grpc_metadata((("project", request.project),)),
2277 )
2278
2279 # Validate the universe domain.
2280 self._validate_universe_domain()
2281
2282 # Send the request.
2283 response = rpc(
2284 request,
2285 retry=retry,
2286 timeout=timeout,
2287 metadata=metadata,
2288 )
2289
2290 # This method is paged; wrap the response in a pager, which provides
2291 # an `__iter__` convenience method.
2292 response = pagers.ListSnapshotsPager(
2293 method=rpc,
2294 request=request,
2295 response=response,
2296 retry=retry,
2297 timeout=timeout,
2298 metadata=metadata,
2299 )
2300
2301 # Done; return the response.
2302 return response
2303
2304 def create_snapshot(
2305 self,
2306 request: Optional[Union[pubsub.CreateSnapshotRequest, dict]] = None,
2307 *,
2308 name: Optional[str] = None,
2309 subscription: Optional[str] = None,
2310 retry: OptionalRetry = gapic_v1.method.DEFAULT,
2311 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
2312 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
2313 ) -> pubsub.Snapshot:
2314 r"""Creates a snapshot from the requested subscription. Snapshots
2315 are used in
2316 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
2317 operations, which allow you to manage message acknowledgments in
2318 bulk. That is, you can set the acknowledgment state of messages
2319 in an existing subscription to the state captured by a snapshot.
2320 If the snapshot already exists, returns ``ALREADY_EXISTS``. If
2321 the requested subscription doesn't exist, returns ``NOT_FOUND``.
2322 If the backlog in the subscription is too old -- and the
2323 resulting snapshot would expire in less than 1 hour -- then
2324 ``FAILED_PRECONDITION`` is returned. See also the
2325 ``Snapshot.expire_time`` field. If the name is not provided in
2326 the request, the server will assign a random name for this
2327 snapshot on the same project as the subscription, conforming to
2328 the [resource name format]
2329 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
2330 The generated name is populated in the returned Snapshot object.
2331 Note that for REST API requests, you must specify a name in the
2332 request.
2333
2334 .. code-block:: python
2335
2336 # This snippet has been automatically generated and should be regarded as a
2337 # code template only.
2338 # It will require modifications to work:
2339 # - It may require correct/in-range values for request initialization.
2340 # - It may require specifying regional endpoints when creating the service
2341 # client as shown in:
2342 # https://googleapis.dev/python/google-api-core/latest/client_options.html
2343 from google import pubsub_v1
2344
2345 def sample_create_snapshot():
2346 # Create a client
2347 client = pubsub_v1.SubscriberClient()
2348
2349 # Initialize request argument(s)
2350 request = pubsub_v1.CreateSnapshotRequest(
2351 name="name_value",
2352 subscription="subscription_value",
2353 )
2354
2355 # Make the request
2356 response = client.create_snapshot(request=request)
2357
2358 # Handle the response
2359 print(response)
2360
2361 Args:
2362 request (Union[google.pubsub_v1.types.CreateSnapshotRequest, dict]):
2363 The request object. Request for the ``CreateSnapshot`` method.
2364 name (str):
2365 Required. User-provided name for this snapshot. If the
2366 name is not provided in the request, the server will
2367 assign a random name for this snapshot on the same
2368 project as the subscription. Note that for REST API
2369 requests, you must specify a name. See the `resource
2370 name
2371 rules <https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names>`__.
2372 Format is ``projects/{project}/snapshots/{snap}``.
2373
2374 This corresponds to the ``name`` field
2375 on the ``request`` instance; if ``request`` is provided, this
2376 should not be set.
2377 subscription (str):
2378 Required. The subscription whose backlog the snapshot
2379 retains. Specifically, the created snapshot is
2380 guaranteed to retain: (a) The existing backlog on the
2381 subscription. More precisely, this is defined as the
2382 messages in the subscription's backlog that are
2383 unacknowledged upon the successful completion of the
2384 ``CreateSnapshot`` request; as well as: (b) Any messages
2385 published to the subscription's topic following the
2386 successful completion of the CreateSnapshot request.
2387 Format is ``projects/{project}/subscriptions/{sub}``.
2388
2389 This corresponds to the ``subscription`` field
2390 on the ``request`` instance; if ``request`` is provided, this
2391 should not be set.
2392 retry (google.api_core.retry.Retry): Designation of what errors, if any,
2393 should be retried.
2394 timeout (float): The timeout for this request.
2395 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
2396 sent along with the request as metadata. Normally, each value must be of type `str`,
2397 but for metadata keys ending with the suffix `-bin`, the corresponding values must
2398 be of type `bytes`.
2399
2400 Returns:
2401 google.pubsub_v1.types.Snapshot:
2402 A snapshot resource. Snapshots are used in
2403 [Seek](https://cloud.google.com/pubsub/docs/replay-overview)
2404 operations, which allow you to manage message
2405 acknowledgments in bulk. That is, you can set the
2406 acknowledgment state of messages in an existing
2407 subscription to the state captured by a snapshot.
2408
2409 """
2410 # Create or coerce a protobuf request object.
2411 # - Quick check: If we got a request object, we should *not* have
2412 # gotten any keyword arguments that map to the request.
2413 flattened_params = [name, subscription]
2414 has_flattened_params = (
2415 len([param for param in flattened_params if param is not None]) > 0
2416 )
2417 if request is not None and has_flattened_params:
2418 raise ValueError(
2419 "If the `request` argument is set, then none of "
2420 "the individual field arguments should be set."
2421 )
2422
2423 # - Use the request object if provided (there's no risk of modifying the input as
2424 # there are no flattened fields), or create one.
2425 if not isinstance(request, pubsub.CreateSnapshotRequest):
2426 request = pubsub.CreateSnapshotRequest(request)
2427 # If we have keyword arguments corresponding to fields on the
2428 # request, apply these.
2429 if name is not None:
2430 request.name = name
2431 if subscription is not None:
2432 request.subscription = subscription
2433
2434 # Wrap the RPC method; this adds retry and timeout information,
2435 # and friendly error handling.
2436 rpc = self._transport._wrapped_methods[self._transport.create_snapshot]
2437
2438 # Certain fields should be provided within the metadata header;
2439 # add these here.
2440 metadata = tuple(metadata) + (
2441 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
2442 )
2443
2444 # Validate the universe domain.
2445 self._validate_universe_domain()
2446
2447 # Send the request.
2448 response = rpc(
2449 request,
2450 retry=retry,
2451 timeout=timeout,
2452 metadata=metadata,
2453 )
2454
2455 # Done; return the response.
2456 return response
2457
2458 def update_snapshot(
2459 self,
2460 request: Optional[Union[pubsub.UpdateSnapshotRequest, dict]] = None,
2461 *,
2462 snapshot: Optional[pubsub.Snapshot] = None,
2463 update_mask: Optional[field_mask_pb2.FieldMask] = None,
2464 retry: OptionalRetry = gapic_v1.method.DEFAULT,
2465 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
2466 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
2467 ) -> pubsub.Snapshot:
2468 r"""Updates an existing snapshot by updating the fields specified in
2469 the update mask. Snapshots are used in
2470 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
2471 operations, which allow you to manage message acknowledgments in
2472 bulk. That is, you can set the acknowledgment state of messages
2473 in an existing subscription to the state captured by a snapshot.
2474
2475 .. code-block:: python
2476
2477 # This snippet has been automatically generated and should be regarded as a
2478 # code template only.
2479 # It will require modifications to work:
2480 # - It may require correct/in-range values for request initialization.
2481 # - It may require specifying regional endpoints when creating the service
2482 # client as shown in:
2483 # https://googleapis.dev/python/google-api-core/latest/client_options.html
2484 from google import pubsub_v1
2485
2486 def sample_update_snapshot():
2487 # Create a client
2488 client = pubsub_v1.SubscriberClient()
2489
2490 # Initialize request argument(s)
2491 request = pubsub_v1.UpdateSnapshotRequest(
2492 )
2493
2494 # Make the request
2495 response = client.update_snapshot(request=request)
2496
2497 # Handle the response
2498 print(response)
2499
2500 Args:
2501 request (Union[google.pubsub_v1.types.UpdateSnapshotRequest, dict]):
2502 The request object. Request for the UpdateSnapshot
2503 method.
2504 snapshot (google.pubsub_v1.types.Snapshot):
2505 Required. The updated snapshot
2506 object.
2507
2508 This corresponds to the ``snapshot`` field
2509 on the ``request`` instance; if ``request`` is provided, this
2510 should not be set.
2511 update_mask (google.protobuf.field_mask_pb2.FieldMask):
2512 Required. Indicates which fields in
2513 the provided snapshot to update. Must be
2514 specified and non-empty.
2515
2516 This corresponds to the ``update_mask`` field
2517 on the ``request`` instance; if ``request`` is provided, this
2518 should not be set.
2519 retry (google.api_core.retry.Retry): Designation of what errors, if any,
2520 should be retried.
2521 timeout (float): The timeout for this request.
2522 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
2523 sent along with the request as metadata. Normally, each value must be of type `str`,
2524 but for metadata keys ending with the suffix `-bin`, the corresponding values must
2525 be of type `bytes`.
2526
2527 Returns:
2528 google.pubsub_v1.types.Snapshot:
2529 A snapshot resource. Snapshots are used in
2530 [Seek](https://cloud.google.com/pubsub/docs/replay-overview)
2531 operations, which allow you to manage message
2532 acknowledgments in bulk. That is, you can set the
2533 acknowledgment state of messages in an existing
2534 subscription to the state captured by a snapshot.
2535
2536 """
2537 # Create or coerce a protobuf request object.
2538 # - Quick check: If we got a request object, we should *not* have
2539 # gotten any keyword arguments that map to the request.
2540 flattened_params = [snapshot, update_mask]
2541 has_flattened_params = (
2542 len([param for param in flattened_params if param is not None]) > 0
2543 )
2544 if request is not None and has_flattened_params:
2545 raise ValueError(
2546 "If the `request` argument is set, then none of "
2547 "the individual field arguments should be set."
2548 )
2549
2550 # - Use the request object if provided (there's no risk of modifying the input as
2551 # there are no flattened fields), or create one.
2552 if not isinstance(request, pubsub.UpdateSnapshotRequest):
2553 request = pubsub.UpdateSnapshotRequest(request)
2554 # If we have keyword arguments corresponding to fields on the
2555 # request, apply these.
2556 if snapshot is not None:
2557 request.snapshot = snapshot
2558 if update_mask is not None:
2559 request.update_mask = update_mask
2560
2561 # Wrap the RPC method; this adds retry and timeout information,
2562 # and friendly error handling.
2563 rpc = self._transport._wrapped_methods[self._transport.update_snapshot]
2564
2565 # Certain fields should be provided within the metadata header;
2566 # add these here.
2567 metadata = tuple(metadata) + (
2568 gapic_v1.routing_header.to_grpc_metadata(
2569 (("snapshot.name", request.snapshot.name),)
2570 ),
2571 )
2572
2573 # Validate the universe domain.
2574 self._validate_universe_domain()
2575
2576 # Send the request.
2577 response = rpc(
2578 request,
2579 retry=retry,
2580 timeout=timeout,
2581 metadata=metadata,
2582 )
2583
2584 # Done; return the response.
2585 return response
2586
2587 def delete_snapshot(
2588 self,
2589 request: Optional[Union[pubsub.DeleteSnapshotRequest, dict]] = None,
2590 *,
2591 snapshot: Optional[str] = None,
2592 retry: OptionalRetry = gapic_v1.method.DEFAULT,
2593 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
2594 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
2595 ) -> None:
2596 r"""Removes an existing snapshot. Snapshots are used in [Seek]
2597 (https://cloud.google.com/pubsub/docs/replay-overview)
2598 operations, which allow you to manage message acknowledgments in
2599 bulk. That is, you can set the acknowledgment state of messages
2600 in an existing subscription to the state captured by a snapshot.
2601 When the snapshot is deleted, all messages retained in the
2602 snapshot are immediately dropped. After a snapshot is deleted, a
2603 new one may be created with the same name, but the new one has
2604 no association with the old snapshot or its subscription, unless
2605 the same subscription is specified.
2606
2607 .. code-block:: python
2608
2609 # This snippet has been automatically generated and should be regarded as a
2610 # code template only.
2611 # It will require modifications to work:
2612 # - It may require correct/in-range values for request initialization.
2613 # - It may require specifying regional endpoints when creating the service
2614 # client as shown in:
2615 # https://googleapis.dev/python/google-api-core/latest/client_options.html
2616 from google import pubsub_v1
2617
2618 def sample_delete_snapshot():
2619 # Create a client
2620 client = pubsub_v1.SubscriberClient()
2621
2622 # Initialize request argument(s)
2623 request = pubsub_v1.DeleteSnapshotRequest(
2624 snapshot="snapshot_value",
2625 )
2626
2627 # Make the request
2628 client.delete_snapshot(request=request)
2629
2630 Args:
2631 request (Union[google.pubsub_v1.types.DeleteSnapshotRequest, dict]):
2632 The request object. Request for the ``DeleteSnapshot`` method.
2633 snapshot (str):
2634 Required. The name of the snapshot to delete. Format is
2635 ``projects/{project}/snapshots/{snap}``.
2636
2637 This corresponds to the ``snapshot`` field
2638 on the ``request`` instance; if ``request`` is provided, this
2639 should not be set.
2640 retry (google.api_core.retry.Retry): Designation of what errors, if any,
2641 should be retried.
2642 timeout (float): The timeout for this request.
2643 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
2644 sent along with the request as metadata. Normally, each value must be of type `str`,
2645 but for metadata keys ending with the suffix `-bin`, the corresponding values must
2646 be of type `bytes`.
2647 """
2648 # Create or coerce a protobuf request object.
2649 # - Quick check: If we got a request object, we should *not* have
2650 # gotten any keyword arguments that map to the request.
2651 flattened_params = [snapshot]
2652 has_flattened_params = (
2653 len([param for param in flattened_params if param is not None]) > 0
2654 )
2655 if request is not None and has_flattened_params:
2656 raise ValueError(
2657 "If the `request` argument is set, then none of "
2658 "the individual field arguments should be set."
2659 )
2660
2661 # - Use the request object if provided (there's no risk of modifying the input as
2662 # there are no flattened fields), or create one.
2663 if not isinstance(request, pubsub.DeleteSnapshotRequest):
2664 request = pubsub.DeleteSnapshotRequest(request)
2665 # If we have keyword arguments corresponding to fields on the
2666 # request, apply these.
2667 if snapshot is not None:
2668 request.snapshot = snapshot
2669
2670 # Wrap the RPC method; this adds retry and timeout information,
2671 # and friendly error handling.
2672 rpc = self._transport._wrapped_methods[self._transport.delete_snapshot]
2673
2674 # Certain fields should be provided within the metadata header;
2675 # add these here.
2676 metadata = tuple(metadata) + (
2677 gapic_v1.routing_header.to_grpc_metadata((("snapshot", request.snapshot),)),
2678 )
2679
2680 # Validate the universe domain.
2681 self._validate_universe_domain()
2682
2683 # Send the request.
2684 rpc(
2685 request,
2686 retry=retry,
2687 timeout=timeout,
2688 metadata=metadata,
2689 )
2690
2691 def seek(
2692 self,
2693 request: Optional[Union[pubsub.SeekRequest, dict]] = None,
2694 *,
2695 retry: OptionalRetry = gapic_v1.method.DEFAULT,
2696 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
2697 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
2698 ) -> pubsub.SeekResponse:
2699 r"""Seeks an existing subscription to a point in time or to a given
2700 snapshot, whichever is provided in the request. Snapshots are
2701 used in [Seek]
2702 (https://cloud.google.com/pubsub/docs/replay-overview)
2703 operations, which allow you to manage message acknowledgments in
2704 bulk. That is, you can set the acknowledgment state of messages
2705 in an existing subscription to the state captured by a snapshot.
2706 Note that both the subscription and the snapshot must be on the
2707 same topic.
2708
2709 .. code-block:: python
2710
2711 # This snippet has been automatically generated and should be regarded as a
2712 # code template only.
2713 # It will require modifications to work:
2714 # - It may require correct/in-range values for request initialization.
2715 # - It may require specifying regional endpoints when creating the service
2716 # client as shown in:
2717 # https://googleapis.dev/python/google-api-core/latest/client_options.html
2718 from google import pubsub_v1
2719
2720 def sample_seek():
2721 # Create a client
2722 client = pubsub_v1.SubscriberClient()
2723
2724 # Initialize request argument(s)
2725 request = pubsub_v1.SeekRequest(
2726 subscription="subscription_value",
2727 )
2728
2729 # Make the request
2730 response = client.seek(request=request)
2731
2732 # Handle the response
2733 print(response)
2734
2735 Args:
2736 request (Union[google.pubsub_v1.types.SeekRequest, dict]):
2737 The request object. Request for the ``Seek`` method.
2738 retry (google.api_core.retry.Retry): Designation of what errors, if any,
2739 should be retried.
2740 timeout (float): The timeout for this request.
2741 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
2742 sent along with the request as metadata. Normally, each value must be of type `str`,
2743 but for metadata keys ending with the suffix `-bin`, the corresponding values must
2744 be of type `bytes`.
2745
2746 Returns:
2747 google.pubsub_v1.types.SeekResponse:
2748 Response for the Seek method (this response is empty).
2749 """
2750 # Create or coerce a protobuf request object.
2751 # - Use the request object if provided (there's no risk of modifying the input as
2752 # there are no flattened fields), or create one.
2753 if not isinstance(request, pubsub.SeekRequest):
2754 request = pubsub.SeekRequest(request)
2755
2756 # Wrap the RPC method; this adds retry and timeout information,
2757 # and friendly error handling.
2758 rpc = self._transport._wrapped_methods[self._transport.seek]
2759
2760 # Certain fields should be provided within the metadata header;
2761 # add these here.
2762 metadata = tuple(metadata) + (
2763 gapic_v1.routing_header.to_grpc_metadata(
2764 (("subscription", request.subscription),)
2765 ),
2766 )
2767
2768 # Validate the universe domain.
2769 self._validate_universe_domain()
2770
2771 # Send the request.
2772 response = rpc(
2773 request,
2774 retry=retry,
2775 timeout=timeout,
2776 metadata=metadata,
2777 )
2778
2779 # Done; return the response.
2780 return response
2781
2782 def __enter__(self) -> "SubscriberClient":
2783 return self
2784
2785 def __exit__(self, type, value, traceback):
2786 """Releases underlying transport's resources.
2787
2788 .. warning::
2789 ONLY use as a context manager if the transport is NOT shared
2790 with other clients! Exiting the with block will CLOSE the transport
2791 and may cause errors in other clients!
2792 """
2793 self.transport.close()
2794
2795 def set_iam_policy(
2796 self,
2797 request: Optional[iam_policy_pb2.SetIamPolicyRequest] = None,
2798 *,
2799 retry: OptionalRetry = gapic_v1.method.DEFAULT,
2800 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
2801 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
2802 ) -> policy_pb2.Policy:
2803 r"""Sets the IAM access control policy on the specified function.
2804
2805 Replaces any existing policy.
2806
2807 Args:
2808 request (:class:`~.iam_policy_pb2.SetIamPolicyRequest`):
2809 The request object. Request message for `SetIamPolicy`
2810 method.
2811 retry (google.api_core.retry.Retry): Designation of what errors, if any,
2812 should be retried.
2813 timeout (float): The timeout for this request.
2814 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
2815 sent along with the request as metadata. Normally, each value must be of type `str`,
2816 but for metadata keys ending with the suffix `-bin`, the corresponding values must
2817 be of type `bytes`.
2818 Returns:
2819 ~.policy_pb2.Policy:
2820 Defines an Identity and Access Management (IAM) policy.
2821 It is used to specify access control policies for Cloud
2822 Platform resources.
2823 A ``Policy`` is a collection of ``bindings``. A
2824 ``binding`` binds one or more ``members`` to a single
2825 ``role``. Members can be user accounts, service
2826 accounts, Google groups, and domains (such as G Suite).
2827 A ``role`` is a named list of permissions (defined by
2828 IAM or configured by users). A ``binding`` can
2829 optionally specify a ``condition``, which is a logic
2830 expression that further constrains the role binding
2831 based on attributes about the request and/or target
2832 resource.
2833
2834 **JSON Example**
2835
2836 ::
2837
2838 {
2839 "bindings": [
2840 {
2841 "role": "roles/resourcemanager.organizationAdmin",
2842 "members": [
2843 "user:mike@example.com",
2844 "group:admins@example.com",
2845 "domain:google.com",
2846 "serviceAccount:my-project-id@appspot.gserviceaccount.com"
2847 ]
2848 },
2849 {
2850 "role": "roles/resourcemanager.organizationViewer",
2851 "members": ["user:eve@example.com"],
2852 "condition": {
2853 "title": "expirable access",
2854 "description": "Does not grant access after Sep 2020",
2855 "expression": "request.time <
2856 timestamp('2020-10-01T00:00:00.000Z')",
2857 }
2858 }
2859 ]
2860 }
2861
2862 **YAML Example**
2863
2864 ::
2865
2866 bindings:
2867 - members:
2868 - user:mike@example.com
2869 - group:admins@example.com
2870 - domain:google.com
2871 - serviceAccount:my-project-id@appspot.gserviceaccount.com
2872 role: roles/resourcemanager.organizationAdmin
2873 - members:
2874 - user:eve@example.com
2875 role: roles/resourcemanager.organizationViewer
2876 condition:
2877 title: expirable access
2878 description: Does not grant access after Sep 2020
2879 expression: request.time < timestamp('2020-10-01T00:00:00.000Z')
2880
2881 For a description of IAM and its features, see the `IAM
2882 developer's
2883 guide <https://cloud.google.com/iam/docs>`__.
2884 """
2885 # Create or coerce a protobuf request object.
2886
2887 # The request isn't a proto-plus wrapped type,
2888 # so it must be constructed via keyword expansion.
2889 if isinstance(request, dict):
2890 request = iam_policy_pb2.SetIamPolicyRequest(**request)
2891
2892 # Wrap the RPC method; this adds retry and timeout information,
2893 # and friendly error handling.
2894 rpc = self._transport._wrapped_methods[self._transport.set_iam_policy]
2895
2896 # Certain fields should be provided within the metadata header;
2897 # add these here.
2898 metadata = tuple(metadata) + (
2899 gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)),
2900 )
2901
2902 # Validate the universe domain.
2903 self._validate_universe_domain()
2904
2905 try:
2906 # Send the request.
2907 response = rpc(
2908 request,
2909 retry=retry,
2910 timeout=timeout,
2911 metadata=metadata,
2912 )
2913
2914 # Done; return the response.
2915 return response
2916 except core_exceptions.GoogleAPICallError as e:
2917 self._add_cred_info_for_auth_errors(e)
2918 raise e
2919
2920 def get_iam_policy(
2921 self,
2922 request: Optional[iam_policy_pb2.GetIamPolicyRequest] = None,
2923 *,
2924 retry: OptionalRetry = gapic_v1.method.DEFAULT,
2925 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
2926 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
2927 ) -> policy_pb2.Policy:
2928 r"""Gets the IAM access control policy for a function.
2929
2930 Returns an empty policy if the function exists and does not have a
2931 policy set.
2932
2933 Args:
2934 request (:class:`~.iam_policy_pb2.GetIamPolicyRequest`):
2935 The request object. Request message for `GetIamPolicy`
2936 method.
2937 retry (google.api_core.retry.Retry): Designation of what errors, if
2938 any, should be retried.
2939 timeout (float): The timeout for this request.
2940 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
2941 sent along with the request as metadata. Normally, each value must be of type `str`,
2942 but for metadata keys ending with the suffix `-bin`, the corresponding values must
2943 be of type `bytes`.
2944 Returns:
2945 ~.policy_pb2.Policy:
2946 Defines an Identity and Access Management (IAM) policy.
2947 It is used to specify access control policies for Cloud
2948 Platform resources.
2949 A ``Policy`` is a collection of ``bindings``. A
2950 ``binding`` binds one or more ``members`` to a single
2951 ``role``. Members can be user accounts, service
2952 accounts, Google groups, and domains (such as G Suite).
2953 A ``role`` is a named list of permissions (defined by
2954 IAM or configured by users). A ``binding`` can
2955 optionally specify a ``condition``, which is a logic
2956 expression that further constrains the role binding
2957 based on attributes about the request and/or target
2958 resource.
2959
2960 **JSON Example**
2961
2962 ::
2963
2964 {
2965 "bindings": [
2966 {
2967 "role": "roles/resourcemanager.organizationAdmin",
2968 "members": [
2969 "user:mike@example.com",
2970 "group:admins@example.com",
2971 "domain:google.com",
2972 "serviceAccount:my-project-id@appspot.gserviceaccount.com"
2973 ]
2974 },
2975 {
2976 "role": "roles/resourcemanager.organizationViewer",
2977 "members": ["user:eve@example.com"],
2978 "condition": {
2979 "title": "expirable access",
2980 "description": "Does not grant access after Sep 2020",
2981 "expression": "request.time <
2982 timestamp('2020-10-01T00:00:00.000Z')",
2983 }
2984 }
2985 ]
2986 }
2987
2988 **YAML Example**
2989
2990 ::
2991
2992 bindings:
2993 - members:
2994 - user:mike@example.com
2995 - group:admins@example.com
2996 - domain:google.com
2997 - serviceAccount:my-project-id@appspot.gserviceaccount.com
2998 role: roles/resourcemanager.organizationAdmin
2999 - members:
3000 - user:eve@example.com
3001 role: roles/resourcemanager.organizationViewer
3002 condition:
3003 title: expirable access
3004 description: Does not grant access after Sep 2020
3005 expression: request.time < timestamp('2020-10-01T00:00:00.000Z')
3006
3007 For a description of IAM and its features, see the `IAM
3008 developer's
3009 guide <https://cloud.google.com/iam/docs>`__.
3010 """
3011 # Create or coerce a protobuf request object.
3012
3013 # The request isn't a proto-plus wrapped type,
3014 # so it must be constructed via keyword expansion.
3015 if isinstance(request, dict):
3016 request = iam_policy_pb2.GetIamPolicyRequest(**request)
3017
3018 # Wrap the RPC method; this adds retry and timeout information,
3019 # and friendly error handling.
3020 rpc = self._transport._wrapped_methods[self._transport.get_iam_policy]
3021
3022 # Certain fields should be provided within the metadata header;
3023 # add these here.
3024 metadata = tuple(metadata) + (
3025 gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)),
3026 )
3027
3028 # Validate the universe domain.
3029 self._validate_universe_domain()
3030
3031 try:
3032 # Send the request.
3033 response = rpc(
3034 request,
3035 retry=retry,
3036 timeout=timeout,
3037 metadata=metadata,
3038 )
3039
3040 # Done; return the response.
3041 return response
3042 except core_exceptions.GoogleAPICallError as e:
3043 self._add_cred_info_for_auth_errors(e)
3044 raise e
3045
3046 def test_iam_permissions(
3047 self,
3048 request: Optional[iam_policy_pb2.TestIamPermissionsRequest] = None,
3049 *,
3050 retry: OptionalRetry = gapic_v1.method.DEFAULT,
3051 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
3052 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
3053 ) -> iam_policy_pb2.TestIamPermissionsResponse:
3054 r"""Tests the specified IAM permissions against the IAM access control
3055 policy for a function.
3056
3057 If the function does not exist, this will return an empty set
3058 of permissions, not a NOT_FOUND error.
3059
3060 Args:
3061 request (:class:`~.iam_policy_pb2.TestIamPermissionsRequest`):
3062 The request object. Request message for
3063 `TestIamPermissions` method.
3064 retry (google.api_core.retry.Retry): Designation of what errors,
3065 if any, should be retried.
3066 timeout (float): The timeout for this request.
3067 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
3068 sent along with the request as metadata. Normally, each value must be of type `str`,
3069 but for metadata keys ending with the suffix `-bin`, the corresponding values must
3070 be of type `bytes`.
3071 Returns:
3072 ~.iam_policy_pb2.TestIamPermissionsResponse:
3073 Response message for ``TestIamPermissions`` method.
3074 """
3075 # Create or coerce a protobuf request object.
3076
3077 # The request isn't a proto-plus wrapped type,
3078 # so it must be constructed via keyword expansion.
3079 if isinstance(request, dict):
3080 request = iam_policy_pb2.TestIamPermissionsRequest(**request)
3081
3082 # Wrap the RPC method; this adds retry and timeout information,
3083 # and friendly error handling.
3084 rpc = self._transport._wrapped_methods[self._transport.test_iam_permissions]
3085
3086 # Certain fields should be provided within the metadata header;
3087 # add these here.
3088 metadata = tuple(metadata) + (
3089 gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)),
3090 )
3091
3092 # Validate the universe domain.
3093 self._validate_universe_domain()
3094
3095 try:
3096 # Send the request.
3097 response = rpc(
3098 request,
3099 retry=retry,
3100 timeout=timeout,
3101 metadata=metadata,
3102 )
3103
3104 # Done; return the response.
3105 return response
3106 except core_exceptions.GoogleAPICallError as e:
3107 self._add_cred_info_for_auth_errors(e)
3108 raise e
3109
3110
3111DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
3112 client_library_version=package_version.__version__
3113)
3114
3115if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"): # pragma: NO COVER
3116 DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
3117
3118__all__ = ("SubscriberClient",)