1import re
2import sys
3from typing import List, Optional, Union
4
# Public API of this module: ReceiveBuffer is the only name exported by a
# star-import.
__all__ = ["ReceiveBuffer"]
6
7
8# Operations we want to support:
9# - find next \r\n or \r\n\r\n (\n or \n\n are also acceptable),
10# or wait until there is one
11# - read at-most-N bytes
12# Goals:
13# - on average, do this fast
14# - worst case, do this in O(n) where n is the number of bytes processed
15# Plan:
16# - store bytearray, offset, how far we've searched for a separator token
17# - use the how-far-we've-searched data to avoid rescanning
18# - while doing a stream of uninterrupted processing, advance offset instead
19# of constantly copying
20# WARNING:
21# - I haven't benchmarked or profiled any of this yet.
22#
# Note that starting in Python 3.4, deleting the initial n bytes from a
# bytearray is amortized O(n), thanks to some excellent work by Antoine
# Pitrou:
26#
27# https://bugs.python.org/issue19087
28#
29# This means that if we only supported 3.4+, we could get rid of the code here
30# involving self._start and self.compress, because it's doing exactly the same
31# thing that bytearray now does internally.
32#
33# BUT unfortunately, we still support 2.7, and reading short segments out of a
34# long buffer MUST be O(bytes read) to avoid DoS issues, so we can't actually
35# delete this code. Yet:
36#
37# https://pythonclock.org/
38#
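# (Two things to double-check first though: make sure PyPy also has the
# optimization, and benchmark to make sure it's a win, since we do have a
# slightly clever thing where we delay calling compress() until we've
# processed a whole event, which could in theory be slightly more efficient
# than the internal bytearray support.)
#
# NOTE: the ReceiveBuffer class below no longer has self._start or
# compress(); it deletes consumed bytes from the front of the bytearray
# directly (see _extract), so the offset/compress discussion above describes
# an earlier implementation.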
# Matches the blank line that terminates a block of lines: a line feed, an
# optional carriage return, then another line feed (b"\n\n" or b"\n\r\n",
# which also covers the tail of b"\r\n\r\n"). The pattern contains no ^ or $
# anchors, so re.MULTILINE does not change what it matches.
blank_line_regex = re.compile(b"\n\r?\n", re.MULTILINE)
45
46
class ReceiveBuffer:
    """Accumulate received bytes and hand them back in parser-sized pieces.

    Data is appended with ``+=``. Each ``maybe_extract_*`` method removes and
    returns data from the front of the buffer, or returns ``None`` when the
    buffer does not yet hold what was asked for, so the caller can append
    more data and try again.

    Two search offsets remember how far earlier, unsuccessful searches got,
    so that a retry after more data arrives does not rescan bytes that are
    already known not to contain a terminator. Any successful extraction
    resets both offsets, because it shifts the remaining data to index 0.
    """

    def __init__(self) -> None:
        self._data = bytearray()
        # How far maybe_extract_next_line() has already searched for b"\r\n".
        self._next_line_search = 0
        # Where maybe_extract_lines() should resume searching for a blank line.
        self._multiple_lines_search = 0

    def __iadd__(self, byteslike: Union[bytes, bytearray]) -> "ReceiveBuffer":
        """Append *byteslike* to the end of the buffer and return ``self``."""
        self._data += byteslike
        return self

    def __bool__(self) -> bool:
        """Return ``True`` when the buffer holds at least one byte."""
        return bool(len(self))

    def __len__(self) -> int:
        """Return the number of bytes currently buffered."""
        return len(self._data)

    # for @property unprocessed_data
    def __bytes__(self) -> bytes:
        """Return an immutable copy of the buffered data; nothing is consumed."""
        return bytes(self._data)

    def _extract(self, count: int) -> bytearray:
        """Remove the first *count* bytes from the buffer and return them."""
        # extracting an initial slice of the data buffer and return it
        out = self._data[:count]
        del self._data[:count]

        # The remaining data moved to index 0, so previously recorded search
        # positions no longer refer to the same bytes.
        self._next_line_search = 0
        self._multiple_lines_search = 0

        return out

    def maybe_extract_at_most(self, count: int) -> Optional[bytearray]:
        """Remove and return up to *count* bytes from the front of the buffer.

        Returns ``None`` when there is nothing to hand back: the buffer is
        empty or *count* is not positive. Otherwise returns a ``bytearray``
        of ``min(count, len(self))`` bytes.
        """
        # Test for emptiness directly rather than slicing a throwaway copy of
        # up to ``count`` bytes; _extract() makes the one copy that is needed.
        # Rejecting count <= 0 also keeps a negative count from being
        # interpreted as a from-the-end slice bound.
        if count <= 0 or not self._data:
            return None

        return self._extract(count)

    def maybe_extract_next_line(self) -> Optional[bytearray]:
        """Remove and return the first CRLF-terminated line, terminator included.

        Returns ``None`` if no complete line is buffered yet.
        """
        # Only search in buffer space that we've not already looked at.
        # Back up one byte: the previous search may have stopped right after
        # a trailing b"\r" whose b"\n" has only just arrived.
        search_start_index = max(0, self._next_line_search - 1)
        partial_idx = self._data.find(b"\r\n", search_start_index)

        if partial_idx == -1:
            self._next_line_search = len(self._data)
            return None

        # + 2 is to compensate len(b"\r\n")
        idx = partial_idx + 2

        return self._extract(idx)

    def maybe_extract_lines(self) -> Optional[List[bytearray]]:
        """Remove everything up to the first blank line and return the lines.

        Returns ``None`` if no blank line is buffered yet, and an empty list
        if the buffer starts with a blank line. Otherwise returns the lines
        that precede the blank line, with their line terminators stripped;
        the blank line itself is consumed but not returned.
        """
        # Handle the case where we have an immediate empty line.
        if self._data[:1] == b"\n":
            self._extract(1)
            return []

        if self._data[:2] == b"\r\n":
            self._extract(2)
            return []

        # Only search in buffer space that we've not already looked at.
        match = blank_line_regex.search(self._data, self._multiple_lines_search)
        if match is None:
            # A terminator may straddle the end of the current data (at most
            # its first two bytes can already be present), so resume two
            # bytes before the end next time.
            self._multiple_lines_search = max(0, len(self._data) - 2)
            return None

        # Truncate the buffer and return it.
        idx = match.span(0)[-1]
        out = self._extract(idx)
        lines = out.split(b"\n")

        # Each piece is a bytearray, so a trailing b"\r" is dropped in place.
        for line in lines:
            if line.endswith(b"\r"):
                del line[-1]

        # The extracted data ends with the blank-line terminator, so the split
        # always yields two empty trailing entries; drop them.
        assert lines[-2] == lines[-1] == b""

        del lines[-2:]

        return lines

    # In theory we should wait until `\r\n` before starting to validate
    # incoming data. However it's interesting to detect (very) invalid data
    # early given they might not even contain `\r\n` at all (hence only
    # timeout will get rid of them).
    # This is not a 100% effective detection but more of a cheap sanity check
    # allowing for early abort in some useful cases.
    # This is especially interesting when peer is messing up with HTTPS and
    # sent us a TLS stream where we were expecting plain HTTP given all
    # versions of TLS so far start handshake with a 0x16 message type code.
    def is_next_line_obviously_invalid_request_line(self) -> bool:
        """Return ``True`` if the first buffered byte is below 0x21.

        Returns ``False`` when the buffer is empty.
        """
        try:
            # HTTP header line must not contain non-printable characters
            # and should not start with a space
            return self._data[0] < 0x21
        except IndexError:
            return False