Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/sansio/multipart.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

186 statements  

from __future__ import annotations

import re
import typing as t
from dataclasses import dataclass
from enum import auto
from enum import Enum

from ..datastructures import Headers
from ..exceptions import RequestEntityTooLarge
from ..http import parse_options_header

12 

13 

class Event:
    """Base class for every event exchanged with the multipart codec."""

16 

17 

@dataclass(frozen=True)
class Preamble(Event):
    """Event carrying the bytes that precede the first boundary."""

    # Raw bytes found in the buffer before the first boundary match.
    data: bytes

21 

22 

@dataclass(frozen=True)
class Field(Event):
    """Event marking the start of a part that has no ``filename`` option."""

    # Value of the ``name`` option of the part's Content-Disposition header.
    name: str
    # All headers parsed for this part.
    headers: Headers

27 

28 

@dataclass(frozen=True)
class File(Event):
    """Event marking the start of a part that has a ``filename`` option."""

    # Value of the ``name`` option of the part's Content-Disposition header.
    name: str
    # Value of the ``filename`` option of the same header.
    filename: str
    # All headers parsed for this part.
    headers: Headers

34 

35 

@dataclass(frozen=True)
class Data(Event):
    """Event carrying a chunk of the body of the current part."""

    # The body bytes of this chunk; may be empty.
    data: bytes
    # True while no closing boundary has been found for the part yet,
    # i.e. further ``Data`` events for the same part will follow.
    more_data: bool

40 

41 

@dataclass(frozen=True)
class Epilogue(Event):
    """Event carrying the bytes that follow the closing boundary."""

    # Whatever remained in the buffer after the final boundary.
    data: bytes

45 

46 

class NeedData(Event):
    """Event signalling that no other event can be produced yet."""


# Shared instance used as the default result of ``next_event``.
NEED_DATA = NeedData()

52 

53 

class State(Enum):
    """Positions within a multipart message tracked by the codec."""

    # Before the first boundary.
    PREAMBLE = auto()
    # Reading the headers of a part.
    PART = auto()
    # Inside the body of a part, after its first chunk.
    DATA = auto()
    # Headers done; the first body chunk has not been handled yet.
    DATA_START = auto()
    # After the closing boundary.
    EPILOGUE = auto()
    # The epilogue has been emitted; nothing further is expected.
    COMPLETE = auto()

61 

62 

# Multipart line breaks MUST be CRLF (\r\n) by RFC-7578, except that
# many implementations break this and either use CR or LF alone.
LINE_BREAK = b"(?:\r\n|\n|\r)"
BLANK_LINE_RE = re.compile(b"(?:\r\n\r\n|\r\r|\n\n)", re.MULTILINE)
LINE_BREAK_RE = re.compile(LINE_BREAK, re.MULTILINE)
# Header values can be continued via a space or tab after the linebreak, as
# per RFC2231
HEADER_CONTINUATION_RE = re.compile(b"%s[ \t]" % LINE_BREAK, re.MULTILINE)
# This must be long enough to contain any line breaks plus any
# additional boundary markers (--) such that they will be found in a
# subsequent search
SEARCH_EXTRA_LENGTH = 8

75 

76 

class MultipartDecoder:
    """Decodes a multipart message as bytes into Python events.

    The part data is returned as available to allow the caller to save
    the data from memory to disk, if desired.

    Feed bytes with :meth:`receive_data` and pull events with
    :meth:`next_event` until it returns ``NEED_DATA``.

    .. versionchanged:: 3.1.4
        Handle chunks that split a ``\\r\\n`` sequence.
    """

    def __init__(
        self,
        boundary: bytes,
        max_form_memory_size: int | None = None,
        *,
        max_parts: int | None = None,
    ) -> None:
        """Set up an empty decoder for messages delimited by ``boundary``.

        :param boundary: The multipart boundary, without the leading ``--``.
        :param max_form_memory_size: If not ``None``, the largest number of
            bytes the internal buffer may hold after a ``receive_data`` call.
        :param max_parts: If not ``None``, the largest number of parts that
            may be decoded.
        """
        self.buffer = bytearray()
        self.complete = False
        self.max_form_memory_size = max_form_memory_size
        self.max_parts = max_parts
        self.state = State.PREAMBLE
        self.boundary = boundary

        # Note in the below \h i.e. horizontal whitespace is used
        # as [^\S\n\r] as \h isn't supported in python.

        # The preamble must end with a boundary where the boundary is
        # prefixed by a line break, RFC2046. Except that many
        # implementations including Werkzeug's tests omit the line
        # break prefix. In addition the first boundary could be the
        # epilogue boundary (for empty form-data) hence the matching
        # group to understand if it is an epilogue boundary.
        self.preamble_re = re.compile(
            rb"%s?--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # A boundary must include a line break prefix and suffix, and
        # may include trailing whitespace. In addition the boundary
        # could be the epilogue boundary hence the matching group to
        # understand if it is an epilogue boundary.
        self.boundary_re = re.compile(
            rb"%s--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # Offset into the buffer from which the next preamble/header
        # search resumes, so already-scanned bytes are not rescanned.
        self._search_position = 0
        self._parts_decoded = 0

    def receive_data(self, data: bytes | None) -> None:
        """Append ``data`` to the buffer, or mark the input as finished.

        :param data: The next chunk of the message, or ``None`` to signal
            that no more data will arrive.
        :raises RequestEntityTooLarge: If ``max_form_memory_size`` is set
            and the buffer plus ``data`` would exceed it.
        """
        if data is None:
            self.complete = True
        elif (
            self.max_form_memory_size is not None
            and len(self.buffer) + len(data) > self.max_form_memory_size
        ):
            # Ensure that data within single event does not exceed limit.
            # Also checked across accumulated events in MultiPartParser.
            raise RequestEntityTooLarge()
        else:
            self.buffer.extend(data)

    def next_event(self) -> Event:
        """Return the next event that can be decoded from the buffer.

        Returns ``NEED_DATA`` when the buffered bytes are not sufficient
        to produce another event.

        :raises ValueError: If a part has no Content-Disposition header,
            or if the input was marked complete and no event can be
            produced (this includes calls made after the epilogue event).
        :raises RequestEntityTooLarge: If ``max_parts`` is set and more
            parts than that have been decoded.
        """
        event: Event = NEED_DATA
        if self.state == State.PREAMBLE:
            match = self.preamble_re.search(self.buffer, self._search_position)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data = bytes(self.buffer[: match.start()])
                del self.buffer[: match.end()]
                event = Preamble(data=data)
                self._search_position = 0
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(
                    0, len(self.buffer) - len(self.boundary) - SEARCH_EXTRA_LENGTH
                )

        elif self.state == State.PART:
            match = BLANK_LINE_RE.search(self.buffer, self._search_position)
            if match is not None:
                headers = self._parse_headers(self.buffer[: match.start()])
                # The final header ends with a single CRLF, however a
                # blank line indicates the start of the
                # body. Therefore the end is after the first CRLF.
                headers_end = (match.start() + match.end()) // 2
                del self.buffer[:headers_end]

                if "content-disposition" not in headers:
                    raise ValueError("Missing Content-Disposition header")

                disposition, extra = parse_options_header(
                    headers["content-disposition"]
                )
                # NOTE(review): ``name`` is ``None`` when the header has no
                # ``name`` option; the cast only silences the type checker.
                name = t.cast(str, extra.get("name"))
                filename = extra.get("filename")
                if filename is not None:
                    event = File(
                        filename=filename,
                        headers=headers,
                        name=name,
                    )
                else:
                    event = Field(
                        headers=headers,
                        name=name,
                    )
                self.state = State.DATA_START
                self._search_position = 0
                self._parts_decoded += 1

                if self.max_parts is not None and self._parts_decoded > self.max_parts:
                    raise RequestEntityTooLarge()
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(0, len(self.buffer) - SEARCH_EXTRA_LENGTH)

        elif self.state == State.DATA_START:
            data, del_index, more_data = self._parse_data(self.buffer, start=True)
            del self.buffer[:del_index]
            # The first chunk of a part is always reported, even if empty.
            event = Data(data=data, more_data=more_data)
            if more_data:
                self.state = State.DATA

        elif self.state == State.DATA:
            data, del_index, more_data = self._parse_data(self.buffer, start=False)
            del self.buffer[:del_index]
            # Skip empty intermediate chunks; only report when there is
            # data or the part has ended.
            if data or not more_data:
                event = Data(data=data, more_data=more_data)

        elif self.state == State.EPILOGUE and self.complete:
            event = Epilogue(data=bytes(self.buffer))
            del self.buffer[:]
            self.state = State.COMPLETE

        if self.complete and isinstance(event, NeedData):
            raise ValueError(f"Invalid form-data cannot parse beyond {self.state}")

        return event

    def _parse_headers(self, data: bytes | bytearray) -> Headers:
        """Parse the raw header block of a part into a ``Headers`` object.

        Continuation lines are folded into the preceding header, blank
        lines are ignored, and each remaining line is split on its first
        ``:`` with surrounding whitespace stripped from name and value.
        """
        headers: list[tuple[str, str]] = []
        # Merge the continued headers into one line
        data = HEADER_CONTINUATION_RE.sub(b" ", data)
        # Now there is one header per line
        for line in data.splitlines():
            line = line.strip()

            if line != b"":
                name, _, value = line.decode().partition(":")
                headers.append((name.strip(), value.strip()))
        return Headers(headers)

    def _parse_data(
        self, data: bytes | bytearray, *, start: bool
    ) -> tuple[bytes, int, bool]:
        """Extract as much body data as is safe to emit from ``data``.

        When a boundary is found this also advances ``self.state`` to
        ``PART`` or ``EPILOGUE``.

        :param data: The buffered bytes to inspect.
        :param start: Whether this is the first chunk of the part, in
            which case the leading line break is skipped.
        :return: A tuple of the body bytes, the number of leading bytes
            the caller should delete from the buffer, and whether more
            data is expected for this part.
        """
        # Body parts must start with CRLF (or CR or LF)
        if start:
            match = LINE_BREAK_RE.match(data)
            data_start = t.cast(t.Match[bytes], match).end()
        else:
            data_start = 0

        if data.find(b"--" + self.boundary) == -1:
            # No complete boundary in the buffer, but there may be
            # a partial boundary at the end.
            data_end = del_index = (
                self._last_partial_boundary_index(data[data_start:]) + data_start
            )
            more_data = True
        else:
            # Search from the very start: the line break that opens the
            # body may also be the line break that prefixes the boundary.
            match = self.boundary_re.search(data)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data_end = match.start()
                del_index = match.end()
            else:
                data_end = del_index = (
                    self._last_partial_boundary_index(data[data_start:]) + data_start
                )
            more_data = match is None
        return bytes(data[data_start:data_end]), del_index, more_data

    def _last_partial_boundary_index(self, data: bytes | bytearray) -> int:
        """Return the index up to which ``data`` cannot hold a boundary.

        Bytes before the returned index are safe to emit as body data;
        bytes from it onwards might be the beginning of a boundary that
        is completed by a later chunk.
        """
        # Find the index from which a partial boundary could be present
        # in the data. This is the smaller of the positions of the last
        # LF and the last CR, where a position that is more than a
        # complete boundary from the end is disregarded since no partial
        # boundary can start there.
        complete_boundary_index = len(data) - len(b"\r\n--" + self.boundary)
        try:
            last_nl = data.rindex(b"\n")
        except ValueError:
            last_nl = len(data)
        else:
            if last_nl < complete_boundary_index:
                last_nl = len(data)
        try:
            last_cr = data.rindex(b"\r")
        except ValueError:
            last_cr = len(data)
        else:
            if last_cr < complete_boundary_index:
                last_cr = len(data)
        return min(last_nl, last_cr)

293 

294 

class MultipartEncoder:
    """Encodes a sequence of events into the bytes of a multipart message."""

    def __init__(self, boundary: bytes) -> None:
        """Set up an encoder that delimits parts with ``boundary``.

        :param boundary: The multipart boundary, without the leading ``--``.
        """
        self.boundary = boundary
        self.state = State.PREAMBLE

    def send_event(self, event: Event) -> bytes:
        """Return the bytes that represent ``event`` at the current state.

        :param event: The next event of the message being encoded.
        :raises ValueError: If ``event`` is not valid in the current state.
        """
        if isinstance(event, Preamble) and self.state == State.PREAMBLE:
            self.state = State.PART
            return event.data
        elif isinstance(event, (Field, File)) and self.state in {
            State.PREAMBLE,
            State.PART,
            State.DATA,
        }:
            data = b"\r\n--" + self.boundary + b"\r\n"
            # NOTE(review): name and filename are interpolated as-is; quotes
            # or line breaks in them are not escaped here - confirm callers
            # only pass safe values.
            data += b'Content-Disposition: form-data; name="%s"' % event.name.encode()
            if isinstance(event, File):
                data += b'; filename="%s"' % event.filename.encode()
            data += b"\r\n"
            for name, value in t.cast(Field, event).headers:
                # Content-Disposition was generated above; do not repeat it.
                if name.lower() != "content-disposition":
                    data += f"{name}: {value}\r\n".encode()
            self.state = State.DATA_START
            return data
        elif isinstance(event, Data) and self.state == State.DATA_START:
            self.state = State.DATA
            if len(event.data) > 0:
                # Blank line separating the headers from the body.
                return b"\r\n" + event.data
            else:
                # NOTE(review): no separator is written for an empty first
                # chunk, so a later non-empty chunk of the same part would
                # follow the headers directly - confirm this is intended.
                return event.data
        elif isinstance(event, Data) and self.state == State.DATA:
            return event.data
        elif isinstance(event, Epilogue):
            self.state = State.COMPLETE
            return b"\r\n--" + self.boundary + b"--\r\n" + event.data
        else:
            raise ValueError(f"Cannot generate {event} in state: {self.state}")