Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_simple_stubs.py: 39%
119 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
1# Copyright 2020 The gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Functions that obviate explicit stubs and explicit channels."""
16import collections
17import datetime
18import logging
19import os
20import threading
21from typing import (Any, AnyStr, Callable, Dict, Iterator, Optional, Sequence,
22 Tuple, TypeVar, Union)
24import grpc
25from grpc.experimental import experimental_api
# Generic placeholders for the request and response message types of an RPC.
RequestType = TypeVar('RequestType')
ResponseType = TypeVar('ResponseType')

# Channel arguments, expressed as a sequence of (key, value) pairs.
OptionsType = Sequence[Tuple[str, str]]

# Identity of a cached channel: (target, options, channel credentials,
# compression).
CacheKey = Tuple[
    str,
    OptionsType,
    Optional[grpc.ChannelCredentials],
    Optional[grpc.Compression],
]
_LOGGER = logging.getLogger(__name__)

# How long a managed channel may stay in the cache before it is evicted.
# Overridable (in seconds) through the environment; defaults to 10 minutes.
_EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"
try:
    _EVICTION_PERIOD = datetime.timedelta(
        seconds=float(os.environ[_EVICTION_PERIOD_KEY]))
except KeyError:
    _EVICTION_PERIOD = datetime.timedelta(minutes=10)
else:
    _LOGGER.debug("Setting managed channel eviction period to %s",
                  _EVICTION_PERIOD)

# Upper bound on the number of managed channels kept in the cache.
# Overridable through the environment; defaults to 256.
_MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"
try:
    _MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY])
except KeyError:
    _MAXIMUM_CHANNELS = 2**8
else:
    _LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS)

# Default per-RPC timeout in seconds. Overridable through the environment;
# defaults to 60 seconds.
_DEFAULT_TIMEOUT_KEY = "GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS"
try:
    _DEFAULT_TIMEOUT = float(os.environ[_DEFAULT_TIMEOUT_KEY])
except KeyError:
    _DEFAULT_TIMEOUT = 60.0
else:
    _LOGGER.debug("Setting default timeout seconds to %f", _DEFAULT_TIMEOUT)
def _create_channel(target: str, options: Sequence[Tuple[str, str]],
                    channel_credentials: Optional[grpc.ChannelCredentials],
                    compression: Optional[grpc.Compression]) -> grpc.Channel:
    """Creates a new channel to `target` using `grpc.secure_channel`.

    Args:
      target: The server address.
      options: Key-value pairs used to configure the channel.
      channel_credentials: The credentials handed to `grpc.secure_channel`.
      compression: The compression setting handed to `grpc.secure_channel`.

    Returns:
      The newly created channel.
    """
    # Hand the values over as lazy %-style arguments so that they are only
    # rendered when debug logging is actually enabled, as the other debug
    # calls in this module do.
    _LOGGER.debug(
        "Creating secure channel with credentials '%s', "
        "options '%s' and compression '%s'", channel_credentials, options,
        compression)
    return grpc.secure_channel(target,
                               credentials=channel_credentials,
                               options=options,
                               compression=compression)
