Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_simple_stubs.py: 39%
119 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:45 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:45 +0000
1# Copyright 2020 The gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Functions that obviate explicit stubs and explicit channels."""
16import collections
17import datetime
18import logging
19import os
20import threading
21from typing import (
22 Any,
23 AnyStr,
24 Callable,
25 Dict,
26 Iterator,
27 Optional,
28 Sequence,
29 Tuple,
30 TypeVar,
31 Union,
32)
34import grpc
35from grpc.experimental import experimental_api
# Generic placeholders for the request and response message types of an RPC.
RequestType = TypeVar("RequestType")
ResponseType = TypeVar("ResponseType")

# Channel arguments, and the tuple that identifies one cached channel.
OptionsType = Sequence[Tuple[str, str]]
CacheKey = Tuple[
    str,
    OptionsType,
    Optional[grpc.ChannelCredentials],
    Optional[grpc.Compression],
]

_LOGGER = logging.getLogger(__name__)

# Each setting below is read from the environment when present; a missing
# variable (KeyError) selects the built-in default. A malformed value still
# raises from float()/int() exactly as before.

# Period added to "now" to compute a cached channel's eviction time.
_EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"
try:
    _EVICTION_PERIOD = datetime.timedelta(
        seconds=float(os.environ[_EVICTION_PERIOD_KEY])
    )
except KeyError:
    _EVICTION_PERIOD = datetime.timedelta(minutes=10)
else:
    _LOGGER.debug(
        "Setting managed channel eviction period to %s", _EVICTION_PERIOD
    )

# Number of cached channels above which the oldest entries are evicted.
_MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"
try:
    _MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY])
except KeyError:
    _MAXIMUM_CHANNELS = 256
else:
    _LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS)

# Default value of the `timeout` parameter of the RPC helpers, in seconds.
_DEFAULT_TIMEOUT_KEY = "GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS"
try:
    _DEFAULT_TIMEOUT = float(os.environ[_DEFAULT_TIMEOUT_KEY])
except KeyError:
    _DEFAULT_TIMEOUT = 60.0
else:
    _LOGGER.debug("Setting default timeout seconds to %f", _DEFAULT_TIMEOUT)
def _create_channel(
    target: str,
    options: Sequence[Tuple[str, str]],
    channel_credentials: Optional[grpc.ChannelCredentials],
    compression: Optional[grpc.Compression],
) -> grpc.Channel:
    """Creates a new channel to `target` via `grpc.secure_channel`.

    Args:
      target: The server address.
      options: Key-value pairs forwarded as the channel's `options`.
      channel_credentials: Forwarded as the channel's `credentials`.
      compression: Forwarded as the channel's `compression`.

    Returns:
      The channel returned by `grpc.secure_channel`.
    """
    # Hand the values to the logger as arguments instead of pre-formatting
    # them, so the message is only rendered when DEBUG logging is enabled.
    _LOGGER.debug(
        "Creating secure channel with credentials '%s', "
        "options '%s' and compression '%s'",
        channel_credentials,
        options,
        compression,
    )
    return grpc.secure_channel(
        target,
        credentials=channel_credentials,
        options=options,
        compression=compression,
    )
class ChannelCache:
    """Cache of channels shared through a singleton and pruned in background.

    The mapping is insertion-ordered and `get_channel` re-inserts an entry
    every time it is handed out, so the first entry is always the one that
    was used least recently. A daemon thread evicts that first entry when it
    is overdue or when the cache holds more than `_MAXIMUM_CHANNELS` entries.
    """

    # NOTE(rbellevi): Untyped due to reference cycle.
    _singleton = None
    # One lock guards the singleton and its mapping; the condition built on
    # it is used to wake the eviction thread.
    _lock: threading.RLock = threading.RLock()
    _condition: threading.Condition = threading.Condition(lock=_lock)
    # Set by the eviction thread once it has entered its loop.
    _eviction_ready: threading.Event = threading.Event()

    _mapping: Dict[CacheKey, Tuple[grpc.Channel, datetime.datetime]]
    _eviction_thread: threading.Thread

    def __init__(self):
        """Creates an empty cache and starts its daemon eviction thread."""
        self._mapping = collections.OrderedDict()
        self._eviction_thread = threading.Thread(
            target=ChannelCache._perform_evictions, daemon=True
        )
        self._eviction_thread.start()

    @staticmethod
    def get():
        """Returns the shared `ChannelCache`, creating it on first use."""
        with ChannelCache._lock:
            if ChannelCache._singleton is None:
                ChannelCache._singleton = ChannelCache()
        # Wait outside the lock: the eviction thread has to acquire the same
        # lock before it can set the event.
        ChannelCache._eviction_ready.wait()
        return ChannelCache._singleton

    def _evict_locked(self, key: CacheKey):
        """Removes `key` from the cache and closes its channel.

        The caller must hold `_lock`.
        """
        channel, _ = self._mapping.pop(key)
        _LOGGER.debug(
            "Evicting channel %s with configuration %s.", channel, key
        )
        channel.close()

    @staticmethod
    def _perform_evictions():
        """Body of the eviction thread; loops forever."""
        while True:
            with ChannelCache._lock:
                ChannelCache._eviction_ready.set()
                if not ChannelCache._singleton._mapping:
                    # Nothing to evict; sleep until a channel is added.
                    ChannelCache._condition.wait()
                elif len(ChannelCache._singleton._mapping) > _MAXIMUM_CHANNELS:
                    key = next(iter(ChannelCache._singleton._mapping.keys()))
                    ChannelCache._singleton._evict_locked(key)
                    # And immediately reevaluate.
                else:
                    key, (_, eviction_time) = next(
                        iter(ChannelCache._singleton._mapping.items())
                    )
                    now = datetime.datetime.now()
                    if eviction_time <= now:
                        ChannelCache._singleton._evict_locked(key)
                        continue
                    else:
                        time_to_eviction = (eviction_time - now).total_seconds()
                        # NOTE: We aim to *eventually* coalesce to a state in
                        # which no overdue channels are in the cache and the
                        # length of the cache is no longer than
                        # _MAXIMUM_CHANNELS. We tolerate momentary states in
                        # which these two criteria are not met.
                        ChannelCache._condition.wait(timeout=time_to_eviction)

    def get_channel(
        self,
        target: str,
        options: Sequence[Tuple[str, str]],
        channel_credentials: Optional[grpc.ChannelCredentials],
        insecure: bool,
        compression: Optional[grpc.Compression],
    ) -> grpc.Channel:
        """Returns the cached channel for this configuration, or a new one.

        Either way the entry's eviction time is reset to now plus
        `_EVICTION_PERIOD` and the entry becomes the most recently used.

        Args:
          target: The server address.
          options: Key-value pairs used to configure the channel. Any
            sequence is accepted, including a list.
          channel_credentials: Credentials for the channel. When None and
            `insecure` is False, `grpc.ssl_channel_credentials()` is used.
          insecure: If True, use
            `grpc.experimental.insecure_channel_credentials()`.
          compression: Compression setting for the channel.

        Returns:
          The channel to use for this configuration.

        Raises:
          ValueError: If `insecure` is True and `channel_credentials` is
            also supplied.
        """
        if insecure and channel_credentials:
            raise ValueError(
                "The insecure option is mutually exclusive with "
                "the channel_credentials option. Please use one "
                "or the other."
            )
        if insecure:
            channel_credentials = (
                grpc.experimental.insecure_channel_credentials()
            )
        elif channel_credentials is None:
            _LOGGER.debug("Defaulting to SSL channel credentials.")
            channel_credentials = grpc.ssl_channel_credentials()
        # A list (or a sequence containing lists) is unhashable and would
        # make the lookup below raise TypeError, so freeze the options into
        # nested tuples for the key. The channel itself still receives the
        # caller's original `options` object.
        hashable_options = (
            None
            if options is None
            else tuple(tuple(option) for option in options)
        )
        key = (target, hashable_options, channel_credentials, compression)
        with self._lock:
            cached = self._mapping.pop(key, None)
            if cached is not None:
                channel = cached[0]
                # Re-inserting moves the entry to the end of the ordering.
                self._mapping[key] = (
                    channel,
                    datetime.datetime.now() + _EVICTION_PERIOD,
                )
                return channel
            channel = _create_channel(
                target, options, channel_credentials, compression
            )
            self._mapping[key] = (
                channel,
                datetime.datetime.now() + _EVICTION_PERIOD,
            )
            # Wake the eviction thread when the cache stops being empty or
            # reaches the configured maximum.
            if (
                len(self._mapping) == 1
                or len(self._mapping) >= _MAXIMUM_CHANNELS
            ):
                self._condition.notify()
            return channel

    def _test_only_channel_count(self) -> int:
        """Returns the number of channels currently cached."""
        with self._lock:
            return len(self._mapping)
