1# Copyright 2020 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Abstract base classes for Channel objects and Multicallable objects."""
15
16import abc
17from typing import Generic, Optional
18
19import grpc
20
21from . import _base_call
22from ._typing import DeserializingFunction
23from ._typing import MetadataType
24from ._typing import RequestIterableType
25from ._typing import RequestType
26from ._typing import ResponseType
27from ._typing import SerializingFunction
28
29
class UnaryUnaryMultiCallable(Generic[RequestType, ResponseType], abc.ABC):
    """Abstract callable used to start a unary-unary RPC asynchronously.

    Invoking an instance with one request message returns a
    UnaryUnaryCall object for that RPC.
    """

    @abc.abstractmethod
    def __call__(
        self,
        request: RequestType,
        *,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.UnaryUnaryCall[RequestType, ResponseType]:
        """Starts the underlying RPC asynchronously.

        Every argument other than ``request`` is keyword-only.

        Args:
          request: The request message to send for this RPC.
          timeout: Optional time limit for the RPC, in seconds.
          metadata: Optional :term:`metadata` to send to the service-side
            of the RPC.
          credentials: Optional CallCredentials to use for this RPC. Only
            valid on a secure Channel.
          wait_for_ready: Optional flag that enables the
            :term:`wait_for_ready` mechanism.
          compression: Optional element of grpc.Compression, e.g.
            grpc.Compression.Gzip.

        Returns:
          A UnaryUnaryCall object.

        Raises:
          RpcError: The RPC terminated with a non-OK status. The raised
            RpcError is also a Call for the RPC, giving access to the
            RPC's metadata, status code, and details.
        """
66
67
class UnaryStreamMultiCallable(Generic[RequestType, ResponseType], abc.ABC):
    """Abstract callable used to start a server-streaming RPC asynchronously.

    Invoking an instance with one request message returns a
    UnaryStreamCall object for that RPC.
    """

    @abc.abstractmethod
    def __call__(
        self,
        request: RequestType,
        *,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.UnaryStreamCall[RequestType, ResponseType]:
        """Starts the underlying RPC asynchronously.

        Every argument other than ``request`` is keyword-only.

        Args:
          request: The request message to send for this RPC.
          timeout: Optional time limit for the RPC, in seconds.
          metadata: Optional :term:`metadata` to send to the service-side
            of the RPC.
          credentials: Optional CallCredentials to use for this RPC. Only
            valid on a secure Channel.
          wait_for_ready: Optional flag that enables the
            :term:`wait_for_ready` mechanism.
          compression: Optional element of grpc.Compression, e.g.
            grpc.Compression.Gzip.

        Returns:
          A UnaryStreamCall object.

        Raises:
          RpcError: The RPC terminated with a non-OK status. The raised
            RpcError is also a Call for the RPC, giving access to the
            RPC's metadata, status code, and details.
        """
104
105
class StreamUnaryMultiCallable(abc.ABC):
    """Abstract callable used to start a client-streaming RPC asynchronously.

    Invoking an instance, optionally with an iterable of request messages,
    returns a StreamUnaryCall object for that RPC.
    """

    @abc.abstractmethod
    def __call__(
        self,
        request_iterator: Optional[RequestIterableType] = None,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.StreamUnaryCall:
        """Starts the underlying RPC asynchronously.

        Args:
          request_iterator: Optional async iterable or iterable yielding
            the request messages for the RPC.
          timeout: Optional time limit for the RPC, in seconds.
          metadata: Optional :term:`metadata` to send to the service-side
            of the RPC.
          credentials: Optional CallCredentials to use for this RPC. Only
            valid on a secure Channel.
          wait_for_ready: Optional flag that enables the
            :term:`wait_for_ready` mechanism.
          compression: Optional element of grpc.Compression, e.g.
            grpc.Compression.Gzip.

        Returns:
          A StreamUnaryCall object.

        Raises:
          RpcError: The RPC terminated with a non-OK status. The raised
            RpcError is also a Call for the RPC, giving access to the
            RPC's metadata, status code, and details.
        """
142
143
class StreamStreamMultiCallable(abc.ABC):
    """Abstract callable used to start a bidirectional-streaming RPC asynchronously.

    Invoking an instance, optionally with an iterable of request messages,
    returns a StreamStreamCall object for that RPC.
    """

    @abc.abstractmethod
    def __call__(
        self,
        request_iterator: Optional[RequestIterableType] = None,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.StreamStreamCall:
        """Starts the underlying RPC asynchronously.

        Args:
          request_iterator: Optional async iterable or iterable yielding
            the request messages for the RPC.
          timeout: Optional time limit for the RPC, in seconds.
          metadata: Optional :term:`metadata` to send to the service-side
            of the RPC.
          credentials: Optional CallCredentials to use for this RPC. Only
            valid on a secure Channel.
          wait_for_ready: Optional flag that enables the
            :term:`wait_for_ready` mechanism.
          compression: Optional element of grpc.Compression, e.g.
            grpc.Compression.Gzip.

        Returns:
          A StreamStreamCall object.

        Raises:
          RpcError: The RPC terminated with a non-OK status. The raised
            RpcError is also a Call for the RPC, giving access to the
            RPC's metadata, status code, and details.
        """
180
181
class Channel(abc.ABC):
    """Enables asynchronous RPC invocation as a client.

    Channel objects are asynchronous context managers (usable with
    ``async with``), but entering and exiting the same Channel more than
    once is not supported.
    """

    @abc.abstractmethod
    async def __aenter__(self) -> "Channel":
        """Starts an asynchronous context manager.

        Returns:
          The Channel that was instantiated.
        """

    @abc.abstractmethod
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Finishes the asynchronous context manager by closing the channel.

        RPCs that are still active will be cancelled.

        Args:
          exc_type: Exception type argument supplied by the ``async with``
            protocol.
          exc_val: Exception value argument supplied by the ``async with``
            protocol.
          exc_tb: Traceback argument supplied by the ``async with``
            protocol.
        """

    @abc.abstractmethod
    async def close(self, grace: Optional[float] = None) -> None:
        """Closes this Channel and releases all resources held by it.

        In all cases, this method immediately stops the channel from
        executing new RPCs.

        If a grace period is specified, this method waits until all active
        RPCs are finished or until the grace period is reached. RPCs that
        have not terminated within the grace period are aborted.
        If a grace period is not specified (by passing None for grace),
        all existing RPCs are cancelled immediately.

        This method is idempotent.

        Args:
          grace: Optional grace period to wait for active RPCs to finish
            before aborting them; None cancels them immediately.
        """

    @abc.abstractmethod
    def get_state(
        self, try_to_connect: bool = False
    ) -> grpc.ChannelConnectivity:
        """Checks the connectivity state of a channel.

        This is an EXPERIMENTAL API.

        If the channel reaches a stable connectivity state, it is guaranteed
        that the return value of this function will eventually converge to
        that state.

        Args:
          try_to_connect: A bool indicating whether the Channel should try
            to connect to the peer.

        Returns:
          A ChannelConnectivity object.
        """

    @abc.abstractmethod
    async def wait_for_state_change(
        self,
        last_observed_state: grpc.ChannelConnectivity,
    ) -> None:
        """Waits for a change in connectivity state.

        This is an EXPERIMENTAL API.

        The function blocks until the channel connectivity state differs
        from "last_observed_state". If the state is already different,
        this function returns immediately.

        There is an inherent race between the invocation of
        "Channel.wait_for_state_change" and "Channel.get_state". The state
        can change arbitrarily many times during the race, so there is no
        way to observe every state transition.

        If a timeout is needed for this function, please refer to
        "asyncio.wait_for".

        Args:
          last_observed_state: A grpc.ChannelConnectivity object
            representing the last known state.
        """

    @abc.abstractmethod
    async def channel_ready(self) -> None:
        """Creates a coroutine that blocks until the Channel is READY."""

    @abc.abstractmethod
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> UnaryUnaryMultiCallable:
        """Creates a UnaryUnaryMultiCallable for a unary-unary method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional :term:`serializer` for serializing
            the request message. The request goes unserialized if None is
            passed.
          response_deserializer: Optional :term:`deserializer` for
            deserializing the response message. The response goes
            undeserialized if None is passed.
          _registered_method: Implementation Private. Optional: A bool
            representing whether the method is registered.

        Returns:
          A UnaryUnaryMultiCallable value for the named unary-unary method.
        """

    @abc.abstractmethod
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> UnaryStreamMultiCallable:
        """Creates a UnaryStreamMultiCallable for a unary-stream method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional :term:`serializer` for serializing
            the request message. The request goes unserialized if None is
            passed.
          response_deserializer: Optional :term:`deserializer` for
            deserializing the response message. The response goes
            undeserialized if None is passed.
          _registered_method: Implementation Private. Optional: A bool
            representing whether the method is registered.

        Returns:
          A UnaryStreamMultiCallable value for the named unary-stream method.
        """

    @abc.abstractmethod
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> StreamUnaryMultiCallable:
        """Creates a StreamUnaryMultiCallable for a stream-unary method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional :term:`serializer` for serializing
            the request message. The request goes unserialized if None is
            passed.
          response_deserializer: Optional :term:`deserializer` for
            deserializing the response message. The response goes
            undeserialized if None is passed.
          _registered_method: Implementation Private. Optional: A bool
            representing whether the method is registered.

        Returns:
          A StreamUnaryMultiCallable value for the named stream-unary method.
        """

    @abc.abstractmethod
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> StreamStreamMultiCallable:
        """Creates a StreamStreamMultiCallable for a stream-stream method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional :term:`serializer` for serializing
            the request message. The request goes unserialized if None is
            passed.
          response_deserializer: Optional :term:`deserializer` for
            deserializing the response message. The response goes
            undeserialized if None is passed.
          _registered_method: Implementation Private. Optional: A bool
            representing whether the method is registered.

        Returns:
          A StreamStreamMultiCallable value for the named stream-stream
          method.
        """