Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/_websocket/models.py: 94%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Models for WebSocket protocol versions 13 and 8."""
3import json
4from enum import IntEnum
5from typing import Any, Callable, Final, Literal, NamedTuple, Optional, Union, cast
7WS_DEFLATE_TRAILING: Final[bytes] = bytes([0x00, 0x00, 0xFF, 0xFF])
10class WSCloseCode(IntEnum):
11 OK = 1000
12 GOING_AWAY = 1001
13 PROTOCOL_ERROR = 1002
14 UNSUPPORTED_DATA = 1003
15 ABNORMAL_CLOSURE = 1006
16 INVALID_TEXT = 1007
17 POLICY_VIOLATION = 1008
18 MESSAGE_TOO_BIG = 1009
19 MANDATORY_EXTENSION = 1010
20 INTERNAL_ERROR = 1011
21 SERVICE_RESTART = 1012
22 TRY_AGAIN_LATER = 1013
23 BAD_GATEWAY = 1014
26class WSMsgType(IntEnum):
27 # websocket spec types
28 CONTINUATION = 0x0
29 TEXT = 0x1
30 BINARY = 0x2
31 PING = 0x9
32 PONG = 0xA
33 CLOSE = 0x8
35 # aiohttp specific types
36 CLOSING = 0x100
37 CLOSED = 0x101
38 ERROR = 0x102
41class WSMessageContinuation(NamedTuple):
42 data: bytes
43 size: int
44 extra: Optional[str] = None
45 type: Literal[WSMsgType.CONTINUATION] = WSMsgType.CONTINUATION
48class WSMessageText(NamedTuple):
49 data: str
50 size: int
51 extra: Optional[str] = None
52 type: Literal[WSMsgType.TEXT] = WSMsgType.TEXT
54 def json(
55 self, *, loads: Callable[[Union[str, bytes, bytearray]], Any] = json.loads
56 ) -> Any:
57 """Return parsed JSON data."""
58 return loads(self.data)
61class WSMessageBinary(NamedTuple):
62 data: bytes
63 size: int
64 extra: Optional[str] = None
65 type: Literal[WSMsgType.BINARY] = WSMsgType.BINARY
67 def json(
68 self, *, loads: Callable[[Union[str, bytes, bytearray]], Any] = json.loads
69 ) -> Any:
70 """Return parsed JSON data."""
71 return loads(self.data)
74class WSMessagePing(NamedTuple):
75 data: bytes
76 size: int
77 extra: Optional[str] = None
78 type: Literal[WSMsgType.PING] = WSMsgType.PING
81class WSMessagePong(NamedTuple):
82 data: bytes
83 size: int
84 extra: Optional[str] = None
85 type: Literal[WSMsgType.PONG] = WSMsgType.PONG
88class WSMessageClose(NamedTuple):
89 data: int
90 size: int
91 extra: Optional[str] = None
92 type: Literal[WSMsgType.CLOSE] = WSMsgType.CLOSE
95class WSMessageClosing(NamedTuple):
96 data: None = None
97 size: int = 0
98 extra: Optional[str] = None
99 type: Literal[WSMsgType.CLOSING] = WSMsgType.CLOSING
102class WSMessageClosed(NamedTuple):
103 data: None = None
104 size: int = 0
105 extra: Optional[str] = None
106 type: Literal[WSMsgType.CLOSED] = WSMsgType.CLOSED
109class WSMessageError(NamedTuple):
110 data: BaseException
111 size: int = 0
112 extra: Optional[str] = None
113 type: Literal[WSMsgType.ERROR] = WSMsgType.ERROR
116WSMessage = Union[
117 WSMessageContinuation,
118 WSMessageText,
119 WSMessageBinary,
120 WSMessagePing,
121 WSMessagePong,
122 WSMessageClose,
123 WSMessageClosing,
124 WSMessageClosed,
125 WSMessageError,
126]
128WS_CLOSED_MESSAGE = WSMessageClosed()
129WS_CLOSING_MESSAGE = WSMessageClosing()
132class WebSocketError(Exception):
133 """WebSocket protocol parser error."""
135 def __init__(self, code: int, message: str) -> None:
136 self.code = code
137 super().__init__(code, message)
139 def __str__(self) -> str:
140 return cast(str, self.args[1])
143class WSHandshakeError(Exception):
144 """WebSocket protocol handshake error."""