Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_typing.py: 86%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2022 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Common types for gRPC Sync API"""
16from typing import (
17 TYPE_CHECKING,
18 Any,
19 Callable,
20 Iterable,
21 Iterator,
22 Optional,
23 Sequence,
24 Tuple,
25 TypeVar,
26 Union,
27)
29from grpc._cython import cygrpc
31if TYPE_CHECKING:
32 from grpc import ServicerContext
33 from grpc._server import _RPCState
# Generic placeholders for request and response objects; both are
# unconstrained TypeVars.
RequestType = TypeVar("RequestType")
ResponseType = TypeVar("ResponseType")

# Converters between an arbitrary object and bytes, one for each direction.
SerializingFunction = Callable[[Any], bytes]
DeserializingFunction = Callable[[bytes], Any]

# A sequence of 2-tuples: a str followed by either a str or bytes.
MetadataType = Sequence[Tuple[str, Union[str, bytes]]]

# A 2-tuple of a str and an arbitrary value.
ChannelArgumentType = Tuple[str, Any]

# Callback accepting a single argument of any type and returning None.
DoneCallbackType = Callable[[Any], None]

# Callback accepting no arguments and returning None.
NullaryCallbackType = Callable[[], None]

# Iterables of request / response objects; element types are left open.
RequestIterableType = Iterable[Any]
ResponseIterableType = Iterable[Any]
# Callable that receives a cygrpc.BaseEvent and returns a bool.
UserTag = Callable[[cygrpc.BaseEvent], bool]

# Factory callable that returns a cygrpc.IntegratedCall.
# NOTE(review): the ten positional parameters are described here by type
# only; what each position means is defined by the implementing factory and
# is not visible in this module -- confirm against the implementation before
# relying on any particular interpretation.
IntegratedCallFactory = Callable[
    [
        int,
        bytes,
        Optional[str],
        Optional[float],
        Optional[MetadataType],
        Optional[cygrpc.CallCredentials],
        Sequence[Sequence[cygrpc.Operation]],
        UserTag,
        Any,
        Optional[int],
    ],
    cygrpc.IntegratedCall,
]

# A 2-tuple of an optional _RPCState and a sequence of zero-argument
# callbacks. "_RPCState" is written as a string forward reference because
# that name is only imported under TYPE_CHECKING.
ServerTagCallbackType = Tuple[
    Optional["_RPCState"], Sequence[NullaryCallbackType]
]

# Callable that receives a cygrpc.BaseEvent and returns a
# ServerTagCallbackType tuple.
ServerCallbackTag = Callable[[cygrpc.BaseEvent], ServerTagCallbackType]
# Union of the eight handler call shapes formed by three independent choices:
#   * the first argument is a single RequestType or an Iterator[RequestType];
#   * the return value is a single ResponseType or an Iterator[ResponseType];
#   * the handler takes either (request, context) or
#     (request, context, callback), where the callback accepts one
#     ResponseType and returns None.
# "ServicerContext" is written as a string forward reference because that
# name is only imported under TYPE_CHECKING.
ArityAgnosticMethodHandler = Union[
    # Three-argument forms: (request(s), context, response callback).
    Callable[
        [RequestType, "ServicerContext", Callable[[ResponseType], None]],
        ResponseType,
    ],
    Callable[
        [RequestType, "ServicerContext", Callable[[ResponseType], None]],
        Iterator[ResponseType],
    ],
    Callable[
        [
            Iterator[RequestType],
            "ServicerContext",
            Callable[[ResponseType], None],
        ],
        ResponseType,
    ],
    Callable[
        [
            Iterator[RequestType],
            "ServicerContext",
            Callable[[ResponseType], None],
        ],
        Iterator[ResponseType],
    ],
    # Two-argument forms: (request(s), context).
    Callable[[RequestType, "ServicerContext"], ResponseType],
    Callable[[RequestType, "ServicerContext"], Iterator[ResponseType]],
    Callable[[Iterator[RequestType], "ServicerContext"], ResponseType],
    Callable[
        [Iterator[RequestType], "ServicerContext"], Iterator[ResponseType]
    ],
]