Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/client.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

65 statements  

1# Copyright 2019, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import absolute_import 

16 

17import sys 

18import os 

19import typing 

20from typing import cast, Any, Callable, Optional, Sequence, Union 

21import warnings 

22 

23from google.auth.credentials import AnonymousCredentials # type: ignore 

24from google.oauth2 import service_account # type: ignore 

25 

26from google.cloud.pubsub_v1 import types 

27from google.cloud.pubsub_v1.subscriber import futures 

28from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager 

29from google.pubsub_v1.services.subscriber import client as subscriber_client 

30from google.pubsub_v1 import gapic_version as package_version 

31 

32if typing.TYPE_CHECKING: # pragma: NO COVER 

33 from google.cloud.pubsub_v1 import subscriber 

34 from google.pubsub_v1.services.subscriber.transports.grpc import ( 

35 SubscriberGrpcTransport, 

36 ) 

37 

38__version__ = package_version.__version__ 

39 

40 

41class Client(subscriber_client.SubscriberClient): 

42 """A subscriber client for Google Cloud Pub/Sub. 

43 

44 This creates an object that is capable of subscribing to messages. 

45 Generally, you can instantiate this client with no arguments, and you 

46 get sensible defaults. 

47 

48 Args: 

49 kwargs: Any additional arguments provided are sent as keyword 

50 keyword arguments to the underlying 

51 :class:`~google.cloud.pubsub_v1.gapic.subscriber_client.SubscriberClient`. 

52 Generally you should not need to set additional keyword 

53 arguments. Optionally, regional endpoints can be set via 

54 ``client_options`` that takes a single key-value pair that 

55 defines the endpoint. 

56 

57 Example: 

58 

59 .. code-block:: python 

60 

61 from google.cloud import pubsub_v1 

62 

63 subscriber_client = pubsub_v1.SubscriberClient( 

64 # Optional 

65 client_options = { 

66 "api_endpoint": REGIONAL_ENDPOINT 

67 } 

68 ) 

69 """ 

70 

71 def __init__( 

72 self, 

73 subscriber_options: Union[types.SubscriberOptions, Sequence] = (), 

74 **kwargs: Any 

75 ): 

76 assert ( 

77 isinstance(subscriber_options, types.SubscriberOptions) 

78 or len(subscriber_options) == 0 

79 ), "subscriber_options must be of type SubscriberOptions or an empty sequence." 

80 

81 # Sanity check: Is our goal to use the emulator? 

82 # If so, create a grpc insecure channel with the emulator host 

83 # as the target. 

84 # TODO(https://github.com/googleapis/python-pubsub/issues/1349): Move the emulator 

85 # code below to test files. 

86 if os.environ.get("PUBSUB_EMULATOR_HOST"): 

87 kwargs["client_options"] = { 

88 "api_endpoint": os.environ.get("PUBSUB_EMULATOR_HOST") 

89 } 

90 # Configure credentials directly to transport, if provided. 

91 if "transport" not in kwargs: 

92 kwargs["credentials"] = AnonymousCredentials() 

93 

94 # Instantiate the underlying GAPIC client. 

95 super().__init__(**kwargs) 

96 self._target = self._transport._host 

97 self._closed = False 

98 

99 self.subscriber_options = types.SubscriberOptions(*subscriber_options) 

100 

101 # Set / override Open Telemetry option. 

102 self._open_telemetry_enabled = ( 

103 self.subscriber_options.enable_open_telemetry_tracing 

104 ) 

105 # OpenTelemetry features used by the library are not supported in Python versions <= 3.7. 

106 # Refer https://github.com/open-telemetry/opentelemetry-python/issues/3993#issuecomment-2211976389 

107 if ( 

108 self.subscriber_options.enable_open_telemetry_tracing 

109 and sys.version_info.major == 3 

110 and sys.version_info.minor < 8 

111 ): 

