1# Copyright 2018 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""gRPC's experimental APIs.
15
These APIs are subject to removal in any minor version release.
17"""
18from __future__ import annotations
19
20import copy
21import functools
22import sys
23from typing import Callable, Optional, Union
24import warnings
25
26import grpc
27from grpc._cython import cygrpc as _cygrpc
28
# Names of the experimental APIs that have already produced an
# ExperimentalApiWarning. _warn_experimental checks and updates this set so
# that each API name triggers the warning at most once.
_EXPERIMENTAL_APIS_USED = set()
30
31
class ChannelOptions:
    """Names of channel options that are specific to gRPC Python.

    This enumeration is part of an EXPERIMENTAL API.

    Attributes:
      SingleThreadedUnaryStream: Perform unary-stream RPCs on a single thread.
    """

    SingleThreadedUnaryStream = "SingleThreadedUnaryStream"
42
43
class UsageError(Exception):
    """Signals that the gRPC library was used in a way its API does not allow."""
46
47
# It's important that there be a single insecure credentials object so that its
# hash is deterministic and can be used for indexing in the simple stubs cache.
# The object is built once, when this module is first executed, and is the
# value handed out by insecure_channel_credentials().
_insecure_channel_credentials = grpc.ChannelCredentials(
    _cygrpc.channel_credentials_insecure()
)
53
54
def insecure_channel_credentials():
    """Returns the ChannelCredentials to use with an insecure channel.

    THIS IS AN EXPERIMENTAL API.

    Returns:
      The module-level ChannelCredentials object; every call returns the
      same instance.
    """
    return _insecure_channel_credentials
61
62
class ExperimentalApiWarning(Warning):
    """Warning category used to flag the use of an experimental API."""
65
66
def _warn_experimental(api_name, stack_offset):
    """Emits an ExperimentalApiWarning the first time an API name is seen.

    Args:
      api_name: Name of the experimental API being used. It is recorded in
        _EXPERIMENTAL_APIS_USED, so later calls with the same name are silent.
      stack_offset: Number of extra stack frames, beyond the direct caller of
        this function, to skip when attributing the warning.
    """
    if api_name in _EXPERIMENTAL_APIS_USED:
        return

    _EXPERIMENTAL_APIS_USED.add(api_name)
    template = (
        "'{}' is an experimental API. It is subject to change or "
        "removal between minor releases. Proceed with caution."
    )
    # warnings.warn is called from this frame on purpose: stacklevel is
    # counted from here, so 2 points at this function's direct caller.
    warnings.warn(
        template.format(api_name),
        ExperimentalApiWarning,
        stacklevel=2 + stack_offset,
    )
77
78
def experimental_api(f):
    """Decorator that marks a callable as an experimental API.

    Args:
      f: The callable to decorate.

    Returns:
      A wrapper carrying f's metadata (via functools.wraps) that first calls
      _warn_experimental with f.__name__ and then returns the result of
      calling f with the same arguments.
    """

    def _warn_then_call(*args, **kwargs):
        # An offset of 1 skips this wrapper's frame, so the warning is
        # attributed to the code that invoked the decorated callable.
        _warn_experimental(f.__name__, 1)
        return f(*args, **kwargs)

    return functools.wraps(f)(_warn_then_call)
86
87
def wrap_server_method_handler(wrapper, handler):
    """Wraps the behavior function of a server method handler.

    The server implementation requires every server handler to be wrapped in
    an RpcMethodHandler object. This helper eases the pain of writing server
    handler wrappers.

    Args:
      wrapper: A wrapper function that takes in a method handler behavior
        (the actual function) and returns a wrapped function.
      handler: A RpcMethodHandler object to be wrapped.

    Returns:
      A newly created RpcMethodHandler, or None when handler is falsy.
    """
    if not handler:
        return None

    # The field holding the behavior is named "<request>_<response>", where
    # each half is "stream" or "unary" according to the matching flag.
    request_kind = "stream" if handler.request_streaming else "unary"
    response_kind = "stream" if handler.response_streaming else "unary"
    behavior_field = request_kind + "_" + response_kind

    wrapped_behavior = wrapper(getattr(handler, behavior_field))
    # NOTE(lidiz) _replace is a public API:
    #   https://docs.python.org/dev/library/collections.html
    return handler._replace(**{behavior_field: wrapped_behavior})
115
116
# A Callable to return in the async case: it takes no arguments and returns
# None.
# See the `ssl_channel_credentials_with_custom_signer` docstring for more detail on usage.
PrivateKeySignCancel = Callable[[], None]
# Alias of the Cython-layer type; it is the second argument type of
# `CustomPrivateKeySign` below.
PrivateKeySignatureAlgorithm = _cygrpc.PrivateKeySignatureAlgorithm
# Completion callback passed to the custom signer; it accepts either bytes or
# an Exception (presumably the signature or a failure -- TODO confirm).
PrivateKeySignOnComplete = Callable[[Union[bytes, Exception]], None]

# See the `ssl_channel_credentials_with_custom_signer` docstring for more detail on usage.
# The custom signing function for a user to implement and pass to gRPC Python.
# It takes bytes (presumably the data to sign -- TODO confirm), a
# PrivateKeySignatureAlgorithm and a PrivateKeySignOnComplete callback, and
# returns either the signed bytes (sync case) or a PrivateKeySignCancel
# callable (async case).
CustomPrivateKeySign = Callable[
    [
        bytes,
        PrivateKeySignatureAlgorithm,
        "PrivateKeySignOnComplete",
    ],
    Union[bytes, "PrivateKeySignCancel"],
]
133
134
@experimental_api
def ssl_channel_credentials_with_custom_signer(
    *,
    private_key_sign_fn: "CustomPrivateKeySign",
    root_certificates: Optional[bytes] = None,
    certificate_chain: bytes,
) -> grpc.ChannelCredentials:
    """Creates a ChannelCredentials for an SSL-enabled Channel that uses a custom signer.

    THIS IS AN EXPERIMENTAL API.
    This API will be removed in a future version and combined with
    `grpc.ssl_channel_credentials`.

    All arguments are keyword-only.

    Args:
      private_key_sign_fn: A function with the signature of
        `CustomPrivateKeySign`. This function can return synchronously or
        asynchronously. To return synchronously, return the signed bytes. To
        return asynchronously, return a callable matching the
        `PrivateKeySignCancel` signature. This can be a no-op if no
        cancellation is needed. In the async case, this function must return
        this callable quickly, then call the passed in
        `PrivateKeySignOnComplete` when the async signing operation is
        complete to trigger gRPC to continue the handshake.
      root_certificates: The PEM-encoded root certificates as a byte string,
        or None to retrieve them from a default location chosen by gRPC
        runtime.
      certificate_chain: The PEM-encoded certificate chain to use, as a byte
        string.

    Returns:
      A ChannelCredentials for use with an SSL-enabled Channel.
    """
    # The second positional argument is always None here (presumably the
    # private key slot, unused because signing goes through
    # private_key_sign_fn -- TODO confirm against cygrpc).
    cython_credentials = _cygrpc.SSLChannelCredentials(
        root_certificates, None, certificate_chain, private_key_sign_fn
    )
    return grpc.ChannelCredentials(cython_credentials)
170
171
# Public names exported by this module. The simple-stub entry points are
# appended below when the interpreter version check passes.
__all__ = (
    "ChannelOptions",
    "ExperimentalApiWarning",
    "UsageError",
    "insecure_channel_credentials",
    "ssl_channel_credentials_with_custom_signer",
    "wrap_server_method_handler",
)

# NOTE(review): sys.version_info has more than two elements, so this tuple
# comparison is also true on 3.6.x; confirm whether ">= (3, 7)" was intended.
if sys.version_info > (3, 6):
    from grpc._simple_stubs import stream_stream
    from grpc._simple_stubs import stream_unary
    from grpc._simple_stubs import unary_stream
    from grpc._simple_stubs import unary_unary

    # Re-export the simple-stub functions imported above.
    __all__ += (
        "stream_stream",
        "stream_unary",
        "unary_stream",
        "unary_unary",
    )