Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/h11/_receivebuffer.py: 25%

import re
import sys
from typing import List, Optional, Union

__all__ = ["ReceiveBuffer"]

# Operations we want to support:
# - find next \r\n or \r\n\r\n (\n or \n\n are also acceptable),
#   or wait until there is one
# - read at-most-N bytes
# Goals:
# - on average, do this fast
# - worst case, do this in O(n) where n is the number of bytes processed
# Plan:
# - store a bytearray, an offset, and how far we've searched for a separator
#   token
# - use the how-far-we've-searched data to avoid rescanning
# - while doing a stream of uninterrupted processing, advance the offset
#   instead of constantly copying
# WARNING:
# - I haven't benchmarked or profiled any of this yet.
#
# Note that starting in Python 3.4, deleting the initial n bytes from a
# bytearray is amortized O(n) in the number of bytes deleted, thanks to some
# excellent work by Antoine Pitrou:
#
#     https://bugs.python.org/issue19087
#
# This means that once we only supported 3.4+, we could get rid of the code
# involving self._start and self.compress, because it was doing exactly the
# same thing that bytearray now does internally -- and that is what the code
# below does today, relying directly on `del self._data[:count]`.
#
# BUT while we still supported 2.7, reading short segments out of a long
# buffer HAD to be O(bytes read) to avoid DoS issues, so that code couldn't
# be deleted until 2.7 reached end of life:
#
#     https://pythonclock.org/
#
# (Two things worth double-checking before relying on this: that PyPy also
# has the optimization, and a benchmark confirming it's a win, since the old
# code had a slightly clever thing where it delayed calling compress() until
# a whole event had been processed, which could in theory be slightly more
# efficient than the internal bytearray support.)
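#
# For example (an illustrative sketch, not from the original module): on
# CPython 3.4+, a consume-from-the-front loop like
#
#     data = bytearray(b"x" * 1_000_000)
#     while data:
#         del data[:4096]
#
# runs in time roughly proportional to the total number of bytes deleted,
# rather than O(len(data)) per deletion, because bytearray tracks an
# internal start offset instead of shifting the remaining bytes every time.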

# Matches the blank line that ends a header block: a b"\n" followed by an
# optional b"\r" and another b"\n".
blank_line_regex = re.compile(b"\n\r?\n", re.MULTILINE)
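# For example (illustrative only; these are not from the original module):
#
#     >>> blank_line_regex.search(b"Host: a\r\n\r\nbody") is not None
#     True
#     >>> blank_line_regex.search(b"Host: a\n\nbody") is not None
#     True
#     >>> blank_line_regex.search(b"Host: a\r\n") is None
#     True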

class ReceiveBuffer:
    def __init__(self) -> None:
        self._data = bytearray()
        # How far into self._data we've already searched for a line ending
        # (maybe_extract_next_line) or a blank line (maybe_extract_lines),
        # so that repeated calls never rescan the same bytes.
        self._next_line_search = 0
        self._multiple_lines_search = 0

    def __iadd__(self, byteslike: Union[bytes, bytearray]) -> "ReceiveBuffer":
        self._data += byteslike
        return self

    def __bool__(self) -> bool:
        return bool(len(self))

    def __len__(self) -> int:
        return len(self._data)

    # for @property unprocessed_data
    def __bytes__(self) -> bytes:
        return bytes(self._data)

    def _extract(self, count: int) -> bytearray:
        # Extract an initial slice of the data buffer and return it, then
        # reset the search offsets, since the remaining data has shifted.
        out = self._data[:count]
        del self._data[:count]

        self._next_line_search = 0
        self._multiple_lines_search = 0

        return out
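    # Illustrative behavior (not from the original source):
    #
    #     buf = ReceiveBuffer()
    #     buf += b"abcdef"
    #     buf._extract(3)   # -> bytearray(b"abc")
    #     bytes(buf)        # -> b"def"; both search offsets are back to 0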

    def maybe_extract_at_most(self, count: int) -> Optional[bytearray]:
        """
        Extract at most ``count`` bytes from the buffer, or return None if
        the buffer is empty.
        """
        out = self._data[:count]
        if not out:
            return None

        return self._extract(count)
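    # Illustrative behavior (not from the original source):
    #
    #     buf = ReceiveBuffer()
    #     buf += b"abc"
    #     buf.maybe_extract_at_most(10)   # -> bytearray(b"abc"), a short read
    #     buf.maybe_extract_at_most(10)   # -> None, the buffer is empty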

    def maybe_extract_next_line(self) -> Optional[bytearray]:
        """
        Extract the first line, if it is complete in the buffer.
        """
        # Only search in buffer space that we've not already looked at. The
        # - 1 backs up one byte, in case the data seen so far ended with the
        # b"\r" of a b"\r\n" that is only now complete.
        search_start_index = max(0, self._next_line_search - 1)
        partial_idx = self._data.find(b"\r\n", search_start_index)

        if partial_idx == -1:
            self._next_line_search = len(self._data)
            return None

        # + 2 to include the b"\r\n" terminator in the extracted line
        idx = partial_idx + 2

        return self._extract(idx)
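    # Illustrative behavior (not from the original source): a partial line
    # returns None while recording how far we've searched; once the line is
    # completed, it comes out with its terminator attached:
    #
    #     buf = ReceiveBuffer()
    #     buf += b"GET / HTTP/1.1"
    #     buf.maybe_extract_next_line()   # -> None
    #     buf += b"\r\n"
    #     buf.maybe_extract_next_line()   # -> bytearray(b"GET / HTTP/1.1\r\n")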

    def maybe_extract_lines(self) -> Optional[List[bytearray]]:
        """
        Extract everything up to the first blank line, and return a list of
        lines.
        """
        # Handle the case where we have an immediate empty line.
        if self._data[:1] == b"\n":
            self._extract(1)
            return []

        if self._data[:2] == b"\r\n":
            self._extract(2)
            return []

        # Only search in buffer space that we've not already looked at.
        match = blank_line_regex.search(self._data, self._multiple_lines_search)
        if match is None:
            # Next time, skip what we've already scanned, but back up two
            # bytes in case the buffer currently ends partway through a
            # blank-line separator.
            self._multiple_lines_search = max(0, len(self._data) - 2)
            return None

        # Truncate the buffer and return it.
        idx = match.span(0)[-1]
        out = self._extract(idx)
        lines = out.split(b"\n")

        # Strip any trailing b"\r" in place; splitting a bytearray yields
        # mutable bytearrays, so `del` works here.
        for line in lines:
            if line.endswith(b"\r"):
                del line[-1]

        assert lines[-2] == lines[-1] == b""

        del lines[-2:]

        return lines
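    # Illustrative behavior (not from the original source):
    #
    #     buf = ReceiveBuffer()
    #     buf += b"Host: example.com\r\nAccept: */*\r\n\r\nbody"
    #     buf.maybe_extract_lines()
    #     # -> [bytearray(b"Host: example.com"), bytearray(b"Accept: */*")]
    #     bytes(buf)   # -> b"body"; bytes after the blank line stay buffered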

    # In theory we should wait until we've seen b"\r\n" before starting to
    # validate incoming data. However, it's useful to detect (very) invalid
    # data early, since it might not contain b"\r\n" at all (in which case
    # only a timeout would get rid of it).
    # This is not a 100% effective detection, but more of a cheap sanity
    # check allowing for early abort in some useful cases.
    # It is especially handy when a peer mixes up HTTP and HTTPS and sends
    # us a TLS stream where we were expecting plain HTTP, given that all
    # versions of TLS so far start the handshake with a 0x16 message type
    # code.
    def is_next_line_obviously_invalid_request_line(self) -> bool:
        try:
            # An HTTP request line must not contain non-printable
            # characters, and must not start with a space, so any leading
            # byte below 0x21 is obviously invalid.
            return self._data[0] < 0x21
        except IndexError:
            # Empty buffer: nothing to judge yet.
            return False
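
# A minimal end-to-end sketch of how the pieces above fit together. This
# demo is illustrative only and not part of the original module:
if __name__ == "__main__":
    buf = ReceiveBuffer()

    # Bytes arrive in arbitrary chunks; += appends them to the buffer.
    buf += b"GET /index.html HTTP/1.1\r\n"
    buf += b"Host: example.com\r\nContent-Length: 5\r\n\r\nhello"

    # The first byte is printable, so this is not obviously garbage.
    assert not buf.is_next_line_obviously_invalid_request_line()

    # The request line comes out with its b"\r\n" terminator attached.
    assert buf.maybe_extract_next_line() == b"GET /index.html HTTP/1.1\r\n"

    # The header block comes out as a list of lines, terminators stripped.
    assert buf.maybe_extract_lines() == [
        b"Host: example.com",
        b"Content-Length: 5",
    ]

    # Body bytes are read with an at-most-N extraction.
    assert buf.maybe_extract_at_most(5) == b"hello"
    assert not buf

    # A TLS ClientHello starts with record type 0x16, which trips the cheap
    # sanity check immediately.
    tls = ReceiveBuffer()
    tls += b"\x16\x03\x01\x02\x00"
    assert tls.is_next_line_obviously_invalid_request_line()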