1# Copyright 2020 The gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Functions that obviate explicit stubs and explicit channels."""
15
16import collections
17import datetime
18import logging
19import os
20import threading
21from typing import (
22 Any,
23 AnyStr,
24 Callable,
25 Dict,
26 Iterator,
27 Optional,
28 Sequence,
29 Tuple,
30 TypeVar,
31 Union,
32)
33
34import grpc
35from grpc.experimental import experimental_api
36
37RequestType = TypeVar("RequestType")
38ResponseType = TypeVar("ResponseType")
39
40OptionsType = Sequence[Tuple[str, str]]
41CacheKey = Tuple[
42 str,
43 OptionsType,
44 Optional[grpc.ChannelCredentials],
45 Optional[grpc.Compression],
46]
47
48_LOGGER = logging.getLogger(__name__)
49
50_EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"
51if _EVICTION_PERIOD_KEY in os.environ:
52 _EVICTION_PERIOD = datetime.timedelta(
53 seconds=float(os.environ[_EVICTION_PERIOD_KEY])
54 )
55 _LOGGER.debug(
56 "Setting managed channel eviction period to %s", _EVICTION_PERIOD
57 )
58else:
59 _EVICTION_PERIOD = datetime.timedelta(minutes=10)
60
61_MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"
62if _MAXIMUM_CHANNELS_KEY in os.environ:
63 _MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY])
64 _LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS)
65else:
66 _MAXIMUM_CHANNELS = 2**8
67
68_DEFAULT_TIMEOUT_KEY = "GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS"
69if _DEFAULT_TIMEOUT_KEY in os.environ:
70 _DEFAULT_TIMEOUT = float(os.environ[_DEFAULT_TIMEOUT_KEY])
71 _LOGGER.debug("Setting default timeout seconds to %f", _DEFAULT_TIMEOUT)
72else:
73 _DEFAULT_TIMEOUT = 60.0
74
75
76def _create_channel(
77 target: str,
78 options: Sequence[Tuple[str, str]],
79 channel_credentials: Optional[grpc.ChannelCredentials],
80 compression: Optional[grpc.Compression],
81) -> grpc.Channel:
82 _LOGGER.debug(
83 f"Creating secure channel with credentials '{channel_credentials}', "
84 + f"options '{options}' and compression '{compression}'"
85 )
86 return grpc.secure_channel(
87 target,
88 credentials=channel_credentials,
89 options=options,
90 compression=compression,
91 )
92
93
94class ChannelCache:
95 # NOTE(rbellevi): Untyped due to reference cycle.
96 _singleton = None
97 _lock: threading.RLock = threading.RLock()
98 _condition: threading.Condition = threading.Condition(lock=_lock)
99 _eviction_ready: threading.Event = threading.Event()
100
101 _mapping: Dict[CacheKey, Tuple[grpc.Channel, datetime.datetime]]
102 _eviction_thread: threading.Thread
103
104 def __init__(self):
105 self._mapping = collections.OrderedDict()
106 self._eviction_thread = threading.Thread(
107 target=ChannelCache._perform_evictions, daemon=True
108 )
109 self._eviction_thread.start()
110
111 @staticmethod
112 def get():
113 with ChannelCache._lock:
114 if ChannelCache._singleton is None:
115 ChannelCache._singleton = ChannelCache()
116 ChannelCache._eviction_ready.wait()
117 return ChannelCache._singleton
118
119 def _evict_locked(self, key: CacheKey):
120 channel, _ = self._mapping.pop(key)
121 _LOGGER.debug(
122 "Evicting channel %s with configuration %s.", channel, key
123 )
124 channel.close()
125 del channel
126
127 @staticmethod
128 def _perform_evictions():
129 while True:
130 with ChannelCache._lock:
131 ChannelCache._eviction_ready.set()
132 if not ChannelCache._singleton._mapping:
133 ChannelCache._condition.wait()
134 elif len(ChannelCache._singleton._mapping) > _MAXIMUM_CHANNELS:
135 key = next(iter(ChannelCache._singleton._mapping.keys()))
136 ChannelCache._singleton._evict_locked(key)
137 # And immediately reevaluate.
138 else:
139 key, (_, eviction_time) = next(
140 iter(ChannelCache._singleton._mapping.items())
141 )
142 now = datetime.datetime.now()
143 if eviction_time <= now:
144 ChannelCache._singleton._evict_locked(key)
145 continue
146 else:
147 time_to_eviction = (eviction_time - now).total_seconds()
148 # NOTE: We aim to *eventually* coalesce to a state in
149 # which no overdue channels are in the cache and the
150 # length of the cache is longer than _MAXIMUM_CHANNELS.
151 # We tolerate momentary states in which these two
152 # criteria are not met.
153 ChannelCache._condition.wait(timeout=time_to_eviction)
154
155 def get_channel(
156 self,
157 target: str,
158 options: Sequence[Tuple[str, str]],
159 channel_credentials: Optional[grpc.ChannelCredentials],
160 insecure: bool,
161 compression: Optional[grpc.Compression],
162 method: str,
163 _registered_method: bool,
164 ) -> Tuple[grpc.Channel, Optional[int]]:
165 """Get a channel from cache or creates a new channel.
166
167 This method also takes care of register method for channel,
168 which means we'll register a new call handle if we're calling a
169 non-registered method for an existing channel.
170
171 Returns:
172 A tuple with two items. The first item is the channel, second item is
173 the call handle if the method is registered, None if it's not registered.
174 """
175 if insecure and channel_credentials:
176 raise ValueError(
177 "The insecure option is mutually exclusive with "
178 + "the channel_credentials option. Please use one "
179 + "or the other."
180 )
181 if insecure:
182 channel_credentials = (
183 grpc.experimental.insecure_channel_credentials()
184 )
185 elif channel_credentials is None:
186 _LOGGER.debug("Defaulting to SSL channel credentials.")
187 channel_credentials = grpc.ssl_channel_credentials()
188 key = (target, options, channel_credentials, compression)
189 with self._lock:
190 channel_data = self._mapping.get(key, None)
191 call_handle = None
192 if channel_data is not None:
193 channel = channel_data[0]
194 # Register a new call handle if we're calling a registered method for an
195 # existing channel and this method is not registered.
196 if _registered_method:
197 call_handle = channel._get_registered_call_handle(method)
198 self._mapping.pop(key)
199 self._mapping[key] = (
200 channel,
201 datetime.datetime.now() + _EVICTION_PERIOD,
202 )
203 return channel, call_handle
204 else:
205 channel = _create_channel(
206 target, options, channel_credentials, compression
207 )
208 if _registered_method:
209 call_handle = channel._get_registered_call_handle(method)
210 self._mapping[key] = (
211 channel,
212 datetime.datetime.now() + _EVICTION_PERIOD,
213 )
214 if (
215 len(self._mapping) == 1
216 or len(self._mapping) >= _MAXIMUM_CHANNELS
217 ):
218 self._condition.notify()
219 return channel, call_handle
220
221 def _test_only_channel_count(self) -> int:
222 with self._lock:
223 return len(self._mapping)
224
225
226@experimental_api
227# pylint: disable=too-many-locals
228def unary_unary(
229 request: RequestType,
230 target: str,
231 method: str,
232 request_serializer: Optional[Callable[[Any], bytes]] = None,
233 response_deserializer: Optional[Callable[[bytes], Any]] = None,
234 options: Sequence[Tuple[AnyStr, AnyStr]] = (),
235 channel_credentials: Optional[grpc.ChannelCredentials] = None,
236 insecure: bool = False,
237 call_credentials: Optional[grpc.CallCredentials] = None,
238 compression: Optional[grpc.Compression] = None,
239 wait_for_ready: Optional[bool] = None,
240 timeout: Optional[float] = _DEFAULT_TIMEOUT,
241 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
242 _registered_method: Optional[bool] = False,
243) -> ResponseType:
244 """Invokes a unary-unary RPC without an explicitly specified channel.
245
246 THIS IS AN EXPERIMENTAL API.
247
248 This is backed by a per-process cache of channels. Channels are evicted
249 from the cache after a fixed period by a background. Channels will also be
250 evicted if more than a configured maximum accumulate.
251
252 The default eviction period is 10 minutes. One may set the environment
253 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
254
255 The default maximum number of channels is 256. One may set the
256 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
257 this.
258
259 Args:
260 request: An iterator that yields request values for the RPC.
261 target: The server address.
262 method: The name of the RPC method.
263 request_serializer: Optional :term:`serializer` for serializing the request
264 message. Request goes unserialized in case None is passed.
265 response_deserializer: Optional :term:`deserializer` for deserializing the response
266 message. Response goes undeserialized in case None is passed.
267 options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
268 runtime) to configure the channel.
269 channel_credentials: A credential applied to the whole channel, e.g. the
270 return value of grpc.ssl_channel_credentials() or
271 grpc.insecure_channel_credentials().
272 insecure: If True, specifies channel_credentials as
273 :term:`grpc.insecure_channel_credentials()`. This option is mutually
274 exclusive with the `channel_credentials` option.
275 call_credentials: A call credential applied to each call individually,
276 e.g. the output of grpc.metadata_call_credentials() or
277 grpc.access_token_call_credentials().
278 compression: An optional value indicating the compression method to be
279 used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
280 wait_for_ready: An optional flag indicating whether the RPC should fail
281 immediately if the connection is not ready at the time the RPC is
282 invoked, or if it should wait until the connection to the server
283 becomes ready. When using this option, the user will likely also want
284 to set a timeout. Defaults to True.
285 timeout: An optional duration of time in seconds to allow for the RPC,
286 after which an exception will be raised. If timeout is unspecified,
287 defaults to a timeout controlled by the
288 GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
289 unset, defaults to 60 seconds. Supply a value of None to indicate that
290 no timeout should be enforced.
291 metadata: Optional metadata to send to the server.
292
293 Returns:
294 The response to the RPC.
295 """
296 channel, method_handle = ChannelCache.get().get_channel(
297 target,
298 options,
299 channel_credentials,
300 insecure,
301 compression,
302 method,
303 _registered_method,
304 )
305 multicallable = channel.unary_unary(
306 method, request_serializer, response_deserializer, method_handle
307 )
308 wait_for_ready = wait_for_ready if wait_for_ready is not None else True
309 return multicallable(
310 request,
311 metadata=metadata,
312 wait_for_ready=wait_for_ready,
313 credentials=call_credentials,
314 timeout=timeout,
315 )
316
317
318@experimental_api
319# pylint: disable=too-many-locals
320def unary_stream(
321 request: RequestType,
322 target: str,
323 method: str,
324 request_serializer: Optional[Callable[[Any], bytes]] = None,
325 response_deserializer: Optional[Callable[[bytes], Any]] = None,
326 options: Sequence[Tuple[AnyStr, AnyStr]] = (),
327 channel_credentials: Optional[grpc.ChannelCredentials] = None,
328 insecure: bool = False,
329 call_credentials: Optional[grpc.CallCredentials] = None,
330 compression: Optional[grpc.Compression] = None,
331 wait_for_ready: Optional[bool] = None,
332 timeout: Optional[float] = _DEFAULT_TIMEOUT,
333 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
334 _registered_method: Optional[bool] = False,
335) -> Iterator[ResponseType]:
336 """Invokes a unary-stream RPC without an explicitly specified channel.
337
338 THIS IS AN EXPERIMENTAL API.
339
340 This is backed by a per-process cache of channels. Channels are evicted
341 from the cache after a fixed period by a background. Channels will also be
342 evicted if more than a configured maximum accumulate.
343
344 The default eviction period is 10 minutes. One may set the environment
345 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
346
347 The default maximum number of channels is 256. One may set the
348 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
349 this.
350
351 Args:
352 request: An iterator that yields request values for the RPC.
353 target: The server address.
354 method: The name of the RPC method.
355 request_serializer: Optional :term:`serializer` for serializing the request
356 message. Request goes unserialized in case None is passed.
357 response_deserializer: Optional :term:`deserializer` for deserializing the response
358 message. Response goes undeserialized in case None is passed.
359 options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
360 runtime) to configure the channel.
361 channel_credentials: A credential applied to the whole channel, e.g. the
362 return value of grpc.ssl_channel_credentials().
363 insecure: If True, specifies channel_credentials as
364 :term:`grpc.insecure_channel_credentials()`. This option is mutually
365 exclusive with the `channel_credentials` option.
366 call_credentials: A call credential applied to each call individually,
367 e.g. the output of grpc.metadata_call_credentials() or
368 grpc.access_token_call_credentials().
369 compression: An optional value indicating the compression method to be
370 used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
371 wait_for_ready: An optional flag indicating whether the RPC should fail
372 immediately if the connection is not ready at the time the RPC is
373 invoked, or if it should wait until the connection to the server
374 becomes ready. When using this option, the user will likely also want
375 to set a timeout. Defaults to True.
376 timeout: An optional duration of time in seconds to allow for the RPC,
377 after which an exception will be raised. If timeout is unspecified,
378 defaults to a timeout controlled by the
379 GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
380 unset, defaults to 60 seconds. Supply a value of None to indicate that
381 no timeout should be enforced.
382 metadata: Optional metadata to send to the server.
383
384 Returns:
385 An iterator of responses.
386 """
387 channel, method_handle = ChannelCache.get().get_channel(
388 target,
389 options,
390 channel_credentials,
391 insecure,
392 compression,
393 method,
394 _registered_method,
395 )
396 multicallable = channel.unary_stream(
397 method, request_serializer, response_deserializer, method_handle
398 )
399 wait_for_ready = wait_for_ready if wait_for_ready is not None else True
400 return multicallable(
401 request,
402 metadata=metadata,
403 wait_for_ready=wait_for_ready,
404 credentials=call_credentials,
405 timeout=timeout,
406 )
407
408
409@experimental_api
410# pylint: disable=too-many-locals
411def stream_unary(
412 request_iterator: Iterator[RequestType],
413 target: str,
414 method: str,
415 request_serializer: Optional[Callable[[Any], bytes]] = None,
416 response_deserializer: Optional[Callable[[bytes], Any]] = None,
417 options: Sequence[Tuple[AnyStr, AnyStr]] = (),
418 channel_credentials: Optional[grpc.ChannelCredentials] = None,
419 insecure: bool = False,
420 call_credentials: Optional[grpc.CallCredentials] = None,
421 compression: Optional[grpc.Compression] = None,
422 wait_for_ready: Optional[bool] = None,
423 timeout: Optional[float] = _DEFAULT_TIMEOUT,
424 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
425 _registered_method: Optional[bool] = False,
426) -> ResponseType:
427 """Invokes a stream-unary RPC without an explicitly specified channel.
428
429 THIS IS AN EXPERIMENTAL API.
430
431 This is backed by a per-process cache of channels. Channels are evicted
432 from the cache after a fixed period by a background. Channels will also be
433 evicted if more than a configured maximum accumulate.
434
435 The default eviction period is 10 minutes. One may set the environment
436 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
437
438 The default maximum number of channels is 256. One may set the
439 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
440 this.
441
442 Args:
443 request_iterator: An iterator that yields request values for the RPC.
444 target: The server address.
445 method: The name of the RPC method.
446 request_serializer: Optional :term:`serializer` for serializing the request
447 message. Request goes unserialized in case None is passed.
448 response_deserializer: Optional :term:`deserializer` for deserializing the response
449 message. Response goes undeserialized in case None is passed.
450 options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
451 runtime) to configure the channel.
452 channel_credentials: A credential applied to the whole channel, e.g. the
453 return value of grpc.ssl_channel_credentials().
454 call_credentials: A call credential applied to each call individually,
455 e.g. the output of grpc.metadata_call_credentials() or
456 grpc.access_token_call_credentials().
457 insecure: If True, specifies channel_credentials as
458 :term:`grpc.insecure_channel_credentials()`. This option is mutually
459 exclusive with the `channel_credentials` option.
460 compression: An optional value indicating the compression method to be
461 used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
462 wait_for_ready: An optional flag indicating whether the RPC should fail
463 immediately if the connection is not ready at the time the RPC is
464 invoked, or if it should wait until the connection to the server
465 becomes ready. When using this option, the user will likely also want
466 to set a timeout. Defaults to True.
467 timeout: An optional duration of time in seconds to allow for the RPC,
468 after which an exception will be raised. If timeout is unspecified,
469 defaults to a timeout controlled by the
470 GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
471 unset, defaults to 60 seconds. Supply a value of None to indicate that
472 no timeout should be enforced.
473 metadata: Optional metadata to send to the server.
474
475 Returns:
476 The response to the RPC.
477 """
478 channel, method_handle = ChannelCache.get().get_channel(
479 target,
480 options,
481 channel_credentials,
482 insecure,
483 compression,
484 method,
485 _registered_method,
486 )
487 multicallable = channel.stream_unary(
488 method, request_serializer, response_deserializer, method_handle
489 )
490 wait_for_ready = wait_for_ready if wait_for_ready is not None else True
491 return multicallable(
492 request_iterator,
493 metadata=metadata,
494 wait_for_ready=wait_for_ready,
495 credentials=call_credentials,
496 timeout=timeout,
497 )
498
499
500@experimental_api
501# pylint: disable=too-many-locals
502def stream_stream(
503 request_iterator: Iterator[RequestType],
504 target: str,
505 method: str,
506 request_serializer: Optional[Callable[[Any], bytes]] = None,
507 response_deserializer: Optional[Callable[[bytes], Any]] = None,
508 options: Sequence[Tuple[AnyStr, AnyStr]] = (),
509 channel_credentials: Optional[grpc.ChannelCredentials] = None,
510 insecure: bool = False,
511 call_credentials: Optional[grpc.CallCredentials] = None,
512 compression: Optional[grpc.Compression] = None,
513 wait_for_ready: Optional[bool] = None,
514 timeout: Optional[float] = _DEFAULT_TIMEOUT,
515 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
516 _registered_method: Optional[bool] = False,
517) -> Iterator[ResponseType]:
518 """Invokes a stream-stream RPC without an explicitly specified channel.
519
520 THIS IS AN EXPERIMENTAL API.
521
522 This is backed by a per-process cache of channels. Channels are evicted
523 from the cache after a fixed period by a background. Channels will also be
524 evicted if more than a configured maximum accumulate.
525
526 The default eviction period is 10 minutes. One may set the environment
527 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
528
529 The default maximum number of channels is 256. One may set the
530 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
531 this.
532
533 Args:
534 request_iterator: An iterator that yields request values for the RPC.
535 target: The server address.
536 method: The name of the RPC method.
537 request_serializer: Optional :term:`serializer` for serializing the request
538 message. Request goes unserialized in case None is passed.
539 response_deserializer: Optional :term:`deserializer` for deserializing the response
540 message. Response goes undeserialized in case None is passed.
541 options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
542 runtime) to configure the channel.
543 channel_credentials: A credential applied to the whole channel, e.g. the
544 return value of grpc.ssl_channel_credentials().
545 call_credentials: A call credential applied to each call individually,
546 e.g. the output of grpc.metadata_call_credentials() or
547 grpc.access_token_call_credentials().
548 insecure: If True, specifies channel_credentials as
549 :term:`grpc.insecure_channel_credentials()`. This option is mutually
550 exclusive with the `channel_credentials` option.
551 compression: An optional value indicating the compression method to be
552 used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
553 wait_for_ready: An optional flag indicating whether the RPC should fail
554 immediately if the connection is not ready at the time the RPC is
555 invoked, or if it should wait until the connection to the server
556 becomes ready. When using this option, the user will likely also want
557 to set a timeout. Defaults to True.
558 timeout: An optional duration of time in seconds to allow for the RPC,
559 after which an exception will be raised. If timeout is unspecified,
560 defaults to a timeout controlled by the
561 GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
562 unset, defaults to 60 seconds. Supply a value of None to indicate that
563 no timeout should be enforced.
564 metadata: Optional metadata to send to the server.
565
566 Returns:
567 An iterator of responses.
568 """
569 channel, method_handle = ChannelCache.get().get_channel(
570 target,
571 options,
572 channel_credentials,
573 insecure,
574 compression,
575 method,
576 _registered_method,
577 )
578 multicallable = channel.stream_stream(
579 method, request_serializer, response_deserializer, method_handle
580 )
581 wait_for_ready = wait_for_ready if wait_for_ready is not None else True
582 return multicallable(
583 request_iterator,
584 metadata=metadata,
585 wait_for_ready=wait_for_ready,
586 credentials=call_credentials,
587 timeout=timeout,
588 )