1# Copyright 2020 The gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Functions that obviate explicit stubs and explicit channels."""
15
16import collections
17import datetime
18import logging
19import os
20import threading
21from typing import (
22 Any,
23 AnyStr,
24 Callable,
25 Dict,
26 Iterator,
27 Optional,
28 Sequence,
29 Tuple,
30 TypeVar,
31 Union,
32)
33
34import grpc
35from grpc.experimental import experimental_api
36
# Generic placeholders for the request and response message types of an RPC.
RequestType = TypeVar("RequestType")
ResponseType = TypeVar("ResponseType")

# Channel options expressed as a sequence of two-string tuples.
OptionsType = Sequence[Tuple[str, str]]
# Shape of the key under which a channel is cached:
# (target, options, channel credentials, compression).
CacheKey = Tuple[
    str,
    OptionsType,
    Optional[grpc.ChannelCredentials],
    Optional[grpc.Compression],
]

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
49
# How long a cached channel may go unused before it becomes eligible for
# eviction. Overridable (in seconds, parsed as a float) through the
# environment; the override is read once, at import time.
_EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"
if _EVICTION_PERIOD_KEY not in os.environ:
    _EVICTION_PERIOD = datetime.timedelta(minutes=10)
else:
    _EVICTION_PERIOD = datetime.timedelta(
        seconds=float(os.environ[_EVICTION_PERIOD_KEY])
    )
    _LOGGER.debug(
        "Setting managed channel eviction period to %s", _EVICTION_PERIOD
    )

# Upper bound on the number of channels kept in the cache. Overridable
# (parsed as an int) through the environment at import time.
_MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"
if _MAXIMUM_CHANNELS_KEY not in os.environ:
    _MAXIMUM_CHANNELS = 256
else:
    _MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY])
    _LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS)

# Default RPC timeout, in seconds, used as the default value of the
# `timeout` parameter of the public helpers below. Overridable (parsed as a
# float) through the environment at import time.
_DEFAULT_TIMEOUT_KEY = "GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS"
if _DEFAULT_TIMEOUT_KEY not in os.environ:
    _DEFAULT_TIMEOUT = 60.0
else:
    _DEFAULT_TIMEOUT = float(os.environ[_DEFAULT_TIMEOUT_KEY])
    _LOGGER.debug("Setting default timeout seconds to %f", _DEFAULT_TIMEOUT)
74
75
def _create_channel(
    target: str,
    options: Sequence[Tuple[str, str]],
    channel_credentials: Optional[grpc.ChannelCredentials],
    compression: Optional[grpc.Compression],
) -> grpc.Channel:
    """Creates a new channel to `target` through `grpc.secure_channel`.

    Args:
      target: The server address.
      options: Channel options forwarded unchanged to `grpc.secure_channel`.
      channel_credentials: Credentials forwarded as the `credentials`
        argument of `grpc.secure_channel`.
      compression: Compression setting forwarded to `grpc.secure_channel`.

    Returns:
      The channel returned by `grpc.secure_channel`.
    """
    # Pass the values as logging arguments so the message is only formatted
    # when DEBUG logging is actually enabled.
    _LOGGER.debug(
        "Creating secure channel with credentials '%s', "
        "options '%s' and compression '%s'",
        channel_credentials,
        options,
        compression,
    )
    return grpc.secure_channel(
        target,
        credentials=channel_credentials,
        options=options,
        compression=compression,
    )
93
94
class ChannelCache:
    """Cache of channels keyed by their configuration.

    Entries live in an insertion-ordered mapping whose front holds the entry
    that was created or reused least recently. A daemon thread started by the
    constructor evicts (closes and removes) the front entry whenever the
    cache holds more than `_MAXIMUM_CHANNELS` entries or the front entry's
    eviction time has passed.

    The eviction thread operates on `ChannelCache._singleton`, so instances
    should be obtained through `ChannelCache.get()` rather than constructed
    directly.
    """

    # NOTE(rbellevi): Untyped due to reference cycle.
    _singleton = None
    # Class-level lock guarding `_singleton` and the singleton's `_mapping`.
    _lock: threading.RLock = threading.RLock()
    # Used to wake the eviction thread; shares `_lock`.
    _condition: threading.Condition = threading.Condition(lock=_lock)
    # Set by the eviction thread once it has entered its loop.
    _eviction_ready: threading.Event = threading.Event()

    # Maps a cache key to (channel, time at which it becomes evictable).
    _mapping: Dict[CacheKey, Tuple[grpc.Channel, datetime.datetime]]
    _eviction_thread: threading.Thread

    def __init__(self):
        """Creates an empty cache and starts the daemon eviction thread."""
        self._mapping = collections.OrderedDict()
        self._eviction_thread = threading.Thread(
            target=ChannelCache._perform_evictions, daemon=True
        )
        self._eviction_thread.start()

    @staticmethod
    def get():
        """Returns the shared cache, creating it on first use.

        Blocks until the eviction thread has signalled that it is running.
        """
        with ChannelCache._lock:
            if ChannelCache._singleton is None:
                ChannelCache._singleton = ChannelCache()
        # Wait outside the lock: the eviction thread has to acquire the lock
        # before it can set the event.
        ChannelCache._eviction_ready.wait()
        return ChannelCache._singleton

    def _evict_locked(self, key: CacheKey):
        """Removes `key` from the cache and closes its channel.

        The caller must hold `ChannelCache._lock`.
        """
        channel, _ = self._mapping.pop(key)
        _LOGGER.debug(
            "Evicting channel %s with configuration %s.", channel, key
        )
        channel.close()

    @staticmethod
    def _perform_evictions():
        """Body of the eviction thread; loops forever.

        Each iteration inspects the singleton's mapping under the lock and
        either sleeps until notified (empty cache), evicts the front entry
        (cache over capacity or front entry overdue), or sleeps until the
        front entry's eviction time.
        """
        while True:
            with ChannelCache._lock:
                ChannelCache._eviction_ready.set()
                mapping = ChannelCache._singleton._mapping
                if not mapping:
                    ChannelCache._condition.wait()
                elif len(mapping) > _MAXIMUM_CHANNELS:
                    key = next(iter(mapping))
                    ChannelCache._singleton._evict_locked(key)
                    # And immediately reevaluate.
                else:
                    key, (_, eviction_time) = next(iter(mapping.items()))
                    now = datetime.datetime.now()
                    if eviction_time <= now:
                        ChannelCache._singleton._evict_locked(key)
                        continue
                    time_to_eviction = (eviction_time - now).total_seconds()
                    # NOTE: We aim to *eventually* coalesce to a state in
                    # which no overdue channels are in the cache and the
                    # length of the cache is no longer than
                    # _MAXIMUM_CHANNELS. We tolerate momentary states in
                    # which these two criteria are not met.
                    ChannelCache._condition.wait(timeout=time_to_eviction)

    def get_channel(
        self,
        target: str,
        options: Sequence[Tuple[str, str]],
        channel_credentials: Optional[grpc.ChannelCredentials],
        insecure: bool,
        compression: Optional[grpc.Compression],
        method: str,
        _registered_method: bool,
    ) -> Tuple[grpc.Channel, Optional[int]]:
        """Gets a channel from the cache or creates a new one.

        A cache hit pushes the entry's eviction time back by
        `_EVICTION_PERIOD` and moves it to the back of the eviction order.

        Args:
          target: The server address.
          options: Channel options. Any sequence of pairs is accepted
            (including lists); it is normalized to a tuple of tuples so that
            it can take part in the cache key. None is passed through as-is.
          channel_credentials: Channel credentials, or None to use the
            result of `grpc.ssl_channel_credentials()` (unless `insecure`).
          insecure: If True, use
            `grpc.experimental.insecure_channel_credentials()`.
          compression: Channel compression setting; part of the cache key.
          method: The name of the RPC method.
          _registered_method: If True, a call handle for `method` is obtained
            from the channel.

        Returns:
          A tuple with two items. The first item is the channel, the second
          is the call handle obtained for `method` when `_registered_method`
          is true, otherwise None.

        Raises:
          ValueError: If both `insecure` and `channel_credentials` are given.
        """
        if insecure and channel_credentials:
            raise ValueError(
                "The insecure option is mutually exclusive with "
                + "the channel_credentials option. Please use one "
                + "or the other."
            )
        if insecure:
            channel_credentials = (
                grpc.experimental.insecure_channel_credentials()
            )
        elif channel_credentials is None:
            _LOGGER.debug("Defaulting to SSL channel credentials.")
            # NOTE(review): a new credentials object is requested on every
            # call; whether two such objects compare equal as part of the
            # cache key depends on grpc.ChannelCredentials -- confirm.
            channel_credentials = grpc.ssl_channel_credentials()
        # The key must be hashable. Callers may legitimately pass a list (or
        # a list of lists), so freeze the options before building the key.
        if options is None:
            hashable_options = None
        else:
            hashable_options = tuple(tuple(option) for option in options)
        key = (target, hashable_options, channel_credentials, compression)
        with self._lock:
            cached = self._mapping.get(key)
            call_handle = None
            if cached is not None:
                channel = cached[0]
                # Obtain the call handle before touching the mapping so a
                # failure here leaves the cache unchanged.
                if _registered_method:
                    call_handle = channel._get_registered_call_handle(method)
                # Re-insert so the entry moves to the back of the eviction
                # order with a fresh eviction time.
                self._mapping.pop(key)
                self._mapping[key] = (
                    channel,
                    datetime.datetime.now() + _EVICTION_PERIOD,
                )
                return channel, call_handle
            # Create the channel from the same normalized options that key
            # the cache entry.
            channel = _create_channel(
                target, hashable_options, channel_credentials, compression
            )
            if _registered_method:
                call_handle = channel._get_registered_call_handle(method)
            self._mapping[key] = (
                channel,
                datetime.datetime.now() + _EVICTION_PERIOD,
            )
            # Wake the eviction thread when it may be sleeping on an empty
            # cache, or when the cache has reached capacity.
            if (
                len(self._mapping) == 1
                or len(self._mapping) >= _MAXIMUM_CHANNELS
            ):
                self._condition.notify()
            return channel, call_handle

    def _test_only_channel_count(self) -> int:
        """Returns the number of channels currently cached (for tests)."""
        with self._lock:
            return len(self._mapping)
225
226
@experimental_api
# pylint: disable=too-many-locals
def unary_unary(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> ResponseType:
    """Invokes a unary-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The channel is taken from a per-process cache. A background thread evicts
    channels from that cache once they have gone unused for a fixed period,
    and also when more than a configured maximum number accumulate.

    The eviction period defaults to 10 minutes and can be configured with the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS".

    The maximum number of channels defaults to 256 and can be configured with
    the environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM".

    Args:
      request: The request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. Response goes undeserialized in case None is
        passed.
      options: An optional sequence of key-value pairs
        (:term:`channel_arguments` in gRPC Core runtime) to configure the
        channel.
      channel_credentials: A credential applied to the whole channel, e.g.
        the return value of grpc.ssl_channel_credentials() or
        grpc.insecure_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate
        that no timeout should be enforced.
      metadata: Optional metadata to send to the server.
      _registered_method: Internal flag forwarded to the channel cache; when
        true, a call handle for `method` is obtained and handed to the
        channel when the multicallable is created.

    Returns:
      The response to the RPC.
    """
    cache = ChannelCache.get()
    channel, method_handle = cache.get_channel(
        target=target,
        options=options,
        channel_credentials=channel_credentials,
        insecure=insecure,
        compression=compression,
        method=method,
        _registered_method=_registered_method,
    )
    invoke = channel.unary_unary(
        method, request_serializer, response_deserializer, method_handle
    )
    # An unspecified wait_for_ready means "wait".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(
        request,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )
317
318
@experimental_api
# pylint: disable=too-many-locals
def unary_stream(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> Iterator[ResponseType]:
    """Invokes a unary-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The channel is taken from a per-process cache. A background thread evicts
    channels from that cache once they have gone unused for a fixed period,
    and also when more than a configured maximum number accumulate.

    The eviction period defaults to 10 minutes and can be configured with the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS".

    The maximum number of channels defaults to 256 and can be configured with
    the environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM".

    Args:
      request: The request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. Response goes undeserialized in case None is
        passed.
      options: An optional sequence of key-value pairs
        (:term:`channel_arguments` in gRPC Core runtime) to configure the
        channel.
      channel_credentials: A credential applied to the whole channel, e.g.
        the return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate
        that no timeout should be enforced.
      metadata: Optional metadata to send to the server.
      _registered_method: Internal flag forwarded to the channel cache; when
        true, a call handle for `method` is obtained and handed to the
        channel when the multicallable is created.

    Returns:
      An iterator of responses.
    """
    cache = ChannelCache.get()
    channel, method_handle = cache.get_channel(
        target=target,
        options=options,
        channel_credentials=channel_credentials,
        insecure=insecure,
        compression=compression,
        method=method,
        _registered_method=_registered_method,
    )
    invoke = channel.unary_stream(
        method, request_serializer, response_deserializer, method_handle
    )
    # An unspecified wait_for_ready means "wait".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(
        request,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )
408
409
@experimental_api
# pylint: disable=too-many-locals
def stream_unary(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> ResponseType:
    """Invokes a stream-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The channel is taken from a per-process cache. A background thread evicts
    channels from that cache once they have gone unused for a fixed period,
    and also when more than a configured maximum number accumulate.

    The eviction period defaults to 10 minutes and can be configured with the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS".

    The maximum number of channels defaults to 256 and can be configured with
    the environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM".

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. Response goes undeserialized in case None is
        passed.
      options: An optional sequence of key-value pairs
        (:term:`channel_arguments` in gRPC Core runtime) to configure the
        channel.
      channel_credentials: A credential applied to the whole channel, e.g.
        the return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate
        that no timeout should be enforced.
      metadata: Optional metadata to send to the server.
      _registered_method: Internal flag forwarded to the channel cache; when
        true, a call handle for `method` is obtained and handed to the
        channel when the multicallable is created.

    Returns:
      The response to the RPC.
    """
    cache = ChannelCache.get()
    channel, method_handle = cache.get_channel(
        target=target,
        options=options,
        channel_credentials=channel_credentials,
        insecure=insecure,
        compression=compression,
        method=method,
        _registered_method=_registered_method,
    )
    invoke = channel.stream_unary(
        method, request_serializer, response_deserializer, method_handle
    )
    # An unspecified wait_for_ready means "wait".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(
        request_iterator,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )
499
500
@experimental_api
# pylint: disable=too-many-locals
def stream_stream(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> Iterator[ResponseType]:
    """Invokes a stream-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The channel is taken from a per-process cache. A background thread evicts
    channels from that cache once they have gone unused for a fixed period,
    and also when more than a configured maximum number accumulate.

    The eviction period defaults to 10 minutes and can be configured with the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS".

    The maximum number of channels defaults to 256 and can be configured with
    the environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM".

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. Response goes undeserialized in case None is
        passed.
      options: An optional sequence of key-value pairs
        (:term:`channel_arguments` in gRPC Core runtime) to configure the
        channel.
      channel_credentials: A credential applied to the whole channel, e.g.
        the return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate
        that no timeout should be enforced.
      metadata: Optional metadata to send to the server.
      _registered_method: Internal flag forwarded to the channel cache; when
        true, a call handle for `method` is obtained and handed to the
        channel when the multicallable is created.

    Returns:
      An iterator of responses.
    """
    cache = ChannelCache.get()
    channel, method_handle = cache.get_channel(
        target=target,
        options=options,
        channel_credentials=channel_credentials,
        insecure=insecure,
        compression=compression,
        method=method,
        _registered_method=_registered_method,
    )
    invoke = channel.stream_stream(
        method, request_serializer, response_deserializer, method_handle
    )
    # An unspecified wait_for_ready means "wait".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(
        request_iterator,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )