Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_abnf.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import array
2import os
3import struct
4import sys
5from threading import Lock
6from typing import Callable, Optional, Union, Any
8from ._exceptions import WebSocketPayloadException, WebSocketProtocolException
9from ._utils import validate_utf8
11"""
12_abnf.py
13websocket - WebSocket client library for Python
15Copyright 2025 engn33r
17Licensed under the Apache License, Version 2.0 (the "License");
18you may not use this file except in compliance with the License.
19You may obtain a copy of the License at
21 http://www.apache.org/licenses/LICENSE-2.0
23Unless required by applicable law or agreed to in writing, software
24distributed under the License is distributed on an "AS IS" BASIS,
25WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26See the License for the specific language governing permissions and
27limitations under the License.
28"""
30try:
31 # If wsaccel is available, use compiled routines to mask data.
32 # wsaccel only provides around a 10% speed boost compared
33 # to the websocket-client _mask() implementation.
34 # Note that wsaccel is unmaintained.
35 from wsaccel.xormask import XorMaskerSimple
37 def _mask(mask_value: array.array, data_value: array.array) -> bytes:
38 mask_result: bytes = XorMaskerSimple(mask_value).process(data_value)
39 return mask_result
41except ImportError:
42 # wsaccel is not available, use websocket-client _mask()
43 native_byteorder = sys.byteorder
45 def _mask(mask_value: array.array, data_value: array.array) -> bytes:
46 datalen = len(data_value)
47 int_data_value = int.from_bytes(data_value, native_byteorder)
48 int_mask_value = int.from_bytes(
49 mask_value * (datalen // 4) + mask_value[: datalen % 4], native_byteorder
50 )
51 return (int_data_value ^ int_mask_value).to_bytes(datalen, native_byteorder)
# Public names exported by ``from ._abnf import *``.
# Every STATUS_* close code defined in this module is listed, including
# STATUS_SERVICE_RESTART and STATUS_TRY_AGAIN_LATER, which were previously
# defined but not exported.
__all__ = [
    "ABNF",
    "continuous_frame",
    "frame_buffer",
    "STATUS_NORMAL",
    "STATUS_GOING_AWAY",
    "STATUS_PROTOCOL_ERROR",
    "STATUS_UNSUPPORTED_DATA_TYPE",
    "STATUS_STATUS_NOT_AVAILABLE",
    "STATUS_ABNORMAL_CLOSED",
    "STATUS_INVALID_PAYLOAD",
    "STATUS_POLICY_VIOLATION",
    "STATUS_MESSAGE_TOO_BIG",
    "STATUS_INVALID_EXTENSION",
    "STATUS_UNEXPECTED_CONDITION",
    "STATUS_SERVICE_RESTART",
    "STATUS_TRY_AGAIN_LATER",
    "STATUS_BAD_GATEWAY",
    "STATUS_TLS_HANDSHAKE_ERROR",
]
# Status codes carried in the first two bytes of a close frame payload.
STATUS_NORMAL = 1000
STATUS_GOING_AWAY = 1001
STATUS_PROTOCOL_ERROR = 1002
STATUS_UNSUPPORTED_DATA_TYPE = 1003
STATUS_STATUS_NOT_AVAILABLE = 1005  # not accepted by VALID_CLOSE_STATUS
STATUS_ABNORMAL_CLOSED = 1006  # not accepted by VALID_CLOSE_STATUS
STATUS_INVALID_PAYLOAD = 1007
STATUS_POLICY_VIOLATION = 1008
STATUS_MESSAGE_TOO_BIG = 1009
STATUS_INVALID_EXTENSION = 1010
STATUS_UNEXPECTED_CONDITION = 1011
STATUS_SERVICE_RESTART = 1012
STATUS_TRY_AGAIN_LATER = 1013
STATUS_BAD_GATEWAY = 1014
STATUS_TLS_HANDSHAKE_ERROR = 1015  # not accepted by VALID_CLOSE_STATUS

# Codes that ABNF.validate() accepts in a received close frame, in addition
# to the 3000-4999 range that ABNF._is_valid_close_status() also allows.
VALID_CLOSE_STATUS = (
    STATUS_NORMAL, STATUS_GOING_AWAY, STATUS_PROTOCOL_ERROR,
    STATUS_UNSUPPORTED_DATA_TYPE, STATUS_INVALID_PAYLOAD,
    STATUS_POLICY_VIOLATION, STATUS_MESSAGE_TOO_BIG,
    STATUS_INVALID_EXTENSION, STATUS_UNEXPECTED_CONDITION,
    STATUS_SERVICE_RESTART, STATUS_TRY_AGAIN_LATER,
    STATUS_BAD_GATEWAY,
)
class ABNF:
    """
    ABNF frame class.
    See http://tools.ietf.org/html/rfc5234
    and http://tools.ietf.org/html/rfc6455#section-5.2
    """

    # operation code values.
    OPCODE_CONT = 0x0
    OPCODE_TEXT = 0x1
    OPCODE_BINARY = 0x2
    OPCODE_CLOSE = 0x8
    OPCODE_PING = 0x9
    OPCODE_PONG = 0xA

    # available operation code value tuple
    OPCODES = (
        OPCODE_CONT,
        OPCODE_TEXT,
        OPCODE_BINARY,
        OPCODE_CLOSE,
        OPCODE_PING,
        OPCODE_PONG,
    )

    # opcode human readable string
    OPCODE_MAP = {
        OPCODE_CONT: "cont",
        OPCODE_TEXT: "text",
        OPCODE_BINARY: "binary",
        OPCODE_CLOSE: "close",
        OPCODE_PING: "ping",
        OPCODE_PONG: "pong",
    }

    # data length thresholds: payloads shorter than LENGTH_7 fit in the
    # 7-bit length field, shorter than LENGTH_16 use the 16-bit extended
    # length, and anything else (below LENGTH_63) uses the 64-bit one.
    LENGTH_7 = 0x7E
    LENGTH_16 = 1 << 16
    LENGTH_63 = 1 << 63

    def __init__(
        self,
        fin: int = 0,
        rsv1: int = 0,
        rsv2: int = 0,
        rsv3: int = 0,
        opcode: int = OPCODE_TEXT,
        mask_value: int = 1,
        data: Optional[Union[str, bytes]] = "",
    ) -> None:
        """
        Constructor for ABNF. Please check RFC for arguments.

        Parameters
        ----------
        fin, rsv1, rsv2, rsv3: int
            header flag bits; format() requires each to be 0 or 1.
        opcode: int
            operation code. please see OPCODE_MAP.
        mask_value: int
            truthy if the payload is to be masked by format().
        data: str or bytes
            frame payload. None is stored as an empty string.
        """
        self.fin = fin
        self.rsv1 = rsv1
        self.rsv2 = rsv2
        self.rsv3 = rsv3
        self.opcode = opcode
        self.mask_value = mask_value
        if data is None:
            data = ""
        self.data = data
        # Callable invoked by format() as get_mask_key(4) to obtain the
        # masking key; replaceable per instance.
        self.get_mask_key = os.urandom

    def validate(self, skip_utf8_validation: bool = False) -> None:
        """
        Validate the ABNF frame.

        Parameters
        ----------
        skip_utf8_validation: skip utf8 validation.

        Raises
        ------
        WebSocketProtocolException
            if a reserved bit is set, the opcode is unknown, a ping frame
            is fragmented, or a close frame payload is malformed.
        """
        if self.rsv1 or self.rsv2 or self.rsv3:
            raise WebSocketProtocolException("rsv is not implemented, yet")

        if self.opcode not in ABNF.OPCODES:
            # Interpolate the value into the message; previously the "%r"
            # template and the value were passed as two separate arguments.
            raise WebSocketProtocolException(f"Invalid opcode {self.opcode!r}")

        if self.opcode == ABNF.OPCODE_PING and not self.fin:
            raise WebSocketProtocolException("Invalid ping frame.")

        if self.opcode == ABNF.OPCODE_CLOSE:
            data_length = len(self.data)
            if not data_length:
                # A close frame without a payload is valid.
                return
            if data_length == 1 or data_length >= 126:
                raise WebSocketProtocolException("Invalid close frame.")
            if (
                data_length > 2
                and not skip_utf8_validation
                and not validate_utf8(self.data[2:])
            ):
                raise WebSocketProtocolException("Invalid close frame.")

            # The first two payload bytes are the big-endian status code.
            data_bytes = (
                self.data[:2]
                if isinstance(self.data, bytes)
                else self.data[:2].encode("utf-8")
            )
            code = struct.unpack("!H", data_bytes)[0]
            if not self._is_valid_close_status(code):
                raise WebSocketProtocolException(f"Invalid close opcode {code!r}")

    @staticmethod
    def _is_valid_close_status(code: int) -> bool:
        """Return True if ``code`` is acceptable in a received close frame."""
        return code in VALID_CLOSE_STATUS or (3000 <= code < 5000)

    def __str__(self) -> str:
        data_repr = self.data if isinstance(self.data, str) else repr(self.data)
        return f"fin={self.fin} opcode={self.opcode} data={data_repr}"

    @staticmethod
    def create_frame(data: Union[bytes, str], opcode: int, fin: int = 1) -> "ABNF":
        """
        Create frame to send text, binary and other data.

        Parameters
        ----------
        data: str or bytes
            data to send. If opcode is OPCODE_TEXT and this value is a
            str, it is encoded as UTF-8 automatically.
        opcode: int
            operation code. please see OPCODE_MAP.
        fin: int
            fin flag. if set to 0, create continue fragmentation.
        """
        if opcode == ABNF.OPCODE_TEXT and isinstance(data, str):
            data = data.encode("utf-8")
        # mask must be set if send data from client
        return ABNF(fin, 0, 0, 0, opcode, 1, data)

    def format(self) -> bytes:
        """
        Format this object to string(byte array) to send data to server.

        Returns
        -------
        bytes
            the frame header followed by the payload; when mask_value is
            set the 4 byte mask key and the masked payload follow the
            header instead.

        Raises
        ------
        ValueError
            if a flag bit is not 0 or 1, the opcode is unknown, or the
            payload is too long.
        """
        if any(x not in (0, 1) for x in [self.fin, self.rsv1, self.rsv2, self.rsv3]):
            raise ValueError("not 0 or 1")
        if self.opcode not in ABNF.OPCODES:
            raise ValueError("Invalid OPCODE")

        if not self.mask_value and isinstance(self.data, str):
            # Encode before measuring so the header length counts the bytes
            # actually sent rather than the number of characters.
            self.data = self.data.encode("utf-8")

        length = len(self.data)
        if length >= ABNF.LENGTH_63:
            raise ValueError("data is too long")

        first_byte = (
            self.fin << 7
            | self.rsv1 << 6
            | self.rsv2 << 5
            | self.rsv3 << 4
            | self.opcode
        )
        mask_bit = self.mask_value << 7
        if length < ABNF.LENGTH_7:
            frame_header = bytes([first_byte, mask_bit | length])
        elif length < ABNF.LENGTH_16:
            frame_header = bytes([first_byte, mask_bit | 0x7E])
            frame_header += struct.pack("!H", length)
        else:
            frame_header = bytes([first_byte, mask_bit | 0x7F])
            frame_header += struct.pack("!Q", length)

        if not self.mask_value:
            return frame_header + self.data
        mask_key = self.get_mask_key(4)
        return frame_header + self._get_masked(mask_key)

    def _get_masked(self, mask_key: Union[str, bytes]) -> bytes:
        """Return ``mask_key`` followed by the payload masked with it."""
        s = ABNF.mask(mask_key, self.data)

        if isinstance(mask_key, str):
            # Use the same encoding as mask() so the key written to the
            # wire is byte-for-byte the key the payload was XORed with.
            mask_key = mask_key.encode("latin-1")

        return mask_key + s

    @staticmethod
    def mask(mask_key: Union[str, bytes], data: Union[str, bytes]) -> bytes:
        """
        Mask or unmask data. Just do xor for each byte

        Parameters
        ----------
        mask_key: bytes or str
            4 byte mask.
        data: bytes or str
            data to mask/unmask. None is treated as empty.
        """
        if data is None:
            data = ""

        if isinstance(mask_key, str):
            mask_key = mask_key.encode("latin-1")

        if isinstance(data, str):
            data = data.encode("latin-1")

        return _mask(array.array("B", mask_key), array.array("B", data))
305class frame_buffer:
306 _HEADER_MASK_INDEX = 5
307 _HEADER_LENGTH_INDEX = 6
309 def __init__(
310 self, recv_fn: Callable[[int], int], skip_utf8_validation: bool
311 ) -> None:
312 self.recv = recv_fn
313 self.skip_utf8_validation = skip_utf8_validation
314 # Buffers over the packets from the layer beneath until desired amount
315 # bytes of bytes are received.
316 self.recv_buffer: list = []
317 self.clear()
318 self.lock = Lock()
320 def clear(self) -> None:
321 self.header: Optional[tuple] = None
322 self.length: Optional[int] = None
323 self.mask_value: Optional[Union[bytes, str]] = None
325 def needs_header(self) -> bool:
326 return self.header is None
328 def recv_header(self) -> None:
329 header = self.recv_strict(2)
330 b1 = header[0]
331 fin = b1 >> 7 & 1
332 rsv1 = b1 >> 6 & 1
333 rsv2 = b1 >> 5 & 1
334 rsv3 = b1 >> 4 & 1
335 opcode = b1 & 0xF
336 b2 = header[1]
337 has_mask = b2 >> 7 & 1
338 length_bits = b2 & 0x7F
340 self.header = (fin, rsv1, rsv2, rsv3, opcode, has_mask, length_bits)
342 def has_mask(self) -> Union[bool, int]:
343 if not self.header:
344 return False
345 header_val: int = self.header[frame_buffer._HEADER_MASK_INDEX]
346 return header_val
348 def needs_length(self) -> bool:
349 return self.length is None
351 def recv_length(self) -> None:
352 if self.header is None:
353 raise WebSocketProtocolException("Header not received")
354 bits = self.header[frame_buffer._HEADER_LENGTH_INDEX]
355 length_bits = bits & 0x7F
356 if length_bits == 0x7E:
357 v = self.recv_strict(2)
358 self.length = struct.unpack("!H", v)[0]
359 elif length_bits == 0x7F:
360 v = self.recv_strict(8)
361 self.length = struct.unpack("!Q", v)[0]
362 else:
363 self.length = length_bits
365 def needs_mask(self) -> bool:
366 return self.mask_value is None
368 def recv_mask(self) -> None:
369 self.mask_value = self.recv_strict(4) if self.has_mask() else ""
371 def recv_frame(self) -> ABNF:
372 with self.lock:
373 # Header
374 if self.needs_header():
375 self.recv_header()
376 if self.header is None:
377 raise WebSocketProtocolException("Header not received")
378 (fin, rsv1, rsv2, rsv3, opcode, has_mask, _) = self.header
380 # Frame length
381 if self.needs_length():
382 self.recv_length()
383 length = self.length
385 # Mask
386 if self.needs_mask():
387 self.recv_mask()
388 mask_value = self.mask_value
390 # Payload
391 if length is None:
392 raise WebSocketProtocolException("Length not received")
393 payload = self.recv_strict(length)
394 if has_mask:
395 if mask_value is None:
396 raise WebSocketProtocolException("Mask not received")
397 payload = ABNF.mask(mask_value, payload)
399 # Reset for next frame
400 self.clear()
402 frame = ABNF(fin, rsv1, rsv2, rsv3, opcode, has_mask, payload)
403 frame.validate(self.skip_utf8_validation)
405 return frame
407 def recv_strict(self, bufsize: int) -> bytes:
408 if not isinstance(bufsize, int):
409 raise ValueError("bufsize must be an integer")
410 shortage = bufsize - sum(len(buf) for buf in self.recv_buffer)
411 while shortage > 0:
412 # Limit buffer size that we pass to socket.recv() to avoid
413 # fragmenting the heap -- the number of bytes recv() actually
414 # reads is limited by socket buffer and is relatively small,
415 # yet passing large numbers repeatedly causes lots of large
416 # buffers allocated and then shrunk, which results in
417 # fragmentation.
418 bytes_ = self.recv(min(16384, shortage))
419 if isinstance(bytes_, bytes):
420 self.recv_buffer.append(bytes_)
421 shortage -= len(bytes_)
422 else:
423 # Handle case where recv returns int or other type
424 break
426 unified = b"".join(self.recv_buffer)
428 if shortage == 0:
429 self.recv_buffer = []
430 return unified
431 else:
432 self.recv_buffer = [unified[bufsize:]]
433 return unified[:bufsize]
436class continuous_frame:
437 def __init__(self, fire_cont_frame: bool, skip_utf8_validation: bool) -> None:
438 self.fire_cont_frame = fire_cont_frame
439 self.skip_utf8_validation = skip_utf8_validation
440 self.cont_data: Optional[list[Any]] = None
441 self.recving_frames: Optional[int] = None
443 def validate(self, frame: ABNF) -> None:
444 if not self.recving_frames and frame.opcode == ABNF.OPCODE_CONT:
445 raise WebSocketProtocolException("Illegal frame")
446 if self.recving_frames and frame.opcode in (
447 ABNF.OPCODE_TEXT,
448 ABNF.OPCODE_BINARY,
449 ):
450 raise WebSocketProtocolException("Illegal frame")
452 def add(self, frame: ABNF) -> None:
453 if self.cont_data:
454 self.cont_data[1] += frame.data
455 else:
456 if frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY):
457 self.recving_frames = frame.opcode
458 self.cont_data = [frame.opcode, frame.data]
460 if frame.fin:
461 self.recving_frames = None
463 def is_fire(self, frame: ABNF) -> Union[bool, int]:
464 return frame.fin or self.fire_cont_frame
466 def extract(self, frame: ABNF) -> tuple:
467 data = self.cont_data
468 if data is None:
469 raise WebSocketProtocolException("No continuation data available")
470 self.cont_data = None
471 frame.data = data[1]
472 if (
473 not self.fire_cont_frame
474 and data is not None
475 and data[0] == ABNF.OPCODE_TEXT
476 and not self.skip_utf8_validation
477 and not validate_utf8(frame.data)
478 ):
479 raise WebSocketPayloadException(f"cannot decode: {repr(frame.data)}")
480 if data is None:
481 raise WebSocketProtocolException("No continuation data available")
482 return data[0], frame