Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_typing.py: 0%

21 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright 2022 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Common types for gRPC Sync API""" 

15 

16from typing import ( 

17 TYPE_CHECKING, 

18 Any, 

19 Callable, 

20 Iterable, 

21 Iterator, 

22 Optional, 

23 Sequence, 

24 Tuple, 

25 TypeVar, 

26 Union, 

27) 

28 

29from grpc._cython import cygrpc 

30 

31if TYPE_CHECKING: 

32 from grpc import ServicerContext 

33 from grpc._server import _RPCState 

34 

35RequestType = TypeVar("RequestType") 

36ResponseType = TypeVar("ResponseType") 

37SerializingFunction = Callable[[Any], bytes] 

38DeserializingFunction = Callable[[bytes], Any] 

39MetadataType = Sequence[Tuple[str, Union[str, bytes]]] 

40ChannelArgumentType = Tuple[str, Any] 

41DoneCallbackType = Callable[[Any], None] 

42NullaryCallbackType = Callable[[], None] 

43RequestIterableType = Iterable[Any] 

44ResponseIterableType = Iterable[Any] 

45UserTag = Callable[[cygrpc.BaseEvent], bool] 

46IntegratedCallFactory = Callable[ 

47 [ 

48 int, 

49 bytes, 

50 None, 

51 Optional[float], 

52 Optional[MetadataType], 

53 Optional[cygrpc.CallCredentials], 

54 Sequence[Sequence[cygrpc.Operation]], 

55 UserTag, 

56 Any, 

57 ], 

58 cygrpc.IntegratedCall, 

59] 

60ServerTagCallbackType = Tuple[ 

61 Optional["_RPCState"], Sequence[NullaryCallbackType] 

62] 

63ServerCallbackTag = Callable[[cygrpc.BaseEvent], ServerTagCallbackType] 

64ArityAgnosticMethodHandler = Union[ 

65 Callable[ 

66 [RequestType, "ServicerContext", Callable[[ResponseType], None]], 

67 ResponseType, 

68 ], 

69 Callable[ 

70 [RequestType, "ServicerContext", Callable[[ResponseType], None]], 

71 Iterator[ResponseType], 

72 ], 

73 Callable[ 

74 [ 

75 Iterator[RequestType], 

76 "ServicerContext", 

77 Callable[[ResponseType], None], 

78 ], 

79 ResponseType, 

80 ], 

81 Callable[ 

82 [ 

83 Iterator[RequestType], 

84 "ServicerContext", 

85 Callable[[ResponseType], None], 

86 ], 

87 Iterator[ResponseType], 

88 ], 

89 Callable[[RequestType, "ServicerContext"], ResponseType], 

90 Callable[[RequestType, "ServicerContext"], Iterator[ResponseType]], 

91 Callable[[Iterator[RequestType], "ServicerContext"], ResponseType], 

92 Callable[ 

93 [Iterator[RequestType], "ServicerContext"], Iterator[ResponseType] 

94 ], 

95]