Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/sansio/multipart.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

186 statements  

1from __future__ import annotations 

2 

3import re 

4import typing as t 

5from dataclasses import dataclass 

6from enum import auto 

7from enum import Enum 

8 

9from ..datastructures import Headers 

10from ..exceptions import RequestEntityTooLarge 

11from ..http import parse_options_header 

12 

13 

class Event:
    """Common base class for every event exchanged with the multipart codec."""

16 

17 

@dataclass(frozen=True)
class Preamble(Event):
    """Bytes that appear before the first boundary of the message."""

    # Raw preamble content (may be empty).
    data: bytes

21 

22 

@dataclass(frozen=True)
class Field(Event):
    """Start of a part that carries no ``filename`` option."""

    # Value of the ``name`` option of the Content-Disposition header.
    name: str
    # All headers parsed for this part.
    headers: Headers

27 

28 

@dataclass(frozen=True)
class File(Event):
    """Start of a part whose Content-Disposition has a ``filename`` option."""

    # Value of the ``name`` option of the Content-Disposition header.
    name: str
    # Value of the ``filename`` option of the Content-Disposition header.
    filename: str
    # All headers parsed for this part.
    headers: Headers

34 

35 

@dataclass(frozen=True)
class Data(Event):
    """A chunk of body content belonging to the current part."""

    # The content bytes of this chunk (may be empty).
    data: bytes
    # True while further chunks of the same part are still expected.
    more_data: bool

40 

41 

@dataclass(frozen=True)
class Epilogue(Event):
    """Bytes that appear after the closing boundary of the message."""

    # Raw epilogue content (may be empty).
    data: bytes

45 

46 

class NeedData(Event):
    """Event signalling that no further event can be produced yet."""

49 

50 

# Shared instance handed out whenever the decoder has to wait for more input.
NEED_DATA = NeedData()

52 

53 

class State(Enum):
    """Positions of the multipart encoder/decoder state machines."""

    # Before the first boundary has been seen/written.
    PREAMBLE = auto()
    # At the start of a part, i.e. its headers come next.
    PART = auto()
    # Inside the body of a part, after its first chunk.
    DATA = auto()
    # Headers are done; the first body chunk of the part comes next.
    DATA_START = auto()
    # The closing boundary has been seen; trailing bytes are epilogue.
    EPILOGUE = auto()
    # The epilogue has been handled; nothing more to process.
    COMPLETE = auto()

61 

62 

# RFC-7578 requires multipart line breaks to be CRLF (\r\n). Many
# implementations get this wrong and send a lone CR or a lone LF, so all
# three forms are accepted.
LINE_BREAK = b"(?:\r\n|\n|\r)"
BLANK_LINE_RE = re.compile(b"(?:\r\n\r\n|\r\r|\n\n)", re.MULTILINE)
LINE_BREAK_RE = re.compile(LINE_BREAK, re.MULTILINE)
# As per RFC2231, a header value is continued when the line break is
# followed by a space or a tab.
HEADER_CONTINUATION_RE = re.compile(LINE_BREAK + b"[ \t]", re.MULTILINE)
# Number of extra trailing bytes to re-scan on the next search. It has to
# cover any line breaks plus the additional boundary markers (--) so that a
# target split across chunks is still found by a subsequent search.
SEARCH_EXTRA_LENGTH = 8

75 

76 

