Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/a2wsgi/asgi_typing.py: 4%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
"""
https://asgi.readthedocs.io/en/latest/specs/index.html
"""
4import sys
5from typing import (
6 Any,
7 Awaitable,
8 Callable,
9 Dict,
10 Iterable,
11 Literal,
12 Optional,
13 Tuple,
14 TypedDict,
15 Union,
16)
18if sys.version_info >= (3, 11):
19 from typing import NotRequired
20else:
21 from typing_extensions import NotRequired
class ASGIVersions(TypedDict):
    """Typed shape of the ``asgi`` entry used by the scope types in this module.

    Both keys are required.
    """

    # Spec version, as a string.
    spec_version: str
    # ASGI version; the only value this type admits is "3.0".
    version: Literal["3.0"]
class HTTPScope(TypedDict):
    """Typed shape of a connection scope whose ``type`` is ``"http"``.

    Keys wrapped in ``NotRequired`` may be absent from the dict; every
    other key must be present.
    """

    type: Literal["http"]
    asgi: ASGIVersions
    http_version: str
    method: str
    scheme: str
    path: str
    # Optional: may be omitted from the scope.
    raw_path: NotRequired[bytes]
    query_string: bytes
    root_path: str
    # Header pairs, each a (name, value) tuple of byte strings.
    headers: Iterable[Tuple[bytes, bytes]]
    # Optional two-item tuples; the second item of ``server`` may be None.
    client: NotRequired[Tuple[str, int]]
    server: NotRequired[Tuple[str, Optional[int]]]
    # Optional free-form mappings.
    state: NotRequired[Dict[str, Any]]
    extensions: NotRequired[Dict[str, Dict[object, object]]]
class WebSocketScope(TypedDict):
    """Typed shape of a connection scope whose ``type`` is ``"websocket"``.

    Keys wrapped in ``NotRequired`` may be absent from the dict; every
    other key must be present.
    """

    type: Literal["websocket"]
    asgi: ASGIVersions
    http_version: str
    scheme: str
    path: str
    # Optional, matching HTTPScope: the ASGI specification lists raw_path as
    # an optional key, so a scope without it must still type-check.
    raw_path: NotRequired[bytes]
    query_string: bytes
    root_path: str
    # Header pairs, each a (name, value) tuple of byte strings.
    headers: Iterable[Tuple[bytes, bytes]]
    # Optional two-item tuples; the second item of ``server`` may be None.
    client: NotRequired[Tuple[str, int]]
    server: NotRequired[Tuple[str, Optional[int]]]
    subprotocols: Iterable[str]
    # Optional free-form mappings.
    state: NotRequired[Dict[str, Any]]
    extensions: NotRequired[Dict[str, Dict[object, object]]]
class LifespanScope(TypedDict):
    """Typed shape of a scope whose ``type`` is ``"lifespan"``.

    ``type`` and ``asgi`` are required; ``state`` may be absent.
    """

    type: Literal["lifespan"]
    asgi: ASGIVersions
    # Optional free-form mapping.
    state: NotRequired[Dict[str, Any]]
# Either of the two request-carrying scopes (HTTP or WebSocket).
WWWScope = Union[HTTPScope, WebSocketScope]
# Any scope type declared in this module, lifespan included.
Scope = Union[HTTPScope, WebSocketScope, LifespanScope]
73class HTTPRequestEvent(TypedDict):
74 type: Literal["http.request"]
75 body: bytes
76 more_body: NotRequired[bool]
79class HTTPResponseStartEvent(TypedDict):
80 type: Literal["http.response.start"]
81 status: int
82 headers: NotRequired[Iterable[Tuple[bytes, bytes]]]
83 trailers: NotRequired[bool]
86class HTTPResponseBodyEvent(TypedDict):
87 type: Literal["http.response.body"]
88 body: NotRequired[bytes]
89 more_body: NotRequired[bool]
class HTTPDisconnectEvent(TypedDict):
    """Message whose ``type`` is ``"http.disconnect"``; it has no other keys."""

    type: Literal["http.disconnect"]
class WebSocketConnectEvent(TypedDict):
    """Message whose ``type`` is ``"websocket.connect"``; it has no other keys."""

    type: Literal["websocket.connect"]
100class WebSocketAcceptEvent(TypedDict):
101 type: Literal["websocket.accept"]
102 subprotocol: NotRequired[str]
103 headers: NotRequired[Iterable[Tuple[bytes, bytes]]]
106class WebSocketReceiveEvent(TypedDict):
107 type: Literal["websocket.receive"]
108 bytes: NotRequired[bytes]
109 text: NotRequired[str]
112class WebSocketSendEvent(TypedDict):
113 type: Literal["websocket.send"]
114 bytes: NotRequired[bytes]
115 text: NotRequired[str]
class WebSocketDisconnectEvent(TypedDict):
    """Message whose ``type`` is ``"websocket.disconnect"``.

    Both ``type`` and the integer ``code`` are required.
    """

    type: Literal["websocket.disconnect"]
    code: int
123class WebSocketCloseEvent(TypedDict):
124 type: Literal["websocket.close"]
125 code: NotRequired[int]
126 reason: NotRequired[str]
class LifespanStartupEvent(TypedDict):
    """Message whose ``type`` is ``"lifespan.startup"``; it has no other keys."""

    type: Literal["lifespan.startup"]
class LifespanShutdownEvent(TypedDict):
    """Message whose ``type`` is ``"lifespan.shutdown"``; it has no other keys."""

    type: Literal["lifespan.shutdown"]
class LifespanStartupCompleteEvent(TypedDict):
    """Message whose ``type`` is ``"lifespan.startup.complete"``; no other keys."""

    type: Literal["lifespan.startup.complete"]
class LifespanStartupFailedEvent(TypedDict):
    """Message whose ``type`` is ``"lifespan.startup.failed"``.

    Both ``type`` and the string ``message`` are required.
    """

    type: Literal["lifespan.startup.failed"]
    message: str
class LifespanShutdownCompleteEvent(TypedDict):
    """Message whose ``type`` is ``"lifespan.shutdown.complete"``; no other keys."""

    type: Literal["lifespan.shutdown.complete"]
class LifespanShutdownFailedEvent(TypedDict):
    """Message whose ``type`` is ``"lifespan.shutdown.failed"``.

    Both ``type`` and the string ``message`` are required.
    """

    type: Literal["lifespan.shutdown.failed"]
    message: str
# Every message type that the ``Receive`` callable may resolve to.
ReceiveEvent = Union[
    HTTPRequestEvent,
    HTTPDisconnectEvent,
    WebSocketConnectEvent,
    WebSocketReceiveEvent,
    WebSocketDisconnectEvent,
    LifespanStartupEvent,
    LifespanShutdownEvent,
]
# Every message type that the ``Send`` callable accepts.
SendEvent = Union[
    HTTPResponseStartEvent,
    HTTPResponseBodyEvent,
    HTTPDisconnectEvent,
    WebSocketAcceptEvent,
    WebSocketSendEvent,
    WebSocketCloseEvent,
    LifespanStartupCompleteEvent,
    LifespanStartupFailedEvent,
    LifespanShutdownCompleteEvent,
    LifespanShutdownFailedEvent,
]
# Zero-argument callable returning an awaitable that yields a ReceiveEvent.
Receive = Callable[[], Awaitable[ReceiveEvent]]

# Callable taking one SendEvent and returning an awaitable of None.
Send = Callable[[SendEvent], Awaitable[None]]

# Application signature: called with (scope, receive, send), returns an
# awaitable of None.
ASGIApp = Callable[[Scope, Receive, Send], Awaitable[None]]