1# Copyright 2019, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import absolute_import
16
17import sys
18import os
19import typing
20from typing import cast, Any, Callable, Optional, Sequence, Union
21import warnings
22
23from google.auth.credentials import AnonymousCredentials # type: ignore
24from google.oauth2 import service_account # type: ignore
25
26from google.cloud.pubsub_v1 import types
27from google.cloud.pubsub_v1.subscriber import futures
28from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager
29from google.pubsub_v1.services.subscriber import client as subscriber_client
30from google.pubsub_v1 import gapic_version as package_version
31
32if typing.TYPE_CHECKING: # pragma: NO COVER
33 from google.cloud.pubsub_v1 import subscriber
34 from google.pubsub_v1.services.subscriber.transports.grpc import (
35 SubscriberGrpcTransport,
36 )
37
38__version__ = package_version.__version__
39
40
41class Client(subscriber_client.SubscriberClient):
42 """A subscriber client for Google Cloud Pub/Sub.
43
44 This creates an object that is capable of subscribing to messages.
45 Generally, you can instantiate this client with no arguments, and you
46 get sensible defaults.
47
48 Args:
49 kwargs: Any additional arguments provided are sent as keyword
50 keyword arguments to the underlying
51 :class:`~google.cloud.pubsub_v1.gapic.subscriber_client.SubscriberClient`.
52 Generally you should not need to set additional keyword
53 arguments. Optionally, regional endpoints can be set via
54 ``client_options`` that takes a single key-value pair that
55 defines the endpoint.
56
57 Example:
58
59 .. code-block:: python
60
61 from google.cloud import pubsub_v1
62
63 subscriber_client = pubsub_v1.SubscriberClient(
64 # Optional
65 client_options = {
66 "api_endpoint": REGIONAL_ENDPOINT
67 }
68 )
69 """
70
71 def __init__(
72 self,
73 subscriber_options: Union[types.SubscriberOptions, Sequence] = (),
74 **kwargs: Any
75 ):
76 assert (
77 isinstance(subscriber_options, types.SubscriberOptions)
78 or len(subscriber_options) == 0
79 ), "subscriber_options must be of type SubscriberOptions or an empty sequence."
80
81 # Sanity check: Is our goal to use the emulator?
82 # If so, create a grpc insecure channel with the emulator host
83 # as the target.
84 # TODO(https://github.com/googleapis/python-pubsub/issues/1349): Move the emulator
85 # code below to test files.
86 if os.environ.get("PUBSUB_EMULATOR_HOST"):
87 kwargs["client_options"] = {
88 "api_endpoint": os.environ.get("PUBSUB_EMULATOR_HOST")
89 }
90 # Configure credentials directly to transport, if provided.
91 if "transport" not in kwargs:
92 kwargs["credentials"] = AnonymousCredentials()
93
94 # Instantiate the underlying GAPIC client.
95 super().__init__(**kwargs)
96 self._target = self._transport._host
97 self._closed = False
98
99 self.subscriber_options = types.SubscriberOptions(*subscriber_options)
100
101 # Set / override Open Telemetry option.
102 self._open_telemetry_enabled = (
103 self.subscriber_options.enable_open_telemetry_tracing
104 )
105 # OpenTelemetry features used by the library are not supported in Python versions <= 3.7.
106 # Refer https://github.com/open-telemetry/opentelemetry-python/issues/3993#issuecomment-2211976389
107 if (
108 self.subscriber_options.enable_open_telemetry_tracing
109 and sys.version_info.major == 3
110 and sys.version_info.minor < 8
111 ):
112 warnings.warn(
113 message="Open Telemetry for Python version 3.7 or lower is not supported. Disabling Open Telemetry tracing.",
114 category=RuntimeWarning,
115 )
116 self._open_telemetry_enabled = False
117
118 @property
119 def open_telemetry_enabled(self) -> bool:
120 """
121 Returns True if Open Telemetry is enabled. False otherwise.
122 """
123 return self._open_telemetry_enabled # pragma: NO COVER
124
125 @classmethod
126 def from_service_account_file( # type: ignore[override]
127 cls, filename: str, **kwargs: Any
128 ) -> "Client":
129 """Creates an instance of this client using the provided credentials
130 file.
131
132 Args:
133 filename: The path to the service account private key json file.
134 kwargs: Additional arguments to pass to the constructor.
135
136 Returns:
137 A Subscriber :class:`~google.cloud.pubsub_v1.subscriber.client.Client`
138 instance that is the constructed client.
139 """
140 credentials = service_account.Credentials.from_service_account_file(filename)
141 kwargs["credentials"] = credentials
142 return cls(**kwargs)
143
144 from_service_account_json = from_service_account_file # type: ignore[assignment]
145
146 @property
147 def target(self) -> str:
148 """Return the target (where the API is).
149
150 Returns:
151 The location of the API.
152 """
153 return self._target
154
155 @property
156 def closed(self) -> bool:
157 """Return whether the client has been closed and cannot be used anymore.
158
159 .. versionadded:: 2.8.0
160 """
161 return self._closed
162
163 @property
164 def api(self):
165 """The underlying gapic API client.
166
167 .. versionchanged:: 2.10.0
168 Instead of a GAPIC ``SubscriberClient`` client instance, this property is a
169 proxy object to it with the same interface.
170
171 .. deprecated:: 2.10.0
172 Use the GAPIC methods and properties on the client instance directly
173 instead of through the :attr:`api` attribute.
174 """
175 msg = (
176 'The "api" property only exists for backward compatibility, access its '
177 'attributes directly thorugh the client instance (e.g. "client.foo" '
178 'instead of "client.api.foo").'
179 )
180 warnings.warn(msg, category=DeprecationWarning)
181 return super()
182
183 def subscribe(
184 self,
185 subscription: str,
186 callback: Callable[["subscriber.message.Message"], Any],
187 flow_control: Union[types.FlowControl, Sequence] = (),
188 scheduler: Optional["subscriber.scheduler.ThreadScheduler"] = None,
189 use_legacy_flow_control: bool = False,
190 await_callbacks_on_shutdown: bool = False,
191 ) -> futures.StreamingPullFuture:
192 """Asynchronously start receiving messages on a given subscription.
193
194 This method starts a background thread to begin pulling messages from
195 a Pub/Sub subscription and scheduling them to be processed using the
196 provided ``callback``.
197
198 The ``callback`` will be called with an individual
199 :class:`google.cloud.pubsub_v1.subscriber.message.Message`. It is the
200 responsibility of the callback to either call ``ack()`` or ``nack()``
201 on the message when it finished processing. If an exception occurs in
202 the callback during processing, the exception is logged and the message
203 is ``nack()`` ed.
204
205 The ``flow_control`` argument can be used to control the rate of at
206 which messages are pulled. The settings are relatively conservative by
207 default to prevent "message hoarding" - a situation where the client
208 pulls a large number of messages but can not process them fast enough
209 leading it to "starve" other clients of messages. Increasing these
210 settings may lead to faster throughput for messages that do not take
211 a long time to process.
212
213 The ``use_legacy_flow_control`` argument disables enforcing flow control
214 settings at the Cloud Pub/Sub server, and only the client side flow control
215 will be enforced.
216
217 This method starts the receiver in the background and returns a
218 *Future* representing its execution. Waiting on the future (calling
219 ``result()``) will block forever or until a non-recoverable error
220 is encountered (such as loss of network connectivity). Cancelling the
221 future will signal the process to shutdown gracefully and exit.
222
223 .. note:: This uses Pub/Sub's *streaming pull* feature. This feature
224 properties that may be surprising. Please take a look at
225 https://cloud.google.com/pubsub/docs/pull#streamingpull for
226 more details on how streaming pull behaves compared to the
227 synchronous pull method.
228
229 Example:
230
231 .. code-block:: python
232
233 from google.cloud import pubsub_v1
234
235 subscriber_client = pubsub_v1.SubscriberClient()
236
237 # existing subscription
238 subscription = subscriber_client.subscription_path(
239 'my-project-id', 'my-subscription')
240
241 def callback(message):
242 print(message)
243 message.ack()
244
245 future = subscriber_client.subscribe(
246 subscription, callback)
247
248 try:
249 future.result()
250 except KeyboardInterrupt:
251 future.cancel() # Trigger the shutdown.
252 future.result() # Block until the shutdown is complete.
253
254 Args:
255 subscription:
256 The name of the subscription. The subscription should have already been
257 created (for example, by using :meth:`create_subscription`).
258 callback:
259 The callback function. This function receives the message as
260 its only argument and will be called from a different thread/
261 process depending on the scheduling strategy.
262 flow_control:
263 The flow control settings. Use this to prevent situations where you are
264 inundated with too many messages at once.
265 scheduler:
266 An optional *scheduler* to use when executing the callback. This
267 controls how callbacks are executed concurrently. This object must not
268 be shared across multiple ``SubscriberClient`` instances.
269 use_legacy_flow_control (bool):
270 If set to ``True``, flow control at the Cloud Pub/Sub server is disabled,
271 though client-side flow control is still enabled. If set to ``False``
272 (default), both server-side and client-side flow control are enabled.
273 await_callbacks_on_shutdown:
274 If ``True``, after canceling the returned future, the latter's
275 ``result()`` method will block until the background stream and its
276 helper threads have been terminated, and all currently executing message
277 callbacks are done processing.
278
279 If ``False`` (default), the returned future's ``result()`` method will
280 not block after canceling the future. The method will instead return
281 immediately after the background stream and its helper threads have been
282 terminated, but some of the message callback threads might still be
283 running at that point.
284
285 Returns:
286 A future instance that can be used to manage the background stream.
287 """
288 flow_control = types.FlowControl(*flow_control)
289
290 manager = streaming_pull_manager.StreamingPullManager(
291 self,
292 subscription,
293 flow_control=flow_control,
294 scheduler=scheduler,
295 use_legacy_flow_control=use_legacy_flow_control,
296 await_callbacks_on_shutdown=await_callbacks_on_shutdown,
297 )
298
299 future = futures.StreamingPullFuture(manager)
300
301 manager.open(callback=callback, on_callback_error=future.set_exception)
302
303 return future
304
305 def close(self) -> None:
306 """Close the underlying channel to release socket resources.
307
308 After a channel has been closed, the client instance cannot be used
309 anymore.
310
311 This method is idempotent.
312 """
313 transport = cast("SubscriberGrpcTransport", self._transport)
314 transport.grpc_channel.close()
315 self._closed = True
316
317 def __enter__(self) -> "Client":
318 if self._closed:
319 raise RuntimeError("Closed subscriber cannot be used as context manager.")
320 return self
321
322 def __exit__(self, exc_type, exc_val, exc_tb):
323 self.close()