1# Copyright 2020 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Abstract base classes for server-side classes."""
15
16import abc
17from typing import Generic, Iterable, Mapping, NoReturn, Optional, Sequence
18
19import grpc
20
21from ._metadata import Metadata # pylint: disable=unused-import
22from ._typing import DoneCallbackType
23from ._typing import MetadataType
24from ._typing import RequestType
25from ._typing import ResponseType
26
27
class Server(abc.ABC):
    """Serves RPCs."""

    @abc.abstractmethod
    def add_generic_rpc_handlers(
        self, generic_rpc_handlers: Sequence[grpc.GenericRpcHandler]
    ) -> None:
        """Registers GenericRpcHandlers with this Server.

        This method is only safe to call before the server is started.

        Args:
          generic_rpc_handlers: A sequence of GenericRpcHandlers that will be
            used to service RPCs.
        """

    @abc.abstractmethod
    def add_insecure_port(self, address: str) -> int:
        """Opens an insecure port for accepting RPCs.

        A port is a communication endpoint that is used by networking
        protocols, like TCP and UDP. To date, we only support TCP.

        This method may only be called before starting the server.

        Args:
          address: The address for which to open a port. If the port is 0,
            or not specified in the address, then the gRPC runtime will
            choose a port.

        Returns:
          An integer port on which the server will accept RPC requests.
        """

    @abc.abstractmethod
    def add_secure_port(
        self, address: str, server_credentials: grpc.ServerCredentials
    ) -> int:
        """Opens a secure port for accepting RPCs.

        A port is a communication endpoint that is used by networking
        protocols, like TCP and UDP. To date, we only support TCP.

        This method may only be called before starting the server.

        Args:
          address: The address for which to open a port. If the port is 0,
            or not specified in the address, then the gRPC runtime will
            choose a port.
          server_credentials: A ServerCredentials object.

        Returns:
          An integer port on which the server will accept RPC requests.
        """

    @abc.abstractmethod
    async def start(self) -> None:
        """Starts this Server.

        This method may only be called once (i.e. it is not idempotent).
        """

    @abc.abstractmethod
    async def stop(self, grace: Optional[float]) -> None:
        """Stops this Server.

        This method immediately stops the server from servicing new RPCs in
        all cases.

        If a grace period is specified, this method waits until all active
        RPCs are finished or until the grace period is reached. RPCs that
        haven't been terminated within the grace period are aborted.
        If a grace period is not specified (by passing None for grace), all
        existing RPCs are aborted immediately and this method blocks until
        the last RPC handler terminates.

        This method is idempotent and may be called at any time. Passing a
        smaller grace value in a subsequent call will have the effect of
        stopping the Server sooner (passing None will have the effect of
        stopping the server immediately). Passing a larger grace value in a
        subsequent call will not have the effect of stopping the server later
        (i.e. the most restrictive grace value is used).

        Args:
          grace: A duration of time in seconds or None.
        """

    @abc.abstractmethod
    async def wait_for_termination(
        self, timeout: Optional[float] = None
    ) -> bool:
        """Continues current coroutine once the server stops.

        This is an EXPERIMENTAL API.

        The wait will not consume computational resources during blocking,
        and it will block until one of the two following conditions is met:

        1) The server is stopped or terminated;
        2) A timeout occurs if timeout is not `None`.

        The timeout argument works in the same way as
        `threading.Event.wait()`.
        https://docs.python.org/3/library/threading.html#threading.Event.wait

        Args:
          timeout: A floating point number specifying a timeout for the
            operation in seconds.

        Returns:
          A bool that indicates if the operation times out.

          NOTE(review): `threading.Event.wait()`, which this method is
          documented to mirror, returns True unless the wait timed out.
          Confirm the polarity of this return value against the concrete
          implementation before relying on it.
        """

    def add_registered_method_handlers(  # noqa: B027
        self,
        service_name: str,
        method_handlers: Mapping[str, grpc.RpcMethodHandler],
    ) -> None:
        """Registers method handlers for a named service with this Server.

        This method is only safe to call before the server is started.

        Unlike the other registration methods on this class, this one is not
        abstract: the default implementation does nothing, so subclasses may
        leave it unimplemented.

        Args:
          service_name: The service name.
          method_handlers: A dictionary that maps method names to
            corresponding RpcMethodHandler.
        """
151
152
153# pylint: disable=too-many-public-methods
class ServicerContext(Generic[RequestType, ResponseType], abc.ABC):
    """A context object passed to method implementations."""

    @abc.abstractmethod
    async def read(self) -> RequestType:
        """Reads one message from the RPC.

        Only one read operation is allowed simultaneously.

        Returns:
          A request message of the RPC.

        Raises:
          An RpcError exception if the read failed.
        """

    @abc.abstractmethod
    async def write(self, message: ResponseType) -> None:
        """Writes one message to the RPC.

        Only one write operation is allowed simultaneously.

        Args:
          message: The response message to write.

        Raises:
          An RpcError exception if the write failed.
        """

    @abc.abstractmethod
    async def send_initial_metadata(
        self, initial_metadata: MetadataType
    ) -> None:
        """Sends the initial metadata value to the client.

        This method need not be called by implementations if they have no
        metadata to add to what the gRPC runtime will transmit.

        Args:
          initial_metadata: The initial :term:`metadata`.
        """

    @abc.abstractmethod
    async def abort(
        self,
        code: grpc.StatusCode,
        details: str = "",
        trailing_metadata: MetadataType = (),
    ) -> NoReturn:
        """Raises an exception to terminate the RPC with a non-OK status.

        The code and details passed as arguments will supersede any existing
        ones.

        Args:
          code: A StatusCode object to be sent to the client.
            It must not be StatusCode.OK.
          details: A UTF-8-encodable string to be sent to the client upon
            termination of the RPC.
          trailing_metadata: A sequence of tuples representing the trailing
            :term:`metadata`.

        Raises:
          Exception: An exception is always raised to signal the abortion of
            the RPC to the gRPC runtime.
        """

    @abc.abstractmethod
    def set_trailing_metadata(self, trailing_metadata: MetadataType) -> None:
        """Sends the trailing metadata for the RPC.

        This method need not be called by implementations if they have no
        metadata to add to what the gRPC runtime will transmit.

        Args:
          trailing_metadata: The trailing :term:`metadata`.
        """

    @abc.abstractmethod
    def invocation_metadata(self) -> Optional[MetadataType]:
        """Accesses the metadata sent by the client.

        Returns:
          The invocation :term:`metadata`.
        """

    @abc.abstractmethod
    def set_code(self, code: grpc.StatusCode) -> None:
        """Sets the value to be used as status code upon RPC completion.

        This method need not be called by method implementations if they wish
        the gRPC runtime to determine the status code of the RPC.

        Args:
          code: A StatusCode object to be sent to the client.
        """

    @abc.abstractmethod
    def set_details(self, details: str) -> None:
        """Sets the value to be used as the detail string upon RPC completion.

        This method need not be called by method implementations if they have
        no details to transmit.

        Args:
          details: A UTF-8-encodable string to be sent to the client upon
            termination of the RPC.
        """

    @abc.abstractmethod
    def set_compression(self, compression: grpc.Compression) -> None:
        """Sets the compression algorithm to be used for the entire call.

        Args:
          compression: An element of grpc.Compression, e.g.
            grpc.Compression.Gzip.
        """

    @abc.abstractmethod
    def disable_next_message_compression(self) -> None:
        """Disables compression for the next response message.

        This method will override any compression configuration set during
        server creation or set on the call.
        """

    @abc.abstractmethod
    def peer(self) -> str:
        """Identifies the peer that invoked the RPC being serviced.

        Returns:
          A string identifying the peer that invoked the RPC being serviced.
          The string format is determined by gRPC runtime.
        """

    @abc.abstractmethod
    def peer_identities(self) -> Optional[Iterable[bytes]]:
        """Gets one or more peer identity(s).

        Equivalent to
        servicer_context.auth_context().get(servicer_context.peer_identity_key())

        Returns:
          An iterable of the identities, or None if the call is not
          authenticated. Each identity is returned as a raw bytes type.
        """

    @abc.abstractmethod
    def peer_identity_key(self) -> Optional[str]:
        """The auth property used to identify the peer.

        For example, "x509_common_name" or "x509_subject_alternative_name" are
        used to identify an SSL peer.

        Returns:
          The auth property (string) that indicates the
          peer identity, or None if the call is not authenticated.
        """

    @abc.abstractmethod
    def auth_context(self) -> Mapping[str, Iterable[bytes]]:
        """Gets the auth context for the call.

        Returns:
          A map of strings to an iterable of bytes for each auth property.
        """

    # The methods below are intentionally NOT abstract, so subclasses are not
    # forced to implement them. Their defaults are inconsistent and are kept
    # as-is for backward compatibility: trailing_metadata(), code() and
    # details() raise NotImplementedError, while time_remaining(),
    # add_done_callback(), cancelled() and done() have docstring-only bodies
    # and therefore return None unless overridden.

    def time_remaining(self) -> Optional[float]:
        """Describes the length of allowed time remaining for the RPC.

        Returns:
          A nonnegative float indicating the length of allowed time in seconds
          remaining for the RPC to complete before it is considered to have
          timed out, or None if no deadline was specified for the RPC.
        """

    def trailing_metadata(self):
        """Accesses the value to be used as trailing metadata upon RPC completion.

        This is an EXPERIMENTAL API.

        Returns:
          The trailing :term:`metadata` for the RPC.

        Raises:
          NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError()

    def code(self):
        """Accesses the value to be used as status code upon RPC completion.

        This is an EXPERIMENTAL API.

        Returns:
          The StatusCode value for the RPC.

        Raises:
          NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError()

    def details(self):
        """Accesses the value to be used as detail string upon RPC completion.

        This is an EXPERIMENTAL API.

        Returns:
          The details string of the RPC.

        Raises:
          NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError()

    def add_done_callback(self, callback: DoneCallbackType) -> None:
        """Registers a callback to be called on RPC termination.

        This is an EXPERIMENTAL API.

        Args:
          callback: A callable object that will be called with the servicer
            context object as its only argument.
        """

    def cancelled(self) -> bool:
        """Returns True if the RPC is cancelled.

        The RPC is cancelled when the cancellation was requested with cancel().

        This is an EXPERIMENTAL API.

        Returns:
          A bool that indicates whether the RPC is cancelled or not.
        """

    def done(self) -> bool:
        """Returns True if the RPC is done.

        An RPC is done if the RPC is completed, cancelled or aborted.

        This is an EXPERIMENTAL API.

        Returns:
          A bool that indicates if the RPC is done.
        """