Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/a2wsgi/asgi_typing.py: 4%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

100 statements  

1""" 

2https://asgi.readthedocs.io/en/latest/specs/index.html 

3""" 

4import sys 

5from typing import ( 

6 Any, 

7 Awaitable, 

8 Callable, 

9 Dict, 

10 Iterable, 

11 Literal, 

12 Optional, 

13 Tuple, 

14 TypedDict, 

15 Union, 

16) 

17 

18if sys.version_info >= (3, 11): 

19 from typing import NotRequired 

20else: 

21 from typing_extensions import NotRequired 

22 

23 

class ASGIVersions(TypedDict):
    """Value type of the ``asgi`` key carried by the scope dictionaries."""

    # Typed only as a plain string; no specific value is enforced here.
    spec_version: str

    # "3.0" is the only value this annotation admits.
    version: Literal["3.0"]

27 

28 

class HTTPScope(TypedDict):
    """Connection scope dictionary whose ``type`` is ``"http"``.

    Keys wrapped in ``NotRequired`` may be absent from the dictionary; every
    other key must be present.
    """

    # --- discriminator and protocol versions ---
    type: Literal["http"]
    asgi: ASGIVersions
    http_version: str

    # --- request line ---
    method: str
    scheme: str
    path: str
    raw_path: NotRequired[bytes]  # optional key
    query_string: bytes
    root_path: str

    # Header pairs; both members of each pair are bytes.
    headers: Iterable[Tuple[bytes, bytes]]

    # --- peer addresses (both optional keys) ---
    client: NotRequired[Tuple[str, int]]
    # The second member of ``server`` may be ``None``.
    server: NotRequired[Tuple[str, Optional[int]]]

    # --- optional extras ---
    state: NotRequired[Dict[str, Any]]
    extensions: NotRequired[Dict[str, Dict[object, object]]]

44 

45 

class WebSocketScope(TypedDict):
    """Connection scope dictionary whose ``type`` is ``"websocket"``.

    Keys wrapped in ``NotRequired`` may be absent from the dictionary; every
    other key must be present.
    """

    # --- discriminator and protocol versions ---
    type: Literal["websocket"]
    asgi: ASGIVersions
    http_version: str

    # --- request line ---
    scheme: str
    path: str
    # Optional key, matching HTTPScope.raw_path: the ASGI WebSocket scope
    # lists raw_path as optional, so servers are not obliged to supply it.
    raw_path: NotRequired[bytes]
    query_string: bytes
    root_path: str

    # Header pairs; both members of each pair are bytes.
    headers: Iterable[Tuple[bytes, bytes]]

    # --- peer addresses (both optional keys) ---
    client: NotRequired[Tuple[str, int]]
    # The second member of ``server`` may be ``None``.
    server: NotRequired[Tuple[str, Optional[int]]]

    subprotocols: Iterable[str]

    # --- optional extras ---
    state: NotRequired[Dict[str, Any]]
    extensions: NotRequired[Dict[str, Dict[object, object]]]

61 

62 

class LifespanScope(TypedDict):
    """Scope dictionary whose ``type`` is ``"lifespan"``."""

    type: Literal["lifespan"]
    asgi: ASGIVersions

    # Optional key: a free-form string-keyed dictionary.
    state: NotRequired[Dict[str, Any]]

67 

68 

# Scopes for the two request-carrying protocols (HTTP and WebSocket).
WWWScope = Union[HTTPScope, WebSocketScope]

# Any scope: the two above plus lifespan. ``Union`` flattens nested unions, so
# this has exactly the members HTTPScope, WebSocketScope and LifespanScope.
Scope = Union[WWWScope, LifespanScope]

71 

72 

73class HTTPRequestEvent(TypedDict): 

74 type: Literal["http.request"] 

75 body: bytes 

76 more_body: NotRequired[bool] 

77 

78 

79class HTTPResponseStartEvent(TypedDict): 

80 type: Literal["http.response.start"] 

81 status: int 

82 headers: NotRequired[Iterable[Tuple[bytes, bytes]]] 

83 trailers: NotRequired[bool] 

84 

85 

86class HTTPResponseBodyEvent(TypedDict): 

87 type: Literal["http.response.body"] 

88 body: NotRequired[bytes] 

89 more_body: NotRequired[bool] 

90 

91 

