Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_simple_stubs.py: 37%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2020 The gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Functions that obviate explicit stubs and explicit channels."""
16import collections
17import datetime
18import logging
19import os
20import threading
21from typing import (
22 Any,
23 AnyStr,
24 Callable,
25 Dict,
26 Iterator,
27 Optional,
28 Sequence,
29 Tuple,
30 TypeVar,
31 Union,
32)
34import grpc
35from grpc.experimental import experimental_api
# Generic placeholders for the request and response message types handled
# by the stub functions in this module.
RequestType = TypeVar("RequestType")
ResponseType = TypeVar("ResponseType")

# Channel arguments: a sequence of (key, value) string pairs.
OptionsType = Sequence[Tuple[str, str]]

# Identity of a cached channel, in order: target, channel options, channel
# credentials and compression setting.
CacheKey = Tuple[
    str,
    OptionsType,
    Optional[grpc.ChannelCredentials],
    Optional[grpc.Compression],
]
_LOGGER = logging.getLogger(__name__)

# Each setting below falls back to a built-in default and can be overridden
# through the environment variable named by its *_KEY constant. An override
# is logged at DEBUG level.

# Time after which a managed channel becomes due for eviction.
_EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"
if _EVICTION_PERIOD_KEY not in os.environ:
    _EVICTION_PERIOD = datetime.timedelta(minutes=10)
else:
    _EVICTION_PERIOD = datetime.timedelta(
        seconds=float(os.environ[_EVICTION_PERIOD_KEY])
    )
    _LOGGER.debug(
        "Setting managed channel eviction period to %s", _EVICTION_PERIOD
    )

# Number of managed channels tolerated before the oldest ones are evicted.
_MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"
if _MAXIMUM_CHANNELS_KEY not in os.environ:
    _MAXIMUM_CHANNELS = 256
else:
    _MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY])
    _LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS)

# Default per-RPC timeout, in seconds, used by the stub functions.
_DEFAULT_TIMEOUT_KEY = "GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS"
if _DEFAULT_TIMEOUT_KEY not in os.environ:
    _DEFAULT_TIMEOUT = 60.0
else:
    _DEFAULT_TIMEOUT = float(os.environ[_DEFAULT_TIMEOUT_KEY])
    _LOGGER.debug("Setting default timeout seconds to %f", _DEFAULT_TIMEOUT)
def _create_channel(
    target: str,
    options: Sequence[Tuple[str, str]],
    channel_credentials: Optional[grpc.ChannelCredentials],
    compression: Optional[grpc.Compression],
) -> grpc.Channel:
    """Create a new channel to `target` via `grpc.secure_channel`.

    Args:
      target: The server address.
      options: Key-value pairs used to configure the channel.
      channel_credentials: Credentials applied to the whole channel.
      compression: Compression setting for the channel, if any.

    Returns:
      The newly created channel.
    """
    # Lazy %-style arguments: the credentials/options/compression objects
    # are only rendered to text when DEBUG logging is actually enabled.
    _LOGGER.debug(
        "Creating secure channel with credentials '%s', "
        "options '%s' and compression '%s'",
        channel_credentials,
        options,
        compression,
    )
    return grpc.secure_channel(
        target,
        credentials=channel_credentials,
        options=options,
        compression=compression,
    )
class ChannelCache:
    """Process-wide cache of channels shared by the stub functions.

    Channels are kept in an ordered mapping keyed by their configuration
    (target, options, channel credentials, compression). Every lookup moves
    the entry to the end of the mapping and renews its eviction time, so the
    first entry is always the next eviction candidate. A daemon thread
    started by the constructor evicts overdue channels and trims the cache
    when it grows beyond `_MAXIMUM_CHANNELS`.

    Use `ChannelCache.get()` to obtain the shared instance.
    """

    # NOTE(rbellevi): Untyped due to reference cycle.
    _singleton = None
    # Guards `_singleton` and `_mapping`; shared with `_condition`.
    _lock: threading.RLock = threading.RLock()
    # Used to wake the eviction thread when the cache contents change.
    _condition: threading.Condition = threading.Condition(lock=_lock)
    # Set by the eviction thread once it has entered its loop.
    _eviction_ready: threading.Event = threading.Event()

    _mapping: Dict[CacheKey, Tuple[grpc.Channel, datetime.datetime]]
    _eviction_thread: threading.Thread

    def __init__(self):
        """Create an empty cache and start the background eviction thread."""
        self._mapping = collections.OrderedDict()
        self._eviction_thread = threading.Thread(
            target=ChannelCache._perform_evictions, daemon=True
        )
        self._eviction_thread.start()

    @staticmethod
    def get():
        """Return the shared cache instance, creating it on first use.

        Blocks until the eviction thread has signalled that it is running.
        """
        with ChannelCache._lock:
            if ChannelCache._singleton is None:
                ChannelCache._singleton = ChannelCache()
        # Wait outside the lock: the eviction thread must acquire the same
        # lock before it can set the event.
        ChannelCache._eviction_ready.wait()
        return ChannelCache._singleton

    def _evict_locked(self, key: CacheKey):
        """Remove the channel stored under `key` and close it.

        The caller must hold `_lock`.
        """
        channel, _ = self._mapping.pop(key)
        _LOGGER.debug(
            "Evicting channel %s with configuration %s.", channel, key
        )
        channel.close()
        del channel

    @staticmethod
    def _perform_evictions():
        """Body of the eviction thread; loops forever.

        Each iteration, under the lock, it either waits for the cache to
        become non-empty, evicts the oldest entry when the cache is over
        capacity or that entry is overdue, or sleeps until the oldest entry
        is due (or until it is notified).
        """
        while True:
            with ChannelCache._lock:
                ChannelCache._eviction_ready.set()
                if not ChannelCache._singleton._mapping:
                    # Nothing to evict; sleep until a channel is added.
                    ChannelCache._condition.wait()
                elif len(ChannelCache._singleton._mapping) > _MAXIMUM_CHANNELS:
                    key = next(iter(ChannelCache._singleton._mapping.keys()))
                    ChannelCache._singleton._evict_locked(key)
                    # And immediately reevaluate.
                else:
                    key, (_, eviction_time) = next(
                        iter(ChannelCache._singleton._mapping.items())
                    )
                    now = datetime.datetime.now()
                    if eviction_time <= now:
                        ChannelCache._singleton._evict_locked(key)
                        continue
                    time_to_eviction = (eviction_time - now).total_seconds()
                    # NOTE: We aim to *eventually* coalesce to a state in
                    # which no overdue channels are in the cache and the
                    # length of the cache is no longer than
                    # _MAXIMUM_CHANNELS. We tolerate momentary states in
                    # which these two criteria are not met.
                    ChannelCache._condition.wait(timeout=time_to_eviction)

    def get_channel(
        self,
        target: str,
        options: Sequence[Tuple[str, str]],
        channel_credentials: Optional[grpc.ChannelCredentials],
        insecure: bool,
        compression: Optional[grpc.Compression],
        method: str,
        _registered_method: bool,
    ) -> Tuple[grpc.Channel, Optional[int]]:
        """Get a channel from the cache or create a new one.

        A cache hit renews the channel's eviction time and moves it to the
        end of the eviction order. A miss creates the channel, stores it and
        wakes the eviction thread when needed.

        Args:
          target: The server address.
          options: Key-value pairs used to configure the channel. Any
            iterable of pairs is accepted (including a list); it is
            converted to a tuple of tuples so that it can be part of the
            cache key.
          channel_credentials: Credentials for the channel. When None and
            `insecure` is false, SSL channel credentials are used.
          insecure: If true, insecure channel credentials are used. Mutually
            exclusive with `channel_credentials`.
          compression: Compression setting for the channel, if any.
          method: The name of the RPC method.
          _registered_method: Whether to obtain a registered call handle for
            `method` from the channel.

        Returns:
          A tuple with two items. The first item is the channel, the second
          item is the call handle when `_registered_method` is true and None
          otherwise.

        Raises:
          ValueError: If both `insecure` and `channel_credentials` are given.
        """
        if insecure and channel_credentials:
            raise ValueError(
                "The insecure option is mutually exclusive with "
                "the channel_credentials option. Please use one "
                "or the other."
            )
        if insecure:
            channel_credentials = (
                grpc.experimental.insecure_channel_credentials()
            )
        elif channel_credentials is None:
            _LOGGER.debug("Defaulting to SSL channel credentials.")
            channel_credentials = grpc.ssl_channel_credentials()
        # The cache key must be hashable, but callers are told they may pass
        # a list of options. Normalize to a tuple of tuples (None is kept
        # as-is) so that lists work and equal options share one channel.
        if options is not None:
            options = tuple(tuple(option) for option in options)
        key = (target, options, channel_credentials, compression)
        with self._lock:
            channel_data = self._mapping.get(key, None)
            call_handle = None
            if channel_data is not None:
                channel = channel_data[0]
                # Existing channel: obtain the call handle for the method if
                # a registered method was requested.
                if _registered_method:
                    call_handle = channel._get_registered_call_handle(method)
                # Re-insert to move the entry to the end of the eviction
                # order with a renewed eviction time.
                self._mapping.pop(key)
                self._mapping[key] = (
                    channel,
                    datetime.datetime.now() + _EVICTION_PERIOD,
                )
                return channel, call_handle
            channel = _create_channel(
                target, options, channel_credentials, compression
            )
            if _registered_method:
                call_handle = channel._get_registered_call_handle(method)
            self._mapping[key] = (
                channel,
                datetime.datetime.now() + _EVICTION_PERIOD,
            )
            # Wake the eviction thread when the cache just became non-empty
            # (it waits without a timeout while the cache is empty) or when
            # the size limit has been reached.
            if (
                len(self._mapping) == 1
                or len(self._mapping) >= _MAXIMUM_CHANNELS
            ):
                self._condition.notify()
            return channel, call_handle

    def _test_only_channel_count(self) -> int:
        """Return the number of channels currently cached (test helper)."""
        with self._lock:
            return len(self._mapping)
@experimental_api
# pylint: disable=too-many-locals
def unary_unary(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> ResponseType:
    """Invokes a unary-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The channel is taken from a per-process cache of channels. A background
    thread evicts channels from the cache after a fixed period, and also
    when more than a configured maximum number of channels accumulate.

    The default eviction period is 10 minutes; set the environment variable
    "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to change it.

    The default maximum number of channels is 256; set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to change it.

    Args:
      request: The request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. The request is sent unserialized if None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. The response is returned undeserialized if
        None is passed.
      options: Optional key-value pairs (:term:`channel_arguments` in gRPC
        Core runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g.
        the return value of grpc.ssl_channel_credentials().
      insecure: If True, insecure channel credentials are used. This option
        is mutually exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: Whether the RPC should wait until the connection to
        the server becomes ready instead of failing immediately when it is
        not ready at invocation time. When using this option, the user will
        likely also want to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If unspecified, defaults to
        the value of the GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment
        variable, or 60 seconds if that is unset. Supply None to indicate
        that no timeout should be enforced.
      metadata: Optional metadata to send to the server.
      _registered_method: Internal flag forwarded to
        ChannelCache.get_channel along with `method`.

    Returns:
      The response to the RPC.
    """
    cache = ChannelCache.get()
    channel, method_handle = cache.get_channel(
        target,
        options,
        channel_credentials,
        insecure,
        compression,
        method,
        _registered_method,
    )
    call = channel.unary_unary(
        method, request_serializer, response_deserializer, method_handle
    )
    # An unspecified wait_for_ready means "wait".
    if wait_for_ready is None:
        wait_for_ready = True
    return call(
        request,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )
@experimental_api
# pylint: disable=too-many-locals
def unary_stream(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> Iterator[ResponseType]:
    """Invokes a unary-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The channel is taken from a per-process cache of channels. A background
    thread evicts channels from the cache after a fixed period, and also
    when more than a configured maximum number of channels accumulate.

    The default eviction period is 10 minutes; set the environment variable
    "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to change it.

    The default maximum number of channels is 256; set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to change it.

    Args:
      request: The request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. The request is sent unserialized if None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. The response is returned undeserialized if
        None is passed.
      options: Optional key-value pairs (:term:`channel_arguments` in gRPC
        Core runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g.
        the return value of grpc.ssl_channel_credentials().
      insecure: If True, insecure channel credentials are used. This option
        is mutually exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: Whether the RPC should wait until the connection to
        the server becomes ready instead of failing immediately when it is
        not ready at invocation time. When using this option, the user will
        likely also want to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If unspecified, defaults to
        the value of the GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment
        variable, or 60 seconds if that is unset. Supply None to indicate
        that no timeout should be enforced.
      metadata: Optional metadata to send to the server.
      _registered_method: Internal flag forwarded to
        ChannelCache.get_channel along with `method`.

    Returns:
      An iterator of responses.
    """
    cache = ChannelCache.get()
    channel, method_handle = cache.get_channel(
        target,
        options,
        channel_credentials,
        insecure,
        compression,
        method,
        _registered_method,
    )
    call = channel.unary_stream(
        method, request_serializer, response_deserializer, method_handle
    )
    # An unspecified wait_for_ready means "wait".
    if wait_for_ready is None:
        wait_for_ready = True
    return call(
        request,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )
@experimental_api
# pylint: disable=too-many-locals
def stream_unary(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> ResponseType:
    """Invokes a stream-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The channel is taken from a per-process cache of channels. A background
    thread evicts channels from the cache after a fixed period, and also
    when more than a configured maximum number of channels accumulate.

    The default eviction period is 10 minutes; set the environment variable
    "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to change it.

    The default maximum number of channels is 256; set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to change it.

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. The request is sent unserialized if None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. The response is returned undeserialized if
        None is passed.
      options: Optional key-value pairs (:term:`channel_arguments` in gRPC
        Core runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g.
        the return value of grpc.ssl_channel_credentials().
      insecure: If True, insecure channel credentials are used. This option
        is mutually exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: Whether the RPC should wait until the connection to
        the server becomes ready instead of failing immediately when it is
        not ready at invocation time. When using this option, the user will
        likely also want to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If unspecified, defaults to
        the value of the GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment
        variable, or 60 seconds if that is unset. Supply None to indicate
        that no timeout should be enforced.
      metadata: Optional metadata to send to the server.
      _registered_method: Internal flag forwarded to
        ChannelCache.get_channel along with `method`.

    Returns:
      The response to the RPC.
    """
    cache = ChannelCache.get()
    channel, method_handle = cache.get_channel(
        target,
        options,
        channel_credentials,
        insecure,
        compression,
        method,
        _registered_method,
    )
    call = channel.stream_unary(
        method, request_serializer, response_deserializer, method_handle
    )
    # An unspecified wait_for_ready means "wait".
    if wait_for_ready is None:
        wait_for_ready = True
    return call(
        request_iterator,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )
@experimental_api
# pylint: disable=too-many-locals
def stream_stream(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> Iterator[ResponseType]:
    """Invokes a stream-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The channel is taken from a per-process cache of channels. A background
    thread evicts channels from the cache after a fixed period, and also
    when more than a configured maximum number of channels accumulate.

    The default eviction period is 10 minutes; set the environment variable
    "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to change it.

    The default maximum number of channels is 256; set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to change it.

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. The request is sent unserialized if None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. The response is returned undeserialized if
        None is passed.
      options: Optional key-value pairs (:term:`channel_arguments` in gRPC
        Core runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g.
        the return value of grpc.ssl_channel_credentials().
      insecure: If True, insecure channel credentials are used. This option
        is mutually exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: Whether the RPC should wait until the connection to
        the server becomes ready instead of failing immediately when it is
        not ready at invocation time. When using this option, the user will
        likely also want to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If unspecified, defaults to
        the value of the GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment
        variable, or 60 seconds if that is unset. Supply None to indicate
        that no timeout should be enforced.
      metadata: Optional metadata to send to the server.
      _registered_method: Internal flag forwarded to
        ChannelCache.get_channel along with `method`.

    Returns:
      An iterator of responses.
    """
    cache = ChannelCache.get()
    channel, method_handle = cache.get_channel(
        target,
        options,
        channel_credentials,
        insecure,
        compression,
        method,
        _registered_method,
    )
    call = channel.stream_stream(
        method, request_serializer, response_deserializer, method_handle
    )
    # An unspecified wait_for_ready means "wait".
    if wait_for_ready is None:
        wait_for_ready = True
    return call(
        request_iterator,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )