Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/sansio/multipart.py: 35%

161 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1import re 

2from dataclasses import dataclass 

3from enum import auto 

4from enum import Enum 

5from typing import cast 

6from typing import List 

7from typing import Optional 

8from typing import Tuple 

9 

10from .._internal import _to_bytes 

11from .._internal import _to_str 

12from ..datastructures import Headers 

13from ..exceptions import RequestEntityTooLarge 

14from ..http import parse_options_header 

15 

16 

class Event:
    """Common base class for every event exchanged with the multipart codec."""

19 

20 

@dataclass(frozen=True)
class Preamble(Event):
    """Bytes that appear in the message before the first boundary."""

    # Raw bytes preceding the first boundary match (may be empty).
    data: bytes

24 

25 

@dataclass(frozen=True)
class Field(Event):
    """Start of a part whose Content-Disposition carries no ``filename``."""

    # Value of the ``name`` option of the Content-Disposition header.
    name: str
    # All headers parsed for this part.
    headers: Headers

30 

31 

@dataclass(frozen=True)
class File(Event):
    """Start of a part whose Content-Disposition carries a ``filename``."""

    # Value of the ``name`` option of the Content-Disposition header.
    name: str
    # Value of the ``filename`` option of the Content-Disposition header.
    filename: str
    # All headers parsed for this part.
    headers: Headers

37 

38 

@dataclass(frozen=True)
class Data(Event):
    """A chunk of the body of the part that is currently being processed."""

    # Body bytes carried by this chunk.
    data: bytes
    # True while further chunks of the same part may follow; False on the
    # chunk that ends at the part's closing boundary.
    more_data: bool

43 

44 

@dataclass(frozen=True)
class Epilogue(Event):
    """Bytes that follow the closing boundary of the message."""

    # Whatever remained in the buffer after the closing boundary.
    data: bytes

48 

49 

class NeedData(Event):
    """Event signalling that no progress is possible without more input."""


# Shared instance used as the "nothing to report yet" event.
NEED_DATA = NeedData()

55 

56 

class State(Enum):
    """Positions of the multipart encoder/decoder state machines."""

    # Before the first boundary has been seen or written.
    PREAMBLE = auto()
    # At the headers of a part.
    PART = auto()
    # Inside the body of a part.
    DATA = auto()
    # After the closing boundary.
    EPILOGUE = auto()
    # The message has been fully processed.
    COMPLETE = auto()

63 

64 

# RFC-7578 says multipart line breaks MUST be CRLF (\r\n); in practice
# many implementations send a lone CR or a lone LF, so all three forms
# are accepted.
LINE_BREAK = b"(?:\r\n|\n|\r)"
LINE_BREAK_RE = re.compile(LINE_BREAK, re.MULTILINE)
# Two consecutive line breaks of the same flavour, i.e. an empty line.
BLANK_LINE_RE = re.compile(b"(?:\r\n\r\n|\r\r|\n\n)", re.MULTILINE)
# A header value may be continued on the next line when that line starts
# with a space or a tab, as per RFC2231.
HEADER_CONTINUATION_RE = re.compile(LINE_BREAK + b"[ \t]", re.MULTILINE)
# Extra bytes to re-scan on the next search. It has to be long enough to
# cover any line breaks plus any additional boundary markers (--) so that
# a search target split across two chunks is still found.
SEARCH_EXTRA_LENGTH = 8

77 

78 

class MultipartDecoder:
    """Decodes a multipart message as bytes into Python events.

    The part data is returned as available to allow the caller to save
    the data from memory to disk, if desired.

    Bytes are supplied with :meth:`receive_data` (``None`` marks the end
    of the input) and events are pulled with :meth:`next_event`, which
    returns ``NEED_DATA`` whenever the buffered bytes are not sufficient
    to produce another event.
    """

    def __init__(
        self,
        boundary: bytes,
        max_form_memory_size: Optional[int] = None,
    ) -> None:
        """Create a decoder for messages delimited by *boundary*.

        :param boundary: The multipart boundary, without the leading
            ``--``.
        :param max_form_memory_size: Upper limit, in bytes, for the
            internal buffer; ``None`` disables the check.
        """
        self.buffer = bytearray()
        self.complete = False
        self.max_form_memory_size = max_form_memory_size
        self.state = State.PREAMBLE
        self.boundary = boundary

        # Note in the below \h i.e. horizontal whitespace is used
        # as [^\S\n\r] as \h isn't supported in python.

        # The preamble must end with a boundary where the boundary is
        # prefixed by a line break, RFC2046. Except that many
        # implementations including Werkzeug's tests omit the line
        # break prefix. In addition the first boundary could be the
        # epilogue boundary (for empty form-data) hence the matching
        # group to understand if it is an epilogue boundary.
        self.preamble_re = re.compile(
            rb"%s?--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # A boundary must include a line break prefix and suffix, and
        # may include trailing whitespace. In addition the boundary
        # could be the epilogue boundary hence the matching group to
        # understand if it is an epilogue boundary.
        self.boundary_re = re.compile(
            rb"%s--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        self._search_position = 0

    def last_newline(self) -> int:
        """Return the smaller of the last LF index and the last CR index.

        A character that does not occur in the buffer counts as
        ``len(self.buffer)``. The decoder itself uses :meth:`_data_end`
        instead; this method is kept unchanged for backward
        compatibility.
        """
        try:
            last_nl = self.buffer.rindex(b"\n")
        except ValueError:
            last_nl = len(self.buffer)
        try:
            last_cr = self.buffer.rindex(b"\r")
        except ValueError:
            last_cr = len(self.buffer)

        return min(last_nl, last_cr)

    def _data_end(self, boundary_seen: bool) -> int:
        """Return how many buffered bytes are safe to emit as part data.

        Used when no complete boundary matched. A boundary always starts
        with a line break, so a boundary that is still arriving can only
        begin at the last line break in the buffer; everything before
        that is plain data. This assumes the boundary string itself
        contains no CR or LF.

        :param boundary_seen: Whether ``--<boundary>`` occurs anywhere in
            the buffer. When it does not, a tail that is already longer
            than ``<line break>--<boundary>`` cannot be a partial
            boundary, so the whole buffer is released. Without this the
            buffer would grow without bound (and be rescanned on every
            call) for a body containing a single CR or LF followed by
            data without line breaks.
        """
        buffer = self.buffer
        last_break = max(buffer.rfind(b"\n"), buffer.rfind(b"\r"))

        if last_break == -1:
            # No line break at all, so no boundary can have started.
            return len(buffer)

        # A CRLF pair is one line break; a boundary would start at its CR.
        if last_break > 0 and buffer[last_break - 1 : last_break + 1] == b"\r\n":
            last_break -= 1

        tail_length = len(buffer) - last_break

        if not boundary_seen and tail_length >= len(b"\r\n--") + len(self.boundary):
            return len(buffer)

        return last_break

    def receive_data(self, data: Optional[bytes]) -> None:
        """Append *data* to the buffer, or mark the input as finished.

        :param data: More bytes of the message, or ``None`` once the
            input is exhausted.
        :raises RequestEntityTooLarge: If ``max_form_memory_size`` is set
            and the buffer would grow beyond it.
        """
        if data is None:
            self.complete = True
        elif (
            self.max_form_memory_size is not None
            and len(self.buffer) + len(data) > self.max_form_memory_size
        ):
            raise RequestEntityTooLarge()
        else:
            self.buffer.extend(data)

    def next_event(self) -> Event:
        """Return the next event that can be produced from the buffer.

        :return: A ``Preamble``, ``Field``, ``File``, ``Data`` or
            ``Epilogue`` event, or ``NEED_DATA`` when more input is
            required first.
        :raises ValueError: If a part has no Content-Disposition header,
            a header line is malformed, or the input was marked complete
            while no further event can be produced.
        """
        event: Event = NEED_DATA

        if self.state == State.PREAMBLE:
            match = self.preamble_re.search(self.buffer, self._search_position)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data = bytes(self.buffer[: match.start()])
                del self.buffer[: match.end()]
                event = Preamble(data=data)
                self._search_position = 0
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(
                    0, len(self.buffer) - len(self.boundary) - SEARCH_EXTRA_LENGTH
                )

        elif self.state == State.PART:
            match = BLANK_LINE_RE.search(self.buffer, self._search_position)
            if match is not None:
                headers = self._parse_headers(self.buffer[: match.start()])
                del self.buffer[: match.end()]

                if "content-disposition" not in headers:
                    raise ValueError("Missing Content-Disposition header")

                _, extra = parse_options_header(headers["content-disposition"])
                name = cast(str, extra.get("name"))
                filename = extra.get("filename")
                if filename is not None:
                    event = File(
                        filename=filename,
                        headers=headers,
                        name=name,
                    )
                else:
                    event = Field(
                        headers=headers,
                        name=name,
                    )
                self.state = State.DATA
                self._search_position = 0
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(0, len(self.buffer) - SEARCH_EXTRA_LENGTH)

        elif self.state == State.DATA:
            # The plain substring search is a cheap pre-check; the regex
            # only runs once the boundary marker is in the buffer.
            boundary_seen = self.buffer.find(b"--" + self.boundary) != -1
            match = self.boundary_re.search(self.buffer) if boundary_seen else None

            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data_length = match.start()
                del_index = match.end()
            else:
                # No complete boundary yet, but there may be a partial
                # one at the end of the buffer: emit only what cannot
                # belong to it.
                data_length = del_index = self._data_end(boundary_seen)

            more_data = match is None
            data = bytes(self.buffer[:data_length])
            del self.buffer[:del_index]
            if data or not more_data:
                event = Data(data=data, more_data=more_data)

        elif self.state == State.EPILOGUE and self.complete:
            event = Epilogue(data=bytes(self.buffer))
            del self.buffer[:]
            self.state = State.COMPLETE

        if self.complete and isinstance(event, NeedData):
            raise ValueError(f"Invalid form-data cannot parse beyond {self.state}")

        return event

    def _parse_headers(self, data: bytes) -> Headers:
        """Parse the raw header block of one part.

        :param data: The bytes between the boundary line and the blank
            line that ends the headers.
        :raises ValueError: If a non-blank line contains no ``:``.
        """
        headers: List[Tuple[str, str]] = []
        # Merge the continued headers into one line
        data = HEADER_CONTINUATION_RE.sub(b" ", data)
        # Now there is one header per line
        for line in data.splitlines():
            if line.strip() == b"":
                continue

            name, separator, value = _to_str(line).strip().partition(":")

            if not separator:
                # Fail with a clear message instead of an unpacking error.
                raise ValueError("Malformed header line, missing ':' separator")

            headers.append((name.strip(), value.strip()))
        return Headers(headers)

246 

247 

class MultipartEncoder:
    """Turns multipart events into the bytes of a multipart message."""

    def __init__(self, boundary: bytes) -> None:
        """Create an encoder that delimits parts with *boundary*."""
        self.state = State.PREAMBLE
        self.boundary = boundary

    def send_event(self, event: Event) -> bytes:
        """Return the bytes for *event* and advance the encoder state.

        :raises ValueError: If *event* is not acceptable in the current
            state.
        """
        if isinstance(event, Preamble) and self.state == State.PREAMBLE:
            self.state = State.PART
            return event.data

        part_states = {State.PREAMBLE, State.PART, State.DATA}
        if isinstance(event, (Field, File)) and self.state in part_states:
            self.state = State.DATA
            # Boundary line followed by the Content-Disposition header.
            pieces = [
                b"\r\n--",
                self.boundary,
                b"\r\n",
                b'Content-Disposition: form-data; name="%s"' % _to_bytes(event.name),
            ]
            if isinstance(event, File):
                pieces.append(b'; filename="%s"' % _to_bytes(event.filename))
            pieces.append(b"\r\n")
            # Remaining headers; Content-Disposition was written above.
            pieces.extend(
                _to_bytes(f"{key}: {val}\r\n")
                for key, val in cast(Field, event).headers
                if key.lower() != "content-disposition"
            )
            # Blank line separating the headers from the body.
            pieces.append(b"\r\n")
            return b"".join(pieces)

        if isinstance(event, Data) and self.state == State.DATA:
            return event.data

        if isinstance(event, Epilogue):
            self.state = State.COMPLETE
            return b"".join((b"\r\n--", self.boundary, b"--\r\n", event.data))

        raise ValueError(f"Cannot generate {event} in state: {self.state}")