class ChannelCache:
    """A cache of channels that is cleaned up by a background daemon thread.

    Channels are stored in an ordered mapping from their configuration
    (target, options, channel credentials, compression) to a
    `(channel, eviction_time)` pair. An entry is moved to the end of the
    mapping whenever it is created or reused, so the first entry is always
    the least recently used one.

    Use `ChannelCache.get()` to obtain the shared instance.
    """
    # NOTE(rbellevi): Untyped due to reference cycle.
    _singleton = None
    # The lock, condition and event are class-level: they are shared with the
    # static eviction loop, which only ever operates on `_singleton`.
    _lock: threading.RLock = threading.RLock()
    _condition: threading.Condition = threading.Condition(lock=_lock)
    _eviction_ready: threading.Event = threading.Event()

    _mapping: Dict[CacheKey, Tuple[grpc.Channel, datetime.datetime]]
    _eviction_thread: threading.Thread

    def __init__(self):
        """Creates an empty cache and starts its daemon eviction thread."""
        self._mapping = collections.OrderedDict()
        self._eviction_thread = threading.Thread(
            target=ChannelCache._perform_evictions, daemon=True)
        self._eviction_thread.start()

    @staticmethod
    def get():
        """Returns the shared `ChannelCache`, creating it on first use.

        Blocks until the eviction thread has signalled that it is running.
        """
        with ChannelCache._lock:
            if ChannelCache._singleton is None:
                ChannelCache._singleton = ChannelCache()
        # Wait outside the lock: the eviction thread must acquire the lock
        # before it can set the event.
        ChannelCache._eviction_ready.wait()
        return ChannelCache._singleton

    def _evict_locked(self, key: CacheKey):
        """Removes the entry for `key` and closes its channel.

        As the name indicates, the caller is expected to hold `_lock`.
        """
        channel, _ = self._mapping.pop(key)
        _LOGGER.debug("Evicting channel %s with configuration %s.", channel,
                      key)
        channel.close()
        del channel

    @staticmethod
    def _perform_evictions():
        """Body of the eviction thread; loops forever.

        Each iteration either sleeps until notified (empty cache), evicts the
        oldest entry (cache over capacity or oldest entry overdue), or sleeps
        until the oldest entry becomes due.
        """
        while True:
            with ChannelCache._lock:
                ChannelCache._eviction_ready.set()
                if not ChannelCache._singleton._mapping:
                    # Nothing to evict; sleep until a channel is added.
                    ChannelCache._condition.wait()
                elif len(ChannelCache._singleton._mapping) > _MAXIMUM_CHANNELS:
                    key = next(iter(ChannelCache._singleton._mapping.keys()))
                    ChannelCache._singleton._evict_locked(key)
                    # And immediately reevaluate.
                else:
                    key, (_, eviction_time) = next(
                        iter(ChannelCache._singleton._mapping.items()))
                    now = datetime.datetime.now()
                    if eviction_time <= now:
                        ChannelCache._singleton._evict_locked(key)
                        continue
                    else:
                        time_to_eviction = (eviction_time - now).total_seconds()
                        # NOTE: We aim to *eventually* coalesce to a state in
                        # which no overdue channels are in the cache and the
                        # length of the cache is no longer than
                        # _MAXIMUM_CHANNELS. We tolerate momentary states in
                        # which these two criteria are not met.
                        ChannelCache._condition.wait(timeout=time_to_eviction)

    def get_channel(self, target: str, options: Sequence[Tuple[str, str]],
                    channel_credentials: Optional[grpc.ChannelCredentials],
                    insecure: bool,
                    compression: Optional[grpc.Compression]) -> grpc.Channel:
        """Returns a cached channel for this configuration, creating it if needed.

        Reusing a cached channel pushes its eviction time back by
        `_EVICTION_PERIOD` and marks it as the most recently used entry.

        Args:
          target: The server address.
          options: Key-value pairs used to configure the channel. Any
            sequence of pairs is accepted, including a list.
          channel_credentials: Credentials for the channel. When None and
            `insecure` is False, SSL channel credentials are used.
          insecure: If True, insecure channel credentials are used. Mutually
            exclusive with `channel_credentials`.
          compression: The compression setting for the channel.

        Returns:
          The channel matching the requested configuration.

        Raises:
          ValueError: If both `insecure` and `channel_credentials` are given.
        """
        if insecure and channel_credentials:
            raise ValueError("The insecure option is mutually exclusive with " +
                             "the channel_credentials option. Please use one " +
                             "or the other.")
        if insecure:
            channel_credentials = grpc.experimental.insecure_channel_credentials(
            )
        elif channel_credentials is None:
            _LOGGER.debug("Defaulting to SSL channel credentials.")
            channel_credentials = grpc.ssl_channel_credentials()
        # The key must be hashable, but callers may legitimately pass the
        # options as a list (or a sequence of lists). Normalize to nested
        # tuples for the key only; the channel itself still receives the
        # options exactly as given.
        if options is None:
            hashable_options = None
        else:
            hashable_options = tuple(tuple(option) for option in options)
        key = (target, hashable_options, channel_credentials, compression)
        with self._lock:
            channel_data = self._mapping.get(key, None)
            if channel_data is not None:
                channel = channel_data[0]
                # Re-insert so that the entry moves to the end of the ordered
                # mapping with a refreshed eviction time.
                self._mapping.pop(key)
                self._mapping[key] = (channel, datetime.datetime.now() +
                                      _EVICTION_PERIOD)
                return channel
            else:
                channel = _create_channel(target, options, channel_credentials,
                                          compression)
                self._mapping[key] = (channel, datetime.datetime.now() +
                                      _EVICTION_PERIOD)
                # Wake the eviction thread when the cache was empty (it is
                # sleeping without a timeout) or when capacity is reached.
                if len(self._mapping) == 1 or len(
                        self._mapping) >= _MAXIMUM_CHANNELS:
                    self._condition.notify()
                return channel

    def _test_only_channel_count(self) -> int:
        """Returns the number of channels currently held in the cache."""
        with self._lock:
            return len(self._mapping)
@experimental_api
def unary_unary(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> ResponseType:
    """Invokes a unary-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request: The request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials() or
        grpc.insecure_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      The response to the RPC.
    """
    managed_channel = ChannelCache.get().get_channel(
        target, options, channel_credentials, insecure, compression)
    invoke = managed_channel.unary_unary(method, request_serializer,
                                         response_deserializer)
    # An unspecified wait_for_ready means "wait for the connection".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(request,
                  timeout=timeout,
                  metadata=metadata,
                  credentials=call_credentials,
                  wait_for_ready=wait_for_ready)
@experimental_api
def unary_stream(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> Iterator[ResponseType]:
    """Invokes a unary-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request: The request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      An iterator of responses.
    """
    managed_channel = ChannelCache.get().get_channel(
        target, options, channel_credentials, insecure, compression)
    invoke = managed_channel.unary_stream(method, request_serializer,
                                          response_deserializer)
    # An unspecified wait_for_ready means "wait for the connection".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(request,
                  timeout=timeout,
                  metadata=metadata,
                  credentials=call_credentials,
                  wait_for_ready=wait_for_ready)
@experimental_api
def stream_unary(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> ResponseType:
    """Invokes a stream-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      The response to the RPC.
    """
    managed_channel = ChannelCache.get().get_channel(
        target, options, channel_credentials, insecure, compression)
    invoke = managed_channel.stream_unary(method, request_serializer,
                                          response_deserializer)
    # An unspecified wait_for_ready means "wait for the connection".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(request_iterator,
                  timeout=timeout,
                  metadata=metadata,
                  credentials=call_credentials,
                  wait_for_ready=wait_for_ready)
@experimental_api
def stream_stream(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> Iterator[ResponseType]:
    """Invokes a stream-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      An iterator of responses.
    """
    managed_channel = ChannelCache.get().get_channel(
        target, options, channel_credentials, insecure, compression)
    invoke = managed_channel.stream_stream(method, request_serializer,
                                           response_deserializer)
    # An unspecified wait_for_ready means "wait for the connection".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(request_iterator,
                  timeout=timeout,
                  metadata=metadata,
                  credentials=call_credentials,
                  wait_for_ready=wait_for_ready)