class HTTPDisconnectEvent(TypedDict):
    """Event dictionary whose ``type`` is ``"http.disconnect"``; no other keys."""

    type: Literal["http.disconnect"]

94 

95 

class WebSocketConnectEvent(TypedDict):
    """Event dictionary whose ``type`` is ``"websocket.connect"``; no other keys."""

    type: Literal["websocket.connect"]

98 

99 

100class WebSocketAcceptEvent(TypedDict): 

101 type: Literal["websocket.accept"] 

102 subprotocol: NotRequired[str] 

103 headers: NotRequired[Iterable[Tuple[bytes, bytes]]] 

104 

105 

106class WebSocketReceiveEvent(TypedDict): 

107 type: Literal["websocket.receive"] 

108 bytes: NotRequired[bytes] 

109 text: NotRequired[str] 

110 

111 

112class WebSocketSendEvent(TypedDict): 

113 type: Literal["websocket.send"] 

114 bytes: NotRequired[bytes] 

115 text: NotRequired[str] 

116 

117 

class WebSocketDisconnectEvent(TypedDict):
    """Event dictionary whose ``type`` is ``"websocket.disconnect"``."""

    type: Literal["websocket.disconnect"]

    # Required integer key.
    code: int

121 

122 

123class WebSocketCloseEvent(TypedDict): 

124 type: Literal["websocket.close"] 

125 code: NotRequired[int] 

126 reason: NotRequired[str] 

127 

128 

class LifespanStartupEvent(TypedDict):
    """Event dictionary whose ``type`` is ``"lifespan.startup"``; no other keys."""

    type: Literal["lifespan.startup"]

131 

132 

class LifespanShutdownEvent(TypedDict):
    """Event dictionary whose ``type`` is ``"lifespan.shutdown"``; no other keys."""

    type: Literal["lifespan.shutdown"]

135 

136 

class LifespanStartupCompleteEvent(TypedDict):
    """Event dictionary whose ``type`` is ``"lifespan.startup.complete"``."""

    type: Literal["lifespan.startup.complete"]

139 

140 

class LifespanStartupFailedEvent(TypedDict):
    """Event dictionary whose ``type`` is ``"lifespan.startup.failed"``."""

    type: Literal["lifespan.startup.failed"]

    # Required string key.
    message: str

144 

145 

class LifespanShutdownCompleteEvent(TypedDict):
    """Event dictionary whose ``type`` is ``"lifespan.shutdown.complete"``."""

    type: Literal["lifespan.shutdown.complete"]

148 

149 

class LifespanShutdownFailedEvent(TypedDict):
    """Event dictionary whose ``type`` is ``"lifespan.shutdown.failed"``."""

    type: Literal["lifespan.shutdown.failed"]

    # Required string key.
    message: str

153 

154 

# Union of the event dictionaries that a ``Receive`` callable may resolve to.
ReceiveEvent = Union[
    # HTTP
    HTTPRequestEvent,
    HTTPDisconnectEvent,
    # WebSocket
    WebSocketConnectEvent,
    WebSocketReceiveEvent,
    WebSocketDisconnectEvent,
    # Lifespan
    LifespanStartupEvent,
    LifespanShutdownEvent,
]

164 

# Union of the event dictionaries accepted as the argument of a ``Send``
# callable.
SendEvent = Union[
    # HTTP
    HTTPResponseStartEvent,
    HTTPResponseBodyEvent,
    # NOTE(review): "http.disconnect" also appears in ReceiveEvent; confirm
    # that listing it as a sendable event is intentional.
    HTTPDisconnectEvent,
    # WebSocket
    WebSocketAcceptEvent,
    WebSocketSendEvent,
    WebSocketCloseEvent,
    # Lifespan
    LifespanStartupCompleteEvent,
    LifespanStartupFailedEvent,
    LifespanShutdownCompleteEvent,
    LifespanShutdownFailedEvent,
]

177 

# Zero-argument callable returning an awaitable that yields one ReceiveEvent.
Receive = Callable[[], Awaitable[ReceiveEvent]]

# Callable taking one SendEvent and returning an awaitable that yields None.
Send = Callable[[SendEvent], Awaitable[None]]

# An application: called with (scope, receive, send), returns an awaitable
# that yields None.
ASGIApp = Callable[[Scope, Receive, Send], Awaitable[None]]