Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/subscriber/client.py: 52%

54 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:25 +0000

1# Copyright 2019, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import absolute_import 

16 

17import os 

18import typing 

19from typing import cast, Any, Callable, Optional, Sequence, Union 

20import warnings 

21 

22from google.auth.credentials import AnonymousCredentials # type: ignore 

23from google.oauth2 import service_account # type: ignore 

24 

25from google.cloud.pubsub_v1 import types 

26from google.cloud.pubsub_v1.subscriber import futures 

27from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager 

28from google.pubsub_v1.services.subscriber import client as subscriber_client 

29from google.pubsub_v1 import gapic_version as package_version 

30 

31if typing.TYPE_CHECKING: # pragma: NO COVER 

32 from google.cloud.pubsub_v1 import subscriber 

33 from google.pubsub_v1.services.subscriber.transports.grpc import ( 

34 SubscriberGrpcTransport, 

35 ) 

36 

37__version__ = package_version.__version__ 

38 

39 

40class Client(subscriber_client.SubscriberClient): 

41 """A subscriber client for Google Cloud Pub/Sub. 

42 

43 This creates an object that is capable of subscribing to messages. 

44 Generally, you can instantiate this client with no arguments, and you 

45 get sensible defaults. 

46 

47 Args: 

48 kwargs: Any additional arguments provided are sent as keyword 

49 keyword arguments to the underlying 

50 :class:`~google.cloud.pubsub_v1.gapic.subscriber_client.SubscriberClient`. 

51 Generally you should not need to set additional keyword 

52 arguments. Optionally, regional endpoints can be set via 

53 ``client_options`` that takes a single key-value pair that 

54 defines the endpoint. 

55 

56 Example: 

57 

58 .. code-block:: python 

59 

60 from google.cloud import pubsub_v1 

61 

62 subscriber_client = pubsub_v1.SubscriberClient( 

63 # Optional 

64 client_options = { 

65 "api_endpoint": REGIONAL_ENDPOINT 

66 } 

67 ) 

68 """ 

69 

70 def __init__(self, **kwargs: Any): 

71 # Sanity check: Is our goal to use the emulator? 

72 # If so, create a grpc insecure channel with the emulator host 

73 # as the target. 

74 if os.environ.get("PUBSUB_EMULATOR_HOST"): 

75 kwargs["client_options"] = { 

76 "api_endpoint": os.environ.get("PUBSUB_EMULATOR_HOST") 

77 } 

78 kwargs["credentials"] = AnonymousCredentials() 

79 

80 # Instantiate the underlying GAPIC client. 

81 super().__init__(**kwargs) 

82 self._target = self._transport._host 

83 self._closed = False 

84 

85 @classmethod 

86 def from_service_account_file( # type: ignore[override] 

87 cls, filename: str, **kwargs: Any 

88 ) -> "Client": 

89 """Creates an instance of this client using the provided credentials 

90 file. 

91 

92 Args: 

93 filename: The path to the service account private key json file. 

94 kwargs: Additional arguments to pass to the constructor. 

95 

96 Returns: 

97 A Subscriber :class:`~google.cloud.pubsub_v1.subscriber.client.Client` 

98 instance that is the constructed client. 

99 """ 

100 credentials = service_account.Credentials.from_service_account_file(filename) 

101 kwargs["credentials"] = credentials 

102 return cls(**kwargs) 

103 

104 from_service_account_json = from_service_account_file # type: ignore[assignment] 

105 

106 @property 

107 def target(self) -> str: 

108 """Return the target (where the API is). 

109 

110 Returns: 

111 The location of the API. 

112 """ 

113 return self._target 

114 

115 @property 

116 def closed(self) -> bool: 

117 """Return whether the client has been closed and cannot be used anymore. 

118 

119 .. versionadded:: 2.8.0 

120 """ 

121 return self._closed 

122 

123 @property 

124 def api(self): 

125 """The underlying gapic API client. 

126 

127 .. versionchanged:: 2.10.0 

128 Instead of a GAPIC ``SubscriberClient`` client instance, this property is a 

129 proxy object to it with the same interface. 

130 

131 .. deprecated:: 2.10.0 

132 Use the GAPIC methods and properties on the client instance directly 

133 instead of through the :attr:`api` attribute. 

134 """ 

135 msg = ( 

136 'The "api" property only exists for backward compatibility, access its ' 

137 'attributes directly thorugh the client instance (e.g. "client.foo" ' 

138 'instead of "client.api.foo").' 

139 ) 

140 warnings.warn(msg, category=DeprecationWarning) 

141 return super() 

142 

143 def subscribe( 

144 self, 

145 subscription: str, 

146 callback: Callable[["subscriber.message.Message"], Any], 

147 flow_control: Union[types.FlowControl, Sequence] = (), 

148 scheduler: Optional["subscriber.scheduler.ThreadScheduler"] = None, 

149 use_legacy_flow_control: bool = False, 

150 await_callbacks_on_shutdown: bool = False, 

151 ) -> futures.StreamingPullFuture: 

152 """Asynchronously start receiving messages on a given subscription. 

153 

154 This method starts a background thread to begin pulling messages from 

155 a Pub/Sub subscription and scheduling them to be processed using the 

156 provided ``callback``. 

157 

158 The ``callback`` will be called with an individual 

159 :class:`google.cloud.pubsub_v1.subscriber.message.Message`. It is the 

160 responsibility of the callback to either call ``ack()`` or ``nack()`` 

161 on the message when it finished processing. If an exception occurs in 

162 the callback during processing, the exception is logged and the message 

163 is ``nack()`` ed. 

164 

165 The ``flow_control`` argument can be used to control the rate of at 

166 which messages are pulled. The settings are relatively conservative by 

167 default to prevent "message hoarding" - a situation where the client 

168 pulls a large number of messages but can not process them fast enough 

169 leading it to "starve" other clients of messages. Increasing these 

170 settings may lead to faster throughput for messages that do not take 

171 a long time to process. 

172 

173 The ``use_legacy_flow_control`` argument disables enforcing flow control 

174 settings at the Cloud Pub/Sub server, and only the client side flow control 

175 will be enforced. 

176 

177 This method starts the receiver in the background and returns a 

178 *Future* representing its execution. Waiting on the future (calling 

179 ``result()``) will block forever or until a non-recoverable error 

180 is encountered (such as loss of network connectivity). Cancelling the 

181 future will signal the process to shutdown gracefully and exit. 

182 

183 .. note:: This uses Pub/Sub's *streaming pull* feature. This feature 

184 properties that may be surprising. Please take a look at 

185 https://cloud.google.com/pubsub/docs/pull#streamingpull for 

186 more details on how streaming pull behaves compared to the 

187 synchronous pull method. 

188 

189 Example: 

190 

191 .. code-block:: python 

192 

193 from google.cloud import pubsub_v1 

194 

195 subscriber_client = pubsub_v1.SubscriberClient() 

196 

197 # existing subscription 

198 subscription = subscriber_client.subscription_path( 

199 'my-project-id', 'my-subscription') 

200 

201 def callback(message): 

202 print(message) 

203 message.ack() 

204 

205 future = subscriber_client.subscribe( 

206 subscription, callback) 

207 

208 try: 

209 future.result() 

210 except KeyboardInterrupt: 

211 future.cancel() # Trigger the shutdown. 

212 future.result() # Block until the shutdown is complete. 

213 

214 Args: 

215 subscription: 

216 The name of the subscription. The subscription should have already been 

217 created (for example, by using :meth:`create_subscription`). 

218 callback: 

219 The callback function. This function receives the message as 

220 its only argument and will be called from a different thread/ 

221 process depending on the scheduling strategy. 

222 flow_control: 

223 The flow control settings. Use this to prevent situations where you are 

224 inundated with too many messages at once. 

225 scheduler: 

226 An optional *scheduler* to use when executing the callback. This 

227 controls how callbacks are executed concurrently. This object must not 

228 be shared across multiple ``SubscriberClient`` instances. 

229 use_legacy_flow_control (bool): 

230 If set to ``True``, flow control at the Cloud Pub/Sub server is disabled, 

231 though client-side flow control is still enabled. If set to ``False`` 

232 (default), both server-side and client-side flow control are enabled. 

233 await_callbacks_on_shutdown: 

234 If ``True``, after canceling the returned future, the latter's 

235 ``result()`` method will block until the background stream and its 

236 helper threads have been terminated, and all currently executing message 

237 callbacks are done processing. 

238 

239 If ``False`` (default), the returned future's ``result()`` method will 

240 not block after canceling the future. The method will instead return 

241 immediately after the background stream and its helper threads have been 

242 terminated, but some of the message callback threads might still be 

243 running at that point. 

244 

245 Returns: 

246 A future instance that can be used to manage the background stream. 

247 """ 

248 flow_control = types.FlowControl(*flow_control) 

249 

250 manager = streaming_pull_manager.StreamingPullManager( 

251 self, 

252 subscription, 

253 flow_control=flow_control, 

254 scheduler=scheduler, 

255 use_legacy_flow_control=use_legacy_flow_control, 

256 await_callbacks_on_shutdown=await_callbacks_on_shutdown, 

257 ) 

258 

259 future = futures.StreamingPullFuture(manager) 

260 

261 manager.open(callback=callback, on_callback_error=future.set_exception) 

262 

263 return future 

264 

265 def close(self) -> None: 

266 """Close the underlying channel to release socket resources. 

267 

268 After a channel has been closed, the client instance cannot be used 

269 anymore. 

270 

271 This method is idempotent. 

272 """ 

273 transport = cast("SubscriberGrpcTransport", self._transport) 

274 transport.grpc_channel.close() 

275 self._closed = True 

276 

277 def __enter__(self) -> "Client": 

278 if self._closed: 

279 raise RuntimeError("Closed subscriber cannot be used as context manager.") 

280 return self 

281 

282 def __exit__(self, exc_type, exc_val, exc_tb): 

283 self.close()