Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/subscriber/client.py: 52%
54 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
1# Copyright 2019, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15from __future__ import absolute_import
17import os
18import typing
19from typing import cast, Any, Callable, Optional, Sequence, Union
20import warnings
22from google.auth.credentials import AnonymousCredentials # type: ignore
23from google.oauth2 import service_account # type: ignore
25from google.cloud.pubsub_v1 import types
26from google.cloud.pubsub_v1.subscriber import futures
27from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager
28from google.pubsub_v1.services.subscriber import client as subscriber_client
29from google.pubsub_v1 import gapic_version as package_version
31if typing.TYPE_CHECKING: # pragma: NO COVER
32 from google.cloud.pubsub_v1 import subscriber
33 from google.pubsub_v1.services.subscriber.transports.grpc import (
34 SubscriberGrpcTransport,
35 )
37__version__ = package_version.__version__
class Client(subscriber_client.SubscriberClient):
    """A subscriber client for Google Cloud Pub/Sub.

    This creates an object that is capable of subscribing to messages.
    Generally, you can instantiate this client with no arguments, and you
    get sensible defaults.

    Args:
        kwargs: Any additional arguments provided are sent as keyword
            arguments to the underlying
            :class:`~google.pubsub_v1.services.subscriber.client.SubscriberClient`.
            Generally you should not need to set additional keyword
            arguments. Optionally, regional endpoints can be set via
            ``client_options`` that takes a single key-value pair that
            defines the endpoint.

    Example:

    .. code-block:: python

        from google.cloud import pubsub_v1

        subscriber_client = pubsub_v1.SubscriberClient(
            # Optional
            client_options = {
                "api_endpoint": REGIONAL_ENDPOINT
            }
        )
    """

    def __init__(self, **kwargs: Any):
        """Create the client, redirecting it to the emulator when configured.

        Args:
            kwargs: Keyword arguments forwarded to the GAPIC
                ``SubscriberClient`` constructor. When the
                ``PUBSUB_EMULATOR_HOST`` environment variable is set to a
                non-empty value, any ``client_options`` and ``credentials``
                passed here are replaced.
        """
        # Sanity check: Is our goal to use the emulator? If so, point the API
        # endpoint at the emulator host and use anonymous credentials. Note
        # that this overrides whatever the caller supplied for both keys.
        emulator_host = os.environ.get("PUBSUB_EMULATOR_HOST")
        if emulator_host:
            kwargs["client_options"] = {"api_endpoint": emulator_host}
            kwargs["credentials"] = AnonymousCredentials()

        # Instantiate the underlying GAPIC client.
        super().__init__(**kwargs)
        self._target = self._transport._host
        self._closed = False

    @classmethod
    def from_service_account_file(  # type: ignore[override]
        cls, filename: str, **kwargs: Any
    ) -> "Client":
        """Creates an instance of this client using the provided credentials
        file.

        Args:
            filename: The path to the service account private key json file.
            kwargs: Additional arguments to pass to the constructor. Any
                ``credentials`` entry is replaced by the credentials loaded
                from ``filename``.

        Returns:
            A Subscriber :class:`~google.cloud.pubsub_v1.subscriber.client.Client`
            instance that is the constructed client.
        """
        credentials = service_account.Credentials.from_service_account_file(filename)
        kwargs["credentials"] = credentials
        return cls(**kwargs)

    # Alias kept so both constructor names resolve to the same implementation.
    from_service_account_json = from_service_account_file  # type: ignore[assignment]

    @property
    def target(self) -> str:
        """Return the target (where the API is).

        Returns:
            The location of the API.
        """
        return self._target

    @property
    def closed(self) -> bool:
        """Return whether the client has been closed and cannot be used anymore.

        .. versionadded:: 2.8.0
        """
        return self._closed

    @property
    def api(self):
        """The underlying gapic API client.

        Accessing this property emits a :class:`DeprecationWarning`.

        .. versionchanged:: 2.10.0
            Instead of a GAPIC ``SubscriberClient`` client instance, this property is a
            proxy object to it with the same interface.

        .. deprecated:: 2.10.0
            Use the GAPIC methods and properties on the client instance directly
            instead of through the :attr:`api` attribute.
        """
        msg = (
            'The "api" property only exists for backward compatibility, access its '
            'attributes directly through the client instance (e.g. "client.foo" '
            'instead of "client.api.foo").'
        )
        # stacklevel=2 attributes the warning to the code that accessed
        # ``client.api`` rather than to this module.
        warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
        return super()

    def subscribe(
        self,
        subscription: str,
        callback: Callable[["subscriber.message.Message"], Any],
        flow_control: Union[types.FlowControl, Sequence] = (),
        scheduler: Optional["subscriber.scheduler.ThreadScheduler"] = None,
        use_legacy_flow_control: bool = False,
        await_callbacks_on_shutdown: bool = False,
    ) -> futures.StreamingPullFuture:
        """Asynchronously start receiving messages on a given subscription.

        This method starts a background thread to begin pulling messages from
        a Pub/Sub subscription and scheduling them to be processed using the
        provided ``callback``.

        The ``callback`` will be called with an individual
        :class:`google.cloud.pubsub_v1.subscriber.message.Message`. It is the
        responsibility of the callback to either call ``ack()`` or ``nack()``
        on the message when it finished processing. If an exception occurs in
        the callback during processing, the exception is logged and the message
        is ``nack()`` ed.

        The ``flow_control`` argument can be used to control the rate at
        which messages are pulled. The settings are relatively conservative by
        default to prevent "message hoarding" - a situation where the client
        pulls a large number of messages but can not process them fast enough
        leading it to "starve" other clients of messages. Increasing these
        settings may lead to faster throughput for messages that do not take
        a long time to process.

        The ``use_legacy_flow_control`` argument disables enforcing flow control
        settings at the Cloud Pub/Sub server, and only the client side flow control
        will be enforced.

        This method starts the receiver in the background and returns a
        *Future* representing its execution. Waiting on the future (calling
        ``result()``) will block forever or until a non-recoverable error
        is encountered (such as loss of network connectivity). Cancelling the
        future will signal the process to shutdown gracefully and exit.

        .. note:: This uses Pub/Sub's *streaming pull* feature. This feature
            has properties that may be surprising. Please take a look at
            https://cloud.google.com/pubsub/docs/pull#streamingpull for
            more details on how streaming pull behaves compared to the
            synchronous pull method.

        Example:

        .. code-block:: python

            from google.cloud import pubsub_v1

            subscriber_client = pubsub_v1.SubscriberClient()

            # existing subscription
            subscription = subscriber_client.subscription_path(
                'my-project-id', 'my-subscription')

            def callback(message):
                print(message)
                message.ack()

            future = subscriber_client.subscribe(
                subscription, callback)

            try:
                future.result()
            except KeyboardInterrupt:
                future.cancel()  # Trigger the shutdown.
                future.result()  # Block until the shutdown is complete.

        Args:
            subscription:
                The name of the subscription. The subscription should have already been
                created (for example, by using :meth:`create_subscription`).
            callback:
                The callback function. This function receives the message as
                its only argument and will be called from a different thread/
                process depending on the scheduling strategy.
            flow_control:
                The flow control settings. Use this to prevent situations where you are
                inundated with too many messages at once.
            scheduler:
                An optional *scheduler* to use when executing the callback. This
                controls how callbacks are executed concurrently. This object must not
                be shared across multiple ``SubscriberClient`` instances.
            use_legacy_flow_control:
                If set to ``True``, flow control at the Cloud Pub/Sub server is disabled,
                though client-side flow control is still enabled. If set to ``False``
                (default), both server-side and client-side flow control are enabled.
            await_callbacks_on_shutdown:
                If ``True``, after canceling the returned future, the latter's
                ``result()`` method will block until the background stream and its
                helper threads have been terminated, and all currently executing message
                callbacks are done processing.

                If ``False`` (default), the returned future's ``result()`` method will
                not block after canceling the future. The method will instead return
                immediately after the background stream and its helper threads have been
                terminated, but some of the message callback threads might still be
                running at that point.

        Returns:
            A future instance that can be used to manage the background stream.
        """
        # Normalize a plain sequence (or the empty default) into a FlowControl
        # by passing its items positionally.
        flow_control = types.FlowControl(*flow_control)

        manager = streaming_pull_manager.StreamingPullManager(
            self,
            subscription,
            flow_control=flow_control,
            scheduler=scheduler,
            use_legacy_flow_control=use_legacy_flow_control,
            await_callbacks_on_shutdown=await_callbacks_on_shutdown,
        )

        future = futures.StreamingPullFuture(manager)

        # Errors reported through ``on_callback_error`` are set on the future.
        manager.open(callback=callback, on_callback_error=future.set_exception)

        return future

    def close(self) -> None:
        """Close the underlying channel to release socket resources.

        After a channel has been closed, the client instance cannot be used
        anymore.

        This method is idempotent.
        """
        # The cast only informs the type checker; it has no runtime effect.
        transport = cast("SubscriberGrpcTransport", self._transport)
        transport.grpc_channel.close()
        self._closed = True

    def __enter__(self) -> "Client":
        """Enter the context manager and return this client.

        Raises:
            RuntimeError: If the client has already been closed.
        """
        if self._closed:
            raise RuntimeError("Closed subscriber cannot be used as context manager.")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the client on leaving the ``with`` block.

        Returns ``None``, so an exception raised inside the block is not
        suppressed.
        """
        self.close()