class MultipartDecoder:
    r"""Decodes a multipart message as bytes into Python events.

    The part data is returned as available to allow the caller to save
    the data from memory to disk, if desired.

    Bytes are supplied with :meth:`receive_data` and events are pulled
    with :meth:`next_event` until it returns ``NEED_DATA`` (more input
    is required) or an :class:`Epilogue`.

    :param boundary: The multipart boundary, without the leading ``--``.
    :param max_form_memory_size: If not ``None``, the maximum number of
        bytes that may be buffered at once; exceeding it raises
        ``RequestEntityTooLarge`` from :meth:`receive_data`.
    :param max_parts: If not ``None``, the maximum number of parts that
        may be decoded; exceeding it raises ``RequestEntityTooLarge``
        from :meth:`next_event`.

    .. versionchanged:: 3.1.4
        Handle chunks that split a ``\r\n`` sequence.
    """

    def __init__(
        self,
        boundary: bytes,
        max_form_memory_size: int | None = None,
        *,
        max_parts: int | None = None,
    ) -> None:
        self.buffer = bytearray()
        self.complete = False
        self.max_form_memory_size = max_form_memory_size
        self.max_parts = max_parts
        self.state = State.PREAMBLE
        self.boundary = boundary

        # Note in the below \h i.e. horizontal whitespace is used
        # as [^\S\n\r] as \h isn't supported in python.

        # The preamble must end with a boundary where the boundary is
        # prefixed by a line break, RFC2046. Except that many
        # implementations including Werkzeug's tests omit the line
        # break prefix. In addition the first boundary could be the
        # epilogue boundary (for empty form-data) hence the matching
        # group to understand if it is an epilogue boundary.
        self.preamble_re = re.compile(
            rb"%s?--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # A boundary must include a line break prefix and suffix, and
        # may include trailing whitespace. In addition the boundary
        # could be the epilogue boundary hence the matching group to
        # understand if it is an epilogue boundary.
        self.boundary_re = re.compile(
            rb"%s--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        self._search_position = 0
        self._parts_decoded = 0

    def receive_data(self, data: bytes | None) -> None:
        """Feed more input to the decoder.

        :param data: The next chunk of the message, or ``None`` to
            signal that no more input will arrive.
        :raises RequestEntityTooLarge: If buffering ``data`` would make
            the buffer larger than ``max_form_memory_size``.
        """
        if data is None:
            self.complete = True
        elif (
            self.max_form_memory_size is not None
            and len(self.buffer) + len(data) > self.max_form_memory_size
        ):
            # Ensure that data within single event does not exceed limit.
            # Also checked across accumulated events in MultiPartParser.
            raise RequestEntityTooLarge()
        else:
            self.buffer.extend(data)

    def next_event(self) -> Event:
        """Return the next event that can be decoded from the buffer.

        :return: A :class:`Preamble`, :class:`Field`, :class:`File`,
            :class:`Data` or :class:`Epilogue` event, or ``NEED_DATA``
            when the buffered bytes are not enough to produce one.
        :raises ValueError: If a part has no Content-Disposition header,
            or if the input was marked complete and no further event
            can be produced.
        :raises RequestEntityTooLarge: If more than ``max_parts`` parts
            have been decoded.
        """
        event: Event = NEED_DATA

        if self.state == State.PREAMBLE:
            match = self.preamble_re.search(self.buffer, self._search_position)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data = bytes(self.buffer[: match.start()])
                del self.buffer[: match.end()]
                event = Preamble(data=data)
                self._search_position = 0
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(
                    0, len(self.buffer) - len(self.boundary) - SEARCH_EXTRA_LENGTH
                )

        elif self.state == State.PART:
            match = BLANK_LINE_RE.search(self.buffer, self._search_position)
            if match is not None:
                headers = self._parse_headers(self.buffer[: match.start()])
                # The final header ends with a single CRLF, however a
                # blank line indicates the start of the
                # body. Therefore the end is after the first CRLF.
                headers_end = (match.start() + match.end()) // 2
                del self.buffer[:headers_end]

                if "content-disposition" not in headers:
                    raise ValueError("Missing Content-Disposition header")

                disposition, extra = parse_options_header(
                    headers["content-disposition"]
                )
                name = t.cast(str, extra.get("name"))
                filename = extra.get("filename")
                if filename is not None:
                    event = File(
                        filename=filename,
                        headers=headers,
                        name=name,
                    )
                else:
                    event = Field(
                        headers=headers,
                        name=name,
                    )
                self.state = State.DATA_START
                self._search_position = 0
                self._parts_decoded += 1

                if self.max_parts is not None and self._parts_decoded > self.max_parts:
                    raise RequestEntityTooLarge()
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(0, len(self.buffer) - SEARCH_EXTRA_LENGTH)

        elif self.state == State.DATA_START:
            data, del_index, more_data = self._parse_data(self.buffer, start=True)
            # With ``start=True`` any consumed prefix includes the leading
            # line break, so ``del_index == 0`` together with ``more_data``
            # means nothing could be consumed yet. Stay in DATA_START and
            # ask for more input; moving on to DATA would later emit the
            # leading line break as part content.
            if not (more_data and del_index == 0):
                del self.buffer[:del_index]
                event = Data(data=data, more_data=more_data)
                if more_data:
                    self.state = State.DATA

        elif self.state == State.DATA:
            data, del_index, more_data = self._parse_data(self.buffer, start=False)
            del self.buffer[:del_index]
            if data or not more_data:
                event = Data(data=data, more_data=more_data)

        elif self.state == State.EPILOGUE and self.complete:
            event = Epilogue(data=bytes(self.buffer))
            del self.buffer[:]
            self.state = State.COMPLETE

        if self.complete and isinstance(event, NeedData):
            raise ValueError(f"Invalid form-data cannot parse beyond {self.state}")

        return event

    def _parse_headers(self, data: bytes | bytearray) -> Headers:
        """Parse the raw header section of a part.

        :param data: The header bytes, without the terminating blank line.
        :return: The parsed headers with names and values stripped.
        """
        headers: list[tuple[str, str]] = []
        # Merge the continued headers into one line
        data = HEADER_CONTINUATION_RE.sub(b" ", data)
        # Now there is one header per line
        for line in data.splitlines():
            line = line.strip()

            if line != b"":
                name, _, value = line.decode().partition(":")
                headers.append((name.strip(), value.strip()))
        return Headers(headers)

    def _parse_data(
        self, data: bytes | bytearray, *, start: bool
    ) -> tuple[bytes, int, bool]:
        """Extract as much part content as is safe from ``data``.

        When a complete boundary is found ``self.state`` is advanced to
        ``State.PART`` or ``State.EPILOGUE``.

        :param data: The buffered bytes, starting inside the current part.
        :param start: ``True`` for the first call of a part, in which
            case ``data`` starts with the line break that separates the
            headers from the body.
        :return: A tuple ``(content, del_index, more_data)`` where
            ``content`` is the part content found, ``del_index`` is the
            number of leading bytes of ``data`` that have been consumed,
            and ``more_data`` is ``False`` once the part's terminating
            boundary has been consumed.
        """
        # Body parts must start with CRLF (or CR or LF)
        if start:
            match = LINE_BREAK_RE.match(data)
            data_start = t.cast(t.Match[bytes], match).end()
        else:
            data_start = 0

        if data.find(b"--" + self.boundary) != -1:
            match = self.boundary_re.search(data)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                # For a part without content the boundary's line break
                # is the leading line break itself, so ``match.start()``
                # may be before ``data_start``; the slice is then empty.
                return bytes(data[data_start : match.start()]), match.end(), False

        # No complete boundary in the buffer, but there may be
        # a partial boundary at the end.
        partial_index = self._last_partial_boundary_index(data)

        if partial_index < data_start:
            # The possible partial boundary starts inside the leading
            # line break. It cannot be decided yet whether that line
            # break starts the content or belongs to the boundary, so
            # nothing is consumed.
            return b"", 0, True

        return bytes(data[data_start:partial_index]), partial_index, True

    def _last_partial_boundary_index(self, data: bytes | bytearray) -> int:
        """Return the index from which ``data`` may hold a partial boundary.

        This is the earlier of the last LF and the last CR in ``data``,
        where a position that is more than a complete boundary away
        from the end is ignored. ``len(data)`` is returned when no such
        position exists.
        """
        end = len(data)
        # A line break before this index would already be followed by
        # enough bytes for a complete boundary, hence it cannot start a
        # partial one.
        complete_boundary_index = end - len(b"\r\n--" + self.boundary)
        index = end

        for line_break_char in (b"\n", b"\r"):
            last = data.rfind(line_break_char)

            if last != -1 and last >= complete_boundary_index:
                index = min(index, last)

        return index

291 

292 

class MultipartEncoder:
    """Encodes a sequence of events into the bytes of a multipart message.

    Events are passed one at a time to :meth:`send_event`, which returns
    the bytes to write for that event.

    :param boundary: The multipart boundary, without the leading ``--``.
    """

    def __init__(self, boundary: bytes) -> None:
        self.boundary = boundary
        self.state = State.PREAMBLE
        # True while the blank line between a part's headers and its
        # content has not been written yet because only empty data has
        # been sent for the part so far.
        self._body_separator_pending = False

    def send_event(self, event: Event) -> bytes:
        """Return the encoded bytes for ``event`` and advance the state.

        :param event: The :class:`Preamble`, :class:`Field`,
            :class:`File`, :class:`Data` or :class:`Epilogue` to encode.
        :return: The bytes to write for the event (may be empty).
        :raises ValueError: If ``event`` is not valid in the current state.
        """
        if isinstance(event, Preamble) and self.state == State.PREAMBLE:
            self.state = State.PART
            return event.data
        elif isinstance(event, (Field, File)) and self.state in {
            State.PREAMBLE,
            State.PART,
            State.DATA,
        }:
            data = b"\r\n--" + self.boundary + b"\r\n"
            data += b'Content-Disposition: form-data; name="%s"' % event.name.encode()
            if isinstance(event, File):
                data += b'; filename="%s"' % event.filename.encode()
            data += b"\r\n"
            for name, value in t.cast(Field, event).headers:
                if name.lower() != "content-disposition":
                    data += f"{name}: {value}\r\n".encode()
            # A separator owed by the previous part is not needed; that
            # part ended without content.
            self._body_separator_pending = False
            self.state = State.DATA_START
            return data
        elif isinstance(event, Data) and self.state == State.DATA_START:
            self.state = State.DATA
            if len(event.data) > 0:
                return b"\r\n" + event.data
            else:
                # Nothing to write yet. Remember that the blank line
                # after the headers is still owed, so that content in a
                # later event is not appended to the header lines.
                self._body_separator_pending = True
                return event.data
        elif isinstance(event, Data) and self.state == State.DATA:
            if event.data and getattr(self, "_body_separator_pending", False):
                self._body_separator_pending = False
                return b"\r\n" + event.data
            return event.data
        elif isinstance(event, Epilogue):
            self.state = State.COMPLETE
            return b"\r\n--" + self.boundary + b"--\r\n" + event.data
        else:
            raise ValueError(f"Cannot generate {event} in state: {self.state}")