Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/websocket/_abnf.py: 32%
225 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
1import array
2import os
3import struct
4import sys
6from ._exceptions import *
7from ._utils import validate_utf8
8from threading import Lock
10"""
11_abnf.py
12websocket - WebSocket client library for Python
14Copyright 2022 engn33r
16Licensed under the Apache License, Version 2.0 (the "License");
17you may not use this file except in compliance with the License.
18You may obtain a copy of the License at
20 http://www.apache.org/licenses/LICENSE-2.0
22Unless required by applicable law or agreed to in writing, software
23distributed under the License is distributed on an "AS IS" BASIS,
24WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25See the License for the specific language governing permissions and
26limitations under the License.
27"""
29try:
30 # If wsaccel is available, use compiled routines to mask data.
31 # wsaccel only provides around a 10% speed boost compared
32 # to the websocket-client _mask() implementation.
33 # Note that wsaccel is unmaintained.
34 from wsaccel.xormask import XorMaskerSimple
36 def _mask(_m, _d) -> bytes:
37 return XorMaskerSimple(_m).process(_d)
39except ImportError:
40 # wsaccel is not available, use websocket-client _mask()
41 native_byteorder = sys.byteorder
43 def _mask(mask_value: int, data_value: int) -> bytes:
44 datalen = len(data_value)
45 data_value = int.from_bytes(data_value, native_byteorder)
46 mask_value = int.from_bytes(mask_value * (datalen // 4) + mask_value[: datalen % 4], native_byteorder)
47 return (data_value ^ mask_value).to_bytes(datalen, native_byteorder)
# Names exported by a star-import of this module.  Every STATUS_* constant
# defined in this module is listed, including STATUS_SERVICE_RESTART and
# STATUS_TRY_AGAIN_LATER.
__all__ = [
    'ABNF', 'continuous_frame', 'frame_buffer',
    'STATUS_NORMAL',
    'STATUS_GOING_AWAY',
    'STATUS_PROTOCOL_ERROR',
    'STATUS_UNSUPPORTED_DATA_TYPE',
    'STATUS_STATUS_NOT_AVAILABLE',
    'STATUS_ABNORMAL_CLOSED',
    'STATUS_INVALID_PAYLOAD',
    'STATUS_POLICY_VIOLATION',
    'STATUS_MESSAGE_TOO_BIG',
    'STATUS_INVALID_EXTENSION',
    'STATUS_UNEXPECTED_CONDITION',
    'STATUS_SERVICE_RESTART',
    'STATUS_TRY_AGAIN_LATER',
    'STATUS_BAD_GATEWAY',
    'STATUS_TLS_HANDSHAKE_ERROR',
]
# Status codes carried in the first two bytes of a close frame payload.
STATUS_NORMAL = 1000
STATUS_GOING_AWAY = 1001
STATUS_PROTOCOL_ERROR = 1002
STATUS_UNSUPPORTED_DATA_TYPE = 1003
STATUS_STATUS_NOT_AVAILABLE = 1005
STATUS_ABNORMAL_CLOSED = 1006
STATUS_INVALID_PAYLOAD = 1007
STATUS_POLICY_VIOLATION = 1008
STATUS_MESSAGE_TOO_BIG = 1009
STATUS_INVALID_EXTENSION = 1010
STATUS_UNEXPECTED_CONDITION = 1011
STATUS_SERVICE_RESTART = 1012
STATUS_TRY_AGAIN_LATER = 1013
STATUS_BAD_GATEWAY = 1014
STATUS_TLS_HANDSHAKE_ERROR = 1015

# Codes accepted when they arrive inside a close frame.  1005, 1006 and 1015
# are not listed: RFC 6455 reserves them for local use and forbids sending
# them on the wire.
VALID_CLOSE_STATUS = (
    STATUS_NORMAL,
    STATUS_GOING_AWAY,
    STATUS_PROTOCOL_ERROR,
    STATUS_UNSUPPORTED_DATA_TYPE,
    STATUS_INVALID_PAYLOAD,
    STATUS_POLICY_VIOLATION,
    STATUS_MESSAGE_TOO_BIG,
    STATUS_INVALID_EXTENSION,
    STATUS_UNEXPECTED_CONDITION,
    STATUS_SERVICE_RESTART,
    STATUS_TRY_AGAIN_LATER,
    STATUS_BAD_GATEWAY,
)
class ABNF:
    """
    ABNF frame class.
    See http://tools.ietf.org/html/rfc5234
    and http://tools.ietf.org/html/rfc6455#section-5.2

    An instance holds the header flags, the opcode and the payload of a
    single frame and can validate itself or serialize itself to bytes.
    """

    # operation code values.
    OPCODE_CONT = 0x0
    OPCODE_TEXT = 0x1
    OPCODE_BINARY = 0x2
    OPCODE_CLOSE = 0x8
    OPCODE_PING = 0x9
    OPCODE_PONG = 0xa

    # available operation code value tuple
    OPCODES = (OPCODE_CONT, OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE,
               OPCODE_PING, OPCODE_PONG)

    # opcode human readable string
    OPCODE_MAP = {
        OPCODE_CONT: "cont",
        OPCODE_TEXT: "text",
        OPCODE_BINARY: "binary",
        OPCODE_CLOSE: "close",
        OPCODE_PING: "ping",
        OPCODE_PONG: "pong"
    }

    # data length threshold.
    # Payloads shorter than LENGTH_7 fit in the second header byte, shorter
    # than LENGTH_16 use a 2 byte extended length, anything else an 8 byte one.
    LENGTH_7 = 0x7e
    LENGTH_16 = 1 << 16
    LENGTH_63 = 1 << 63

    def __init__(self, fin: int = 0, rsv1: int = 0, rsv2: int = 0, rsv3: int = 0,
                 opcode: int = OPCODE_TEXT, mask: int = 1, data: str = "") -> None:
        """
        Constructor for ABNF. Please check RFC for arguments.

        Parameters
        ----------
        fin, rsv1, rsv2, rsv3: int
            header flag bits (0 or 1).
        opcode: int
            one of the OPCODE_XXX values.
        mask: int
            1 to mask the payload in format(), 0 to send it as is.
        data: bytes or str
            frame payload. None is stored as an empty string.
        """
        self.fin = fin
        self.rsv1 = rsv1
        self.rsv2 = rsv2
        self.rsv3 = rsv3
        self.opcode = opcode
        self.mask = mask
        if data is None:
            data = ""
        self.data = data
        # Callable taking a byte count and returning the mask key; kept as an
        # attribute so it can be replaced per instance.
        self.get_mask_key = os.urandom

    def validate(self, skip_utf8_validation: bool = False) -> None:
        """
        Validate the ABNF frame.

        Parameters
        ----------
        skip_utf8_validation: skip utf8 validation.

        Raises
        ------
        WebSocketProtocolException
            if a reserved bit is set, the opcode is unknown, a ping frame is
            fragmented, or a close frame has a malformed payload or an
            unacceptable status code.
        """
        if self.rsv1 or self.rsv2 or self.rsv3:
            raise WebSocketProtocolException("rsv is not implemented, yet")

        if self.opcode not in ABNF.OPCODES:
            # Interpolate here: exceptions do not apply %-formatting to
            # their arguments the way logging calls do.
            raise WebSocketProtocolException("Invalid opcode %r" % (self.opcode,))

        if self.opcode == ABNF.OPCODE_PING and not self.fin:
            raise WebSocketProtocolException("Invalid ping frame.")

        if self.opcode == ABNF.OPCODE_CLOSE:
            length = len(self.data)
            if not length:
                # A close frame without a body is valid.
                return
            # A lone byte cannot hold the 2 byte status code, and the check
            # below rejects any close payload of 126 bytes or more.
            if length == 1 or length >= 126:
                raise WebSocketProtocolException("Invalid close frame.")
            # Everything after the status code is the close reason text.
            if length > 2 and not skip_utf8_validation and not validate_utf8(self.data[2:]):
                raise WebSocketProtocolException("Invalid close frame.")

            # Status code is a big-endian unsigned 16 bit integer.
            code = 256 * self.data[0] + self.data[1]
            if not self._is_valid_close_status(code):
                raise WebSocketProtocolException("Invalid close opcode %r" % (code,))

    @staticmethod
    def _is_valid_close_status(code: int) -> bool:
        """Return True for a listed status code or one in 3000-4999."""
        return code in VALID_CLOSE_STATUS or (3000 <= code < 5000)

    def __str__(self) -> str:
        return "fin=" + str(self.fin) \
            + " opcode=" + str(self.opcode) \
            + " data=" + str(self.data)

    @staticmethod
    def create_frame(data: str, opcode: int, fin: int = 1) -> 'ABNF':
        """
        Create frame to send text, binary and other data.

        Parameters
        ----------
        data: str or bytes
            data to send. If opcode is OPCODE_TEXT and this value is a str,
            it is encoded to UTF-8 bytes automatically.
        opcode: int
            operation code. please see OPCODE_XXX.
        fin: int
            fin flag. if set to 0, create continue fragmentation.

        Returns
        -------
        ABNF
            a frame with the mask bit set and all reserved bits cleared.
        """
        if opcode == ABNF.OPCODE_TEXT and isinstance(data, str):
            data = data.encode("utf-8")
        # mask must be set if send data from client
        return ABNF(fin, 0, 0, 0, opcode, 1, data)

    def format(self) -> bytes:
        """
        Format this object to string(byte array) to send data to server.

        Returns
        -------
        bytes
            the frame header followed by the payload. When the mask bit is
            set, the 4 byte key from get_mask_key precedes the masked payload.

        Raises
        ------
        ValueError
            if a flag is not 0 or 1, the opcode is unknown or the payload is
            too long to be encoded.
        """
        if any(x not in (0, 1) for x in [self.fin, self.rsv1, self.rsv2, self.rsv3]):
            raise ValueError("not 0 or 1")
        if self.opcode not in ABNF.OPCODES:
            raise ValueError("Invalid OPCODE")
        length = len(self.data)
        if length >= ABNF.LENGTH_63:
            raise ValueError("data is too long")

        # First byte: FIN, RSV1-3 and the opcode.
        frame_header = chr(self.fin << 7 |
                           self.rsv1 << 6 | self.rsv2 << 5 | self.rsv3 << 4 |
                           self.opcode).encode('latin-1')
        # Second byte: mask bit plus either the length itself or a marker
        # (0x7e / 0x7f) announcing a 2 or 8 byte extended length.
        if length < ABNF.LENGTH_7:
            frame_header += chr(self.mask << 7 | length).encode('latin-1')
        elif length < ABNF.LENGTH_16:
            frame_header += chr(self.mask << 7 | 0x7e).encode('latin-1')
            frame_header += struct.pack("!H", length)
        else:
            frame_header += chr(self.mask << 7 | 0x7f).encode('latin-1')
            frame_header += struct.pack("!Q", length)

        if not self.mask:
            payload = self.data
            # The masked path accepts str payloads (mask() encodes them as
            # latin-1); do the same here so an unmasked frame with a str
            # payload, such as the default "", does not fail on bytes + str.
            if isinstance(payload, str):
                payload = payload.encode('latin-1')
            return frame_header + payload

        mask_key = self.get_mask_key(4)
        return frame_header + self._get_masked(mask_key)

    def _get_masked(self, mask_key: bytes) -> bytes:
        """Return ``mask_key`` followed by the payload masked with it."""
        s = ABNF.mask(mask_key, self.data)

        if isinstance(mask_key, str):
            # Must be the same encoding mask() uses for the key, otherwise
            # the key on the wire differs from the one the payload was
            # XOR-ed with whenever the key contains non-ASCII characters.
            mask_key = mask_key.encode('latin-1')

        return mask_key + s

    @staticmethod
    def mask(mask_key: bytes, data: bytes) -> bytes:
        """
        Mask or unmask data. Just do xor for each byte

        Parameters
        ----------
        mask_key: bytes or str
            4 byte mask.
        data: bytes or str
            data to mask/unmask. None is treated as empty.

        Returns
        -------
        bytes
            the XOR-ed data. str arguments are encoded as latin-1 first.
        """
        if data is None:
            data = ""

        if isinstance(mask_key, str):
            mask_key = mask_key.encode('latin-1')

        if isinstance(data, str):
            data = data.encode('latin-1')

        return _mask(array.array("B", mask_key), array.array("B", data))
class frame_buffer:
    """
    Incremental reader that assembles one frame at a time from a receive
    callable.

    The parsed header, payload length and mask key are cached on the
    instance and only reset after a whole frame has been read.
    """

    # Positions of the mask flag and the length bits inside self.header.
    _HEADER_MASK_INDEX = 5
    _HEADER_LENGTH_INDEX = 6

    def __init__(self, recv_fn: int, skip_utf8_validation: bool) -> None:
        """
        Parameters
        ----------
        recv_fn:
            despite the annotation this is called as ``recv_fn(size)`` and
            its result is treated as bytes.
        skip_utf8_validation: bool
            forwarded to ABNF.validate() for every received frame.
        """
        self.recv = recv_fn
        self.skip_utf8_validation = skip_utf8_validation
        # Chunks already read from the layer beneath but not yet consumed.
        self.recv_buffer = []
        self.clear()
        self.lock = Lock()

    def clear(self) -> None:
        """Forget the cached header, length and mask of the current frame."""
        self.header = None
        self.length = None
        self.mask = None

    def has_received_header(self) -> bool:
        """Return True while the header is still missing (note the name)."""
        return self.header is None

    def recv_header(self) -> None:
        """Read the two fixed header bytes and store the decoded fields."""
        raw = self.recv_strict(2)
        first_byte = raw[0]
        second_byte = raw[1]

        flags = tuple((first_byte >> shift) & 1 for shift in (7, 6, 5, 4))
        opcode = first_byte & 0xf
        mask_flag = (second_byte >> 7) & 1
        length_bits = second_byte & 0x7f

        # (fin, rsv1, rsv2, rsv3, opcode, has_mask, length_bits)
        self.header = flags + (opcode, mask_flag, length_bits)

    def has_mask(self) -> bool or int:
        """Return the mask flag of the cached header, or False without one."""
        if self.header:
            return self.header[frame_buffer._HEADER_MASK_INDEX]
        return False

    def has_received_length(self) -> bool:
        """Return True while the payload length is still missing."""
        return self.length is None

    def recv_length(self) -> None:
        """Resolve the payload length, reading extended length bytes if any."""
        length_bits = self.header[frame_buffer._HEADER_LENGTH_INDEX] & 0x7f
        if length_bits == 0x7e:
            # 2 byte big-endian extended length follows.
            (self.length,) = struct.unpack("!H", self.recv_strict(2))
        elif length_bits == 0x7f:
            # 8 byte big-endian extended length follows.
            (self.length,) = struct.unpack("!Q", self.recv_strict(8))
        else:
            self.length = length_bits

    def has_received_mask(self) -> bool:
        """Return True while the mask key is still missing."""
        return self.mask is None

    def recv_mask(self) -> None:
        """Read the 4 byte mask key, or store "" for an unmasked frame."""
        if self.has_mask():
            self.mask = self.recv_strict(4)
        else:
            self.mask = ""

    def recv_frame(self) -> ABNF:
        """
        Read, unmask and validate the next frame.

        Each part is only fetched when it is not cached yet, so parts parsed
        before an exception from the receive callable are reused by the next
        call.
        """
        with self.lock:
            # Header
            if self.has_received_header():
                self.recv_header()
            fin, rsv1, rsv2, rsv3, opcode, has_mask, _ = self.header

            # Frame length
            if self.has_received_length():
                self.recv_length()

            # Mask
            if self.has_received_mask():
                self.recv_mask()

            # Payload
            payload = self.recv_strict(self.length)
            if has_mask:
                payload = ABNF.mask(self.mask, payload)

            # Reset for next frame
            self.clear()

            received = ABNF(fin, rsv1, rsv2, rsv3, opcode, has_mask, payload)
            received.validate(self.skip_utf8_validation)

        return received

    def recv_strict(self, bufsize: int) -> bytes:
        """
        Return exactly ``bufsize`` bytes, calling the receive callable until
        enough data is buffered. Surplus bytes stay in recv_buffer.
        """
        shortage = bufsize - sum(len(chunk) for chunk in self.recv_buffer)
        while shortage > 0:
            # Limit buffer size that we pass to socket.recv() to avoid
            # fragmenting the heap -- the number of bytes recv() actually
            # reads is limited by socket buffer and is relatively small,
            # yet passing large numbers repeatedly causes lots of large
            # buffers allocated and then shrunk, which results in
            # fragmentation.
            chunk = self.recv(min(16384, shortage))
            self.recv_buffer.append(chunk)
            shortage -= len(chunk)

        unified = b"".join(self.recv_buffer)

        if shortage != 0:
            # More than requested is buffered: keep the tail for later.
            self.recv_buffer = [unified[bufsize:]]
            return unified[:bufsize]

        self.recv_buffer = []
        return unified
class continuous_frame:
    """Collects the fragments of a message until it can be handed over."""

    def __init__(self, fire_cont_frame: bool, skip_utf8_validation: bool) -> None:
        """
        Parameters
        ----------
        fire_cont_frame: bool
            when true, is_fire() reports every frame, not only final ones.
        skip_utf8_validation: bool
            when true, extract() does not check text payloads.
        """
        self.fire_cont_frame = fire_cont_frame
        self.skip_utf8_validation = skip_utf8_validation
        # [opcode, payload] collected so far, or None.
        self.cont_data = None
        # Opcode of the fragmented message in progress, or None.
        self.recving_frames = None

    def validate(self, frame: ABNF) -> None:
        """
        Reject a continuation frame with no message in progress, and a new
        text/binary frame while one is in progress.
        """
        in_progress = self.recving_frames
        if not in_progress and frame.opcode == ABNF.OPCODE_CONT:
            raise WebSocketProtocolException("Illegal frame")
        if in_progress and frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY):
            raise WebSocketProtocolException("Illegal frame")

    def add(self, frame: ABNF) -> None:
        """Append the payload of ``frame`` to the pending message."""
        if not self.cont_data:
            # First fragment: remember which kind of message was started.
            if frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY):
                self.recving_frames = frame.opcode
            self.cont_data = [frame.opcode, frame.data]
        else:
            self.cont_data[1] += frame.data

        if frame.fin:
            self.recving_frames = None

    def is_fire(self, frame: ABNF) -> bool or int:
        """Return a true value when the pending data should be delivered."""
        return frame.fin or self.fire_cont_frame

    def extract(self, frame: ABNF) -> list:
        """
        Move the collected payload into ``frame`` and reset the collector.

        Returns ``[opcode, frame]``. Raises WebSocketPayloadException when a
        complete text message is not valid UTF-8 and validation is enabled.
        """
        pending, self.cont_data = self.cont_data, None
        frame.data = pending[1]

        # Partial deliveries are never validated: a fragment may end in the
        # middle of a multi-byte character.
        needs_check = (not self.fire_cont_frame
                       and pending[0] == ABNF.OPCODE_TEXT
                       and not self.skip_utf8_validation)
        if needs_check and not validate_utf8(frame.data):
            raise WebSocketPayloadException(
                "cannot decode: " + repr(frame.data))

        return [pending[0], frame]