112 warnings.warn( 

113 message="Open Telemetry for Python version 3.7 or lower is not supported. Disabling Open Telemetry tracing.", 

114 category=RuntimeWarning, 

115 ) 

116 self._open_telemetry_enabled = False 

117 

118 @property 

119 def open_telemetry_enabled(self) -> bool: 

120 """ 

121 Returns True if Open Telemetry is enabled. False otherwise. 

122 """ 

123 return self._open_telemetry_enabled # pragma: NO COVER 

124 

125 @classmethod 

126 def from_service_account_file( # type: ignore[override] 

127 cls, filename: str, **kwargs: Any 

128 ) -> "Client": 

129 """Creates an instance of this client using the provided credentials 

130 file. 

131 

132 Args: 

133 filename: The path to the service account private key json file. 

134 kwargs: Additional arguments to pass to the constructor. 

135 

136 Returns: 

137 A Subscriber :class:`~google.cloud.pubsub_v1.subscriber.client.Client` 

138 instance that is the constructed client. 

139 """ 

140 credentials = service_account.Credentials.from_service_account_file(filename) 

141 kwargs["credentials"] = credentials 

142 return cls(**kwargs) 

143 

144 from_service_account_json = from_service_account_file # type: ignore[assignment] 

145 

146 @property 

147 def target(self) -> str: 

148 """Return the target (where the API is). 

149 

150 Returns: 

151 The location of the API. 

152 """ 

153 return self._target 

154 

155 @property 

156 def closed(self) -> bool: 

157 """Return whether the client has been closed and cannot be used anymore. 

158 

159 .. versionadded:: 2.8.0 

160 """ 

161 return self._closed 

162 

163 @property 

164 def api(self): 

165 """The underlying gapic API client. 

166 

167 .. versionchanged:: 2.10.0 

168 Instead of a GAPIC ``SubscriberClient`` client instance, this property is a 

169 proxy object to it with the same interface. 

170 

171 .. deprecated:: 2.10.0 

172 Use the GAPIC methods and properties on the client instance directly 

173 instead of through the :attr:`api` attribute. 

174 """ 

175 msg = ( 

176 'The "api" property only exists for backward compatibility, access its ' 

177 'attributes directly thorugh the client instance (e.g. "client.foo" ' 

178 'instead of "client.api.foo").' 

179 ) 

180 warnings.warn(msg, category=DeprecationWarning) 

181 return super() 

182 

183 def subscribe( 

184 self, 

185 subscription: str, 

186 callback: Callable[["subscriber.message.Message"], Any], 

187 flow_control: Union[types.FlowControl, Sequence] = (), 

188 scheduler: Optional["subscriber.scheduler.ThreadScheduler"] = None, 

189 use_legacy_flow_control: bool = False, 

190 await_callbacks_on_shutdown: bool = False, 

191 ) -> futures.StreamingPullFuture: 