@experimental_api
def unary_unary(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
) -> ResponseType:
    """Invokes a unary-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request: The request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. Response goes undeserialized in case None is
        passed.
      options: An optional sequence of key-value pairs
        (:term:`channel_arguments` in gRPC Core runtime) to configure the
        channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials() or
        grpc.experimental.insecure_channel_credentials().
      insecure: If True, specifies channel_credentials as
        grpc.experimental.insecure_channel_credentials(). This option is
        mutually exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      The response to the RPC.
    """
    channel = ChannelCache.get().get_channel(
        target, options, channel_credentials, insecure, compression
    )
    multicallable = channel.unary_unary(
        method, request_serializer, response_deserializer
    )
    # An unspecified wait_for_ready means "wait for the connection".
    if wait_for_ready is None:
        wait_for_ready = True
    return multicallable(
        request,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )
@experimental_api
def unary_stream(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
) -> Iterator[ResponseType]:
    """Invokes a unary-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request: The request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. Response goes undeserialized in case None is
        passed.
      options: An optional sequence of key-value pairs
        (:term:`channel_arguments` in gRPC Core runtime) to configure the
        channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        grpc.experimental.insecure_channel_credentials(). This option is
        mutually exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      An iterator of responses.
    """
    channel = ChannelCache.get().get_channel(
        target, options, channel_credentials, insecure, compression
    )
    multicallable = channel.unary_stream(
        method, request_serializer, response_deserializer
    )
    # An unspecified wait_for_ready means "wait for the connection".
    if wait_for_ready is None:
        wait_for_ready = True
    return multicallable(
        request,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )
@experimental_api
def stream_unary(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
) -> ResponseType:
    """Invokes a stream-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The channel comes from a per-process cache. A background thread evicts
    channels from that cache after a fixed period, and also when more than a
    configured maximum accumulate.

    The eviction period defaults to 10 minutes and can be configured with the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS".

    The maximum number of channels defaults to 256 and can be configured with
    the environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM".

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. Response goes undeserialized in case None is
        passed.
      options: An optional sequence of key-value pairs
        (:term:`channel_arguments` in gRPC Core runtime) to configure the
        channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        grpc.experimental.insecure_channel_credentials(). This option is
        mutually exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      The response to the RPC.
    """
    cache = ChannelCache.get()
    managed_channel = cache.get_channel(
        target, options, channel_credentials, insecure, compression
    )
    invoke = managed_channel.stream_unary(
        method, request_serializer, response_deserializer
    )
    # Leaving wait_for_ready unset is treated as True.
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(
        request_iterator,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )
@experimental_api
def stream_stream(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
) -> Iterator[ResponseType]:
    """Invokes a stream-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The channel comes from a per-process cache. A background thread evicts
    channels from that cache after a fixed period, and also when more than a
    configured maximum accumulate.

    The eviction period defaults to 10 minutes and can be configured with the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS".

    The maximum number of channels defaults to 256 and can be configured with
    the environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM".

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. Response goes undeserialized in case None is
        passed.
      options: An optional sequence of key-value pairs
        (:term:`channel_arguments` in gRPC Core runtime) to configure the
        channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        grpc.experimental.insecure_channel_credentials(). This option is
        mutually exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      An iterator of responses.
    """
    cache = ChannelCache.get()
    managed_channel = cache.get_channel(
        target, options, channel_credentials, insecure, compression
    )
    invoke = managed_channel.stream_stream(
        method, request_serializer, response_deserializer
    )
    # Leaving wait_for_ready unset is treated as True.
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(
        request_iterator,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )