Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/sansio/multipart.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

187 statements  

1from __future__ import annotations 

2 

3import re 

4import typing as t 

5from dataclasses import dataclass 

6from enum import auto 

7from enum import Enum 

8 

9from ..datastructures import Headers 

10from ..exceptions import RequestEntityTooLarge 

11from ..http import parse_options_header 

12 

13 

class Event:
    """Common base type for all multipart events.

    Carries no state of its own; the concrete event classes in this
    module derive from it so they can share a single type.
    """

16 

17 

@dataclass(frozen=True)
class Preamble(Event):
    """Immutable event carrying the bytes found ahead of the first boundary."""

    # Raw bytes preceding the first boundary line.
    data: bytes

21 

22 

@dataclass(frozen=True)
class Field(Event):
    """Immutable event announcing a part that has no filename."""

    # Value of the ``name`` option of the part's Content-Disposition header.
    name: str
    # All headers parsed for this part.
    headers: Headers

27 

28 

@dataclass(frozen=True)
class File(Event):
    """Immutable event announcing a part that carries a filename."""

    # Value of the ``name`` option of the part's Content-Disposition header.
    name: str
    # Value of the ``filename`` option of the same header.
    filename: str
    # All headers parsed for this part.
    headers: Headers

34 

35 

@dataclass(frozen=True)
class Data(Event):
    """Immutable event carrying a chunk of the current part's body."""

    # The body bytes in this chunk (may be empty).
    data: bytes
    # True while further chunks of the same part are still expected.
    more_data: bool

40 

41 

@dataclass(frozen=True)
class Epilogue(Event):
    """Immutable event carrying the bytes that follow the closing boundary."""

    # Raw bytes found after the final ``--boundary--`` line.
    data: bytes

45 

46 

class NeedData(Event):
    """Marker event meaning no other event can be produced from the buffer yet."""


# Shared instance used as the "nothing to report yet" event.
NEED_DATA = NeedData()

52 

53 

class State(Enum):
    """Positions of the multipart encoder/decoder state machines."""

    # Before the first boundary has been seen or written.
    PREAMBLE = auto()
    # At the start of a part, where its headers are expected.
    PART = auto()
    # Inside a part's body, after its first chunk.
    DATA = auto()
    # Headers are done; the first body chunk comes next.
    DATA_START = auto()
    # The closing boundary has been seen; trailing bytes remain.
    EPILOGUE = auto()
    # The epilogue has been handled; nothing is left to process.
    COMPLETE = auto()

61 

62 

# RFC 7578 requires CRLF (\r\n) line breaks in multipart bodies, but
# enough implementations send a bare CR or a bare LF that all three
# forms are accepted here.
LINE_BREAK = b"(?:\r\n|\n|\r)"
# Two consecutive line breaks of the same kind, i.e. an empty line.
BLANK_LINE_RE = re.compile(b"(?:\r\n\r\n|\r\r|\n\n)", re.MULTILINE)
LINE_BREAK_RE = re.compile(LINE_BREAK, re.MULTILINE)
# A line break directly followed by a space or a tab continues the
# previous header value on the next line.
HEADER_CONTINUATION_RE = re.compile(LINE_BREAK + b"[ \t]", re.MULTILINE)
# Number of extra trailing bytes to re-scan on the next search. It has
# to cover the line breaks and the additional "--" boundary markers so
# that a search target split across two chunks is still found.
SEARCH_EXTRA_LENGTH = 8

75 

76 

class MultipartDecoder:
    r"""Decodes a multipart message as bytes into Python events.

    The part data is returned as available to allow the caller to save
    the data from memory to disk, if desired.

    Bytes are supplied with :meth:`receive_data` and events are pulled
    with :meth:`next_event`. ``NEED_DATA`` is returned whenever the
    buffered bytes are not sufficient to produce another event.

    :param boundary: The multipart boundary, without the leading ``--``.
    :param max_form_memory_size: Upper bound for the number of bytes
        held in the internal buffer, or ``None`` for no limit.
    :param max_parts: Upper bound for the number of parts decoded, or
        ``None`` for no limit.

    .. versionchanged:: 3.1.4
        Handle chunks that split a ``\r\n`` sequence.
    """

    def __init__(
        self,
        boundary: bytes,
        max_form_memory_size: int | None = None,
        *,
        max_parts: int | None = None,
    ) -> None:
        self.buffer = bytearray()
        self.complete = False
        self.max_form_memory_size = max_form_memory_size
        self.max_parts = max_parts
        self.state = State.PREAMBLE
        self.boundary = boundary

        # Note in the below \h i.e. horizontal whitespace is used
        # as [^\S\n\r] as \h isn't supported in python.

        # The preamble must end with a boundary where the boundary is
        # prefixed by a line break, RFC2046. Except that many
        # implementations including Werkzeug's tests omit the line
        # break prefix. In addition the first boundary could be the
        # epilogue boundary (for empty form-data) hence the matching
        # group to understand if it is an epilogue boundary.
        self.preamble_re = re.compile(
            rb"%s?--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # A boundary must include a line break prefix and suffix, and
        # may include trailing whitespace. In addition the boundary
        # could be the epilogue boundary hence the matching group to
        # understand if it is an epilogue boundary.
        self.boundary_re = re.compile(
            rb"%s--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # Offset in the buffer from which the next regex search starts.
        self._search_position = 0
        # Number of Field/File events produced so far.
        self._parts_decoded = 0

    def last_newline(self, data: bytes | bytearray) -> int:
        """Return the smaller of the last ``\\n`` and last ``\\r`` index.

        A character that does not occur counts as ``len(data)``, so
        ``len(data)`` is returned when neither is present.
        """
        try:
            last_nl = data.rindex(b"\n")
        except ValueError:
            last_nl = len(data)
        try:
            last_cr = data.rindex(b"\r")
        except ValueError:
            last_cr = len(data)

        return min(last_nl, last_cr)

    def receive_data(self, data: bytes | None) -> None:
        """Append ``data`` to the buffer, or mark the input as finished.

        :param data: The next chunk of the message, or ``None`` to
            signal that no more data will arrive.
        :raises RequestEntityTooLarge: If the buffer would grow beyond
            ``max_form_memory_size``.
        """
        if data is None:
            self.complete = True
        elif (
            self.max_form_memory_size is not None
            and len(self.buffer) + len(data) > self.max_form_memory_size
        ):
            # Ensure that data within single event does not exceed limit.
            # Also checked across accumulated events in MultiPartParser.
            raise RequestEntityTooLarge()
        else:
            self.buffer.extend(data)

    def next_event(self) -> Event:
        """Return the next event that can be decoded from the buffer.

        :return: A ``Preamble``, ``Field``, ``File``, ``Data`` or
            ``Epilogue`` event, or ``NEED_DATA`` when more bytes are
            required.
        :raises ValueError: If a part has no Content-Disposition
            header, or if the input was marked complete and no event
            can be produced in the current state.
        :raises RequestEntityTooLarge: If more than ``max_parts`` parts
            have been decoded.
        """
        event: Event = NEED_DATA

        if self.state == State.PREAMBLE:
            match = self.preamble_re.search(self.buffer, self._search_position)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data = bytes(self.buffer[: match.start()])
                del self.buffer[: match.end()]
                event = Preamble(data=data)
                self._search_position = 0
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(
                    0, len(self.buffer) - len(self.boundary) - SEARCH_EXTRA_LENGTH
                )

        elif self.state == State.PART:
            match = BLANK_LINE_RE.search(self.buffer, self._search_position)
            if match is not None:
                headers = self._parse_headers(self.buffer[: match.start()])
                # The final header ends with a single CRLF, however a
                # blank line indicates the start of the
                # body. Therefore the end is after the first CRLF.
                headers_end = (match.start() + match.end()) // 2
                del self.buffer[:headers_end]

                if "content-disposition" not in headers:
                    raise ValueError("Missing Content-Disposition header")

                disposition, extra = parse_options_header(
                    headers["content-disposition"]
                )
                # The cast only informs the type checker; nothing here
                # verifies that a ``name`` option is actually present.
                name = t.cast(str, extra.get("name"))
                filename = extra.get("filename")
                if filename is not None:
                    event = File(
                        filename=filename,
                        headers=headers,
                        name=name,
                    )
                else:
                    event = Field(
                        headers=headers,
                        name=name,
                    )
                self.state = State.DATA_START
                self._search_position = 0
                self._parts_decoded += 1

                if self.max_parts is not None and self._parts_decoded > self.max_parts:
                    raise RequestEntityTooLarge()
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(0, len(self.buffer) - SEARCH_EXTRA_LENGTH)

        elif self.state == State.DATA_START:
            data, del_index, more_data = self._parse_data(self.buffer, start=True)
            del self.buffer[:del_index]
            event = Data(data=data, more_data=more_data)
            if more_data:
                self.state = State.DATA

        elif self.state == State.DATA:
            data, del_index, more_data = self._parse_data(self.buffer, start=False)
            del self.buffer[:del_index]
            # An empty chunk is only reported when it ends the part.
            if data or not more_data:
                event = Data(data=data, more_data=more_data)

        elif self.state == State.EPILOGUE and self.complete:
            event = Epilogue(data=bytes(self.buffer))
            del self.buffer[:]
            self.state = State.COMPLETE

        if self.complete and isinstance(event, NeedData):
            raise ValueError(f"Invalid form-data cannot parse beyond {self.state}")

        return event

    def _parse_headers(self, data: bytes | bytearray) -> Headers:
        """Parse the header block of a part into a ``Headers`` object."""
        headers: list[tuple[str, str]] = []
        # Merge the continued headers into one line
        data = HEADER_CONTINUATION_RE.sub(b" ", data)
        # Now there is one header per line
        for line in data.splitlines():
            line = line.strip()

            if line != b"":
                name, _, value = line.decode().partition(":")
                headers.append((name.strip(), value.strip()))
        return Headers(headers)

    def _parse_data(
        self, data: bytes | bytearray, *, start: bool
    ) -> tuple[bytes, int, bool]:
        """Extract the next chunk of body data from ``data``.

        :param data: The bytes to scan (the decoder's buffer).
        :param start: ``True`` for the first chunk of a part, in which
            case the leading line break is skipped.
        :return: A tuple ``(chunk, del_index, more_data)`` where
            ``chunk`` is the body data, ``del_index`` is the number of
            leading bytes the caller should drop from the buffer, and
            ``more_data`` is ``False`` once the part's closing boundary
            has been consumed.
        """
        # Body parts must start with CRLF (or CR or LF)
        if start:
            match = LINE_BREAK_RE.match(data)
            data_start = t.cast(t.Match[bytes], match).end()
        else:
            data_start = 0

        boundary = b"--" + self.boundary

        if data.find(boundary) == -1:
            # No complete boundary in the buffer, but there may be
            # a partial boundary at the end. As the boundary
            # starts with either a nl or cr find the earliest and
            # return up to that as data.
            data_end = del_index = self.last_newline(data[data_start:]) + data_start
            # If amount of data after last newline is far from
            # possible length of partial boundary, we should
            # assume that there is no partial boundary in the buffer
            # and return all pending data.
            if (len(data) - data_end) > len(b"\n" + boundary):
                data_end = del_index = len(data)
            more_data = True
        else:
            match = self.boundary_re.search(data)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data_end = match.start()
                del_index = match.end()
            else:
                data_end = del_index = self.last_newline(data[data_start:]) + data_start
            more_data = match is None

        # Keep \r\n sequence intact rather than splitting across chunks.
        # This only applies while the part is still open: the held-back
        # \r then stays in the buffer for the next call. Once a boundary
        # has matched, everything before it is body data and del_index
        # points past the boundary, so shifting either would drop a
        # trailing \r from the body and leave a boundary byte behind.
        if more_data and data_end > data_start and data[data_end - 1] == 0x0D:
            data_end -= 1
            del_index -= 1

        return bytes(data[data_start:data_end]), del_index, more_data

295 

296 

class MultipartEncoder:
    """Encodes multipart events into the bytes of a multipart message.

    :param boundary: The multipart boundary, without the leading ``--``.
    """

    # True when a part's first Data event was empty, so the line break
    # that separates headers from body has not been written yet.
    _separator_pending: bool = False

    def __init__(self, boundary: bytes) -> None:
        self.boundary = boundary
        self.state = State.PREAMBLE

    def send_event(self, event: Event) -> bytes:
        """Return the bytes that represent ``event`` in the message.

        :param event: The next event of the message being encoded.
        :return: The bytes to append to the output.
        :raises ValueError: If ``event`` is not valid in the encoder's
            current state.
        """
        if isinstance(event, Preamble) and self.state == State.PREAMBLE:
            self.state = State.PART
            return event.data
        elif isinstance(event, (Field, File)) and self.state in {
            State.PREAMBLE,
            State.PART,
            State.DATA,
        }:
            data = b"\r\n--" + self.boundary + b"\r\n"
            # NOTE(review): name and filename are interpolated as-is; a
            # value containing a double quote or a line break is not
            # escaped here.
            data += b'Content-Disposition: form-data; name="%s"' % event.name.encode()
            if isinstance(event, File):
                data += b'; filename="%s"' % event.filename.encode()
            data += b"\r\n"
            for name, value in t.cast(Field, event).headers:
                # Content-Disposition was already written above.
                if name.lower() != "content-disposition":
                    data += f"{name}: {value}\r\n".encode()
            self.state = State.DATA_START
            self._separator_pending = False
            return data
        elif isinstance(event, Data) and self.state == State.DATA_START:
            self.state = State.DATA
            if len(event.data) > 0:
                return b"\r\n" + event.data
            else:
                # Nothing is written for an empty first chunk. Remember
                # that the header/body separator is still owed in case
                # a later chunk of this part does carry data.
                self._separator_pending = True
                return event.data
        elif isinstance(event, Data) and self.state == State.DATA:
            if self._separator_pending and len(event.data) > 0:
                # First non-empty chunk after an empty first chunk:
                # without the separator the body would be appended
                # directly to the header lines.
                self._separator_pending = False
                return b"\r\n" + event.data
            return event.data
        elif isinstance(event, Epilogue):
            self.state = State.COMPLETE
            return b"\r\n--" + self.boundary + b"--\r\n" + event.data
        else:
            raise ValueError(f"Cannot generate {event} in state: {self.state}")