192 """Asynchronously start receiving messages on a given subscription. 

193 

194 This method starts a background thread to begin pulling messages from 

195 a Pub/Sub subscription and scheduling them to be processed using the 

196 provided ``callback``. 

197 

198 The ``callback`` will be called with an individual 

199 :class:`google.cloud.pubsub_v1.subscriber.message.Message`. It is the 

200 responsibility of the callback to either call ``ack()`` or ``nack()`` 

201 on the message when it finished processing. If an exception occurs in 

202 the callback during processing, the exception is logged and the message 

203 is ``nack()`` ed. 

204 

205 The ``flow_control`` argument can be used to control the rate of at 

206 which messages are pulled. The settings are relatively conservative by 

207 default to prevent "message hoarding" - a situation where the client 

208 pulls a large number of messages but can not process them fast enough 

209 leading it to "starve" other clients of messages. Increasing these 

210 settings may lead to faster throughput for messages that do not take 

211 a long time to process. 

212 

213 The ``use_legacy_flow_control`` argument disables enforcing flow control 

214 settings at the Cloud Pub/Sub server, and only the client side flow control 

215 will be enforced. 

216 

217 This method starts the receiver in the background and returns a 

218 *Future* representing its execution. Waiting on the future (calling 

219 ``result()``) will block forever or until a non-recoverable error 

220 is encountered (such as loss of network connectivity). Cancelling the 

221 future will signal the process to shutdown gracefully and exit. 

222 

223 .. note:: This uses Pub/Sub's *streaming pull* feature. This feature 

224 properties that may be surprising. Please take a look at 

225 https://cloud.google.com/pubsub/docs/pull#streamingpull for 

226 more details on how streaming pull behaves compared to the 

227 synchronous pull method. 

228 

229 Example: 

230 

231 .. code-block:: python 

232 

233 from google.cloud import pubsub_v1 

234 

235 subscriber_client = pubsub_v1.SubscriberClient() 

236 

237 # existing subscription 

238 subscription = subscriber_client.subscription_path( 

239 'my-project-id', 'my-subscription') 

240 

241 def callback(message): 

242 print(message) 

243 message.ack() 

244 

245 future = subscriber_client.subscribe( 

246 subscription, callback) 

247 

248 try: 

249 future.result() 

250 except KeyboardInterrupt: 

251 future.cancel() # Trigger the shutdown. 

252 future.result() # Block until the shutdown is complete. 

253 

254 Args: 

255 subscription: 

256 The name of the subscription. The subscription should have already been 

257 created (for example, by using :meth:`create_subscription`). 

258 callback: 

259 The callback function. This function receives the message as 

260 its only argument and will be called from a different thread/ 

261 process depending on the scheduling strategy. 

262 flow_control: 

263 The flow control settings. Use this to prevent situations where you are 

264 inundated with too many messages at once. 

265 scheduler: 

266 An optional *scheduler* to use when executing the callback. This 

267 controls how callbacks are executed concurrently. This object must not 

268 be shared across multiple ``SubscriberClient`` instances. 

269 use_legacy_flow_control (bool): 

270 If set to ``True``, flow control at the Cloud Pub/Sub server is disabled, 

271 though client-side flow control is still enabled. If set to ``False`` 

272 (default), both server-side and client-side flow control are enabled. 

273 await_callbacks_on_shutdown: 

274 If ``True``, after canceling the returned future, the latter's 

275 ``result()`` method will block until the background stream and its 

276 helper threads have been terminated, and all currently executing message 

277 callbacks are done processing. 

278 

279 If ``False`` (default), the returned future's ``result()`` method will 

280 not block after canceling the future. The method will instead return 

281 immediately after the background stream and its helper threads have been 

282 terminated, but some of the message callback threads might still be 

283 running at that point. 

284 

285 Returns: 

286 A future instance that can be used to manage the background stream. 

287 """ 

288 flow_control = types.FlowControl(*flow_control) 

289 

290 manager = streaming_pull_manager.StreamingPullManager( 

291 self, 

292 subscription, 

293 flow_control=flow_control, 

294 scheduler=scheduler, 

295 use_legacy_flow_control=use_legacy_flow_control, 

296 await_callbacks_on_shutdown=await_callbacks_on_shutdown, 

297 ) 

298 

299 future = futures.StreamingPullFuture(manager) 

300 

301 manager.open(callback=callback, on_callback_error=future.set_exception) 

302 

303 return future 

304 

305 def close(self) -> None: 

306 """Close the underlying channel to release socket resources. 

307 

308 After a channel has been closed, the client instance cannot be used 

309 anymore. 

310 

311 This method is idempotent. 

312 """ 

313 transport = cast("SubscriberGrpcTransport", self._transport) 

314 transport.grpc_channel.close() 

315 self._closed = True 

316 

317 def __enter__(self) -> "Client": 

318 if self._closed: 

319 raise RuntimeError("Closed subscriber cannot be used as context manager.") 

320 return self 

321 

322 def __exit__(self, exc_type, exc_val, exc_tb): 

323 self.close()