Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/sansio/multipart.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

184 statements  

1from __future__ import annotations 

2 

3import re 

4import typing as t 

5from dataclasses import dataclass 

6from enum import auto 

7from enum import Enum 

8 

9from ..datastructures import Headers 

10from ..exceptions import RequestEntityTooLarge 

11from ..http import parse_options_header 

12 

13 

class Event:
    """Common base class for everything the multipart decoder emits and
    the multipart encoder accepts."""

16 

17 

@dataclass(frozen=True)
class Preamble(Event):
    """Bytes that appear before the first boundary of the message."""

    # Raw content that preceded the first boundary; may be empty.
    data: bytes

21 

22 

@dataclass(frozen=True)
class Field(Event):
    """Start of a part whose Content-Disposition carries no filename."""

    # Value of the ``name`` option of the Content-Disposition header.
    name: str
    # Every header parsed for this part.
    headers: Headers

27 

28 

@dataclass(frozen=True)
class File(Event):
    """Start of a part whose Content-Disposition carries a filename."""

    # Value of the ``name`` option of the Content-Disposition header.
    name: str
    # Value of the ``filename`` option of the Content-Disposition header.
    filename: str
    # Every header parsed for this part.
    headers: Headers

34 

35 

@dataclass(frozen=True)
class Data(Event):
    """A chunk of the body of the part that is currently open."""

    # Body bytes carried by this chunk; may be empty.
    data: bytes
    # True while further chunks of the same part will follow, False on
    # the chunk that closes the part.
    more_data: bool

40 

41 

@dataclass(frozen=True)
class Epilogue(Event):
    """Bytes that appear after the closing boundary of the message."""

    # Raw content that followed the closing boundary; may be empty.
    data: bytes

45 

46 

class NeedData(Event):
    """Marker event: nothing can be produced from the bytes buffered so
    far."""

49 

50 

# Shared NeedData instance; MultipartDecoder.next_event starts from it and
# returns it when no other event could be produced.
NEED_DATA = NeedData()

52 

53 

class State(Enum):
    """Positions within a multipart message, shared by the decoder and
    the encoder state machines."""

    # Before the first boundary.
    PREAMBLE = auto()
    # Inside the header block of a part.
    PART = auto()
    # Inside the body of a part, after its first chunk was handled.
    DATA = auto()
    # Headers are done; the line break that opens the body is pending.
    DATA_START = auto()
    # After the closing boundary.
    EPILOGUE = auto()
    # The epilogue has been handled; nothing more is expected.
    COMPLETE = auto()

61 

62 

# RFC-7578 says multipart line breaks MUST be CRLF (\r\n). Many
# implementations send a lone CR or a lone LF instead, so all three
# spellings are accepted.
LINE_BREAK = b"(?:\r\n|\n|\r)"
LINE_BREAK_RE = re.compile(LINE_BREAK, re.MULTILINE)
# Two consecutive line breaks of the same spelling, i.e. an empty line.
BLANK_LINE_RE = re.compile(b"(?:\r\n\r\n|\r\r|\n\n)", re.MULTILINE)
# A header value may be continued on the next line when that line starts
# with a space or a tab, as per RFC2231.
HEADER_CONTINUATION_RE = re.compile(b"%s[ \t]" % LINE_BREAK, re.MULTILINE)
# Number of already-searched trailing bytes to look at again on the next
# search. It has to cover the line breaks plus any additional boundary
# markers (--) so that a target split across two chunks is still found.
SEARCH_EXTRA_LENGTH = 8

75 

76 

class MultipartDecoder:
    """Decodes a multipart message as bytes into Python events.

    The part data is returned as available to allow the caller to save
    the data from memory to disk, if desired.

    Bytes are handed over with :meth:`receive_data` and events are pulled
    with :meth:`next_event`, which returns ``NEED_DATA`` whenever the
    buffered bytes are not enough to produce another event.
    """

    def __init__(
        self,
        boundary: bytes,
        max_form_memory_size: int | None = None,
        *,
        max_parts: int | None = None,
    ) -> None:
        """Set up an empty decoder.

        :param boundary: The multipart boundary, without the leading
            ``--``.
        :param max_form_memory_size: Upper bound for the number of bytes
            held in the internal buffer, or ``None`` for no limit.
        :param max_parts: Upper bound for the number of parts that may be
            decoded, or ``None`` for no limit.
        """
        self.buffer = bytearray()
        self.complete = False
        self.max_form_memory_size = max_form_memory_size
        self.max_parts = max_parts
        self.state = State.PREAMBLE
        self.boundary = boundary

        # Note in the below \h i.e. horizontal whitespace is used
        # as [^\S\n\r] as \h isn't supported in python.

        # The preamble must end with a boundary where the boundary is
        # prefixed by a line break, RFC2046. Except that many
        # implementations including Werkzeug's tests omit the line
        # break prefix. In addition the first boundary could be the
        # epilogue boundary (for empty form-data) hence the matching
        # group to understand if it is an epilogue boundary.
        self.preamble_re = re.compile(
            rb"%s?--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # A boundary must include a line break prefix and suffix, and
        # may include trailing whitespace. In addition the boundary
        # could be the epilogue boundary hence the matching group to
        # understand if it is an epilogue boundary.
        self.boundary_re = re.compile(
            rb"%s--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # Offset into the buffer from which the next preamble or header
        # search starts, so searched bytes are not scanned again.
        self._search_position = 0
        self._parts_decoded = 0

    def last_newline(self, data: bytes | bytearray) -> int:
        """Return the index at which the last line break in ``data`` starts.

        A CR directly followed by LF counts as one line break, so the
        index of the CR is returned for it. When ``data`` holds neither
        CR nor LF, ``len(data)`` is returned.

        The previous implementation returned the smaller of the last CR
        index and the last LF index. Those two are unrelated unless they
        are adjacent, so a body that contained an earlier line break and
        ended with the CR of a delimiter split across two chunks yielded
        an index far from the end of the data; the caller then released
        everything, including that CR, as part data.
        """
        last_break = max(data.rfind(b"\n"), data.rfind(b"\r"))

        if last_break == -1:
            return len(data)

        # The final line break character is the LF of a CRLF pair: the
        # line break starts one byte earlier.
        if last_break > 0 and data[last_break - 1 : last_break + 1] == b"\r\n":
            return last_break - 1

        return last_break

    def receive_data(self, data: bytes | None) -> None:
        """Append ``data`` to the buffer, or mark the input as finished.

        :param data: The next chunk of the message, or ``None`` once no
            more bytes will arrive.
        :raises RequestEntityTooLarge: If the buffer would grow beyond
            ``max_form_memory_size``.
        """
        if data is None:
            self.complete = True
        elif (
            self.max_form_memory_size is not None
            and len(self.buffer) + len(data) > self.max_form_memory_size
        ):
            # Ensure that data within single event does not exceed limit.
            # Also checked across accumulated events in MultiPartParser.
            raise RequestEntityTooLarge()
        else:
            self.buffer.extend(data)

    def next_event(self) -> Event:
        """Return the next event that the buffered bytes allow.

        :return: A ``Preamble``, ``Field``, ``File``, ``Data`` or
            ``Epilogue`` event, or ``NEED_DATA`` when more bytes are
            required first.
        :raises ValueError: If a part has no Content-Disposition header,
            or if the input was marked complete and no event can be
            produced in the current state.
        :raises RequestEntityTooLarge: If more than ``max_parts`` parts
            were decoded.
        """
        event: Event = NEED_DATA

        if self.state == State.PREAMBLE:
            match = self.preamble_re.search(self.buffer, self._search_position)
            if match is not None:
                # A boundary directly followed by "--" closes the message.
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data = bytes(self.buffer[: match.start()])
                del self.buffer[: match.end()]
                event = Preamble(data=data)
                self._search_position = 0
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(
                    0, len(self.buffer) - len(self.boundary) - SEARCH_EXTRA_LENGTH
                )

        elif self.state == State.PART:
            match = BLANK_LINE_RE.search(self.buffer, self._search_position)
            if match is not None:
                headers = self._parse_headers(self.buffer[: match.start()])
                # The final header ends with a single CRLF, however a
                # blank line indicates the start of the
                # body. Therefore the end is after the first CRLF.
                headers_end = (match.start() + match.end()) // 2
                del self.buffer[:headers_end]

                if "content-disposition" not in headers:
                    raise ValueError("Missing Content-Disposition header")

                disposition, extra = parse_options_header(
                    headers["content-disposition"]
                )
                name = t.cast(str, extra.get("name"))
                filename = extra.get("filename")
                if filename is not None:
                    event = File(
                        filename=filename,
                        headers=headers,
                        name=name,
                    )
                else:
                    event = Field(
                        headers=headers,
                        name=name,
                    )
                self.state = State.DATA_START
                self._search_position = 0
                self._parts_decoded += 1

                if self.max_parts is not None and self._parts_decoded > self.max_parts:
                    raise RequestEntityTooLarge()
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(0, len(self.buffer) - SEARCH_EXTRA_LENGTH)

        elif self.state == State.DATA_START:
            # The first chunk of a part is always reported, even when it
            # is empty. _parse_data moves the state on itself when it
            # finds the boundary that ends the part.
            data, del_index, more_data = self._parse_data(self.buffer, start=True)
            del self.buffer[:del_index]
            event = Data(data=data, more_data=more_data)
            if more_data:
                self.state = State.DATA

        elif self.state == State.DATA:
            data, del_index, more_data = self._parse_data(self.buffer, start=False)
            del self.buffer[:del_index]
            # An empty chunk is only reported when it closes the part.
            if data or not more_data:
                event = Data(data=data, more_data=more_data)

        elif self.state == State.EPILOGUE and self.complete:
            event = Epilogue(data=bytes(self.buffer))
            del self.buffer[:]
            self.state = State.COMPLETE

        if self.complete and isinstance(event, NeedData):
            raise ValueError(f"Invalid form-data cannot parse beyond {self.state}")

        return event

    def _parse_headers(self, data: bytes | bytearray) -> Headers:
        """Parse the raw header block of one part.

        :param data: The bytes between the boundary line and the blank
            line that ends the headers.
        :return: The headers, with continued lines merged into one value.
        """
        headers: list[tuple[str, str]] = []
        # Merge the continued headers into one line
        data = HEADER_CONTINUATION_RE.sub(b" ", data)
        # Now there is one header per line
        for line in data.splitlines():
            line = line.strip()

            if line != b"":
                name, _, value = line.decode().partition(":")
                headers.append((name.strip(), value.strip()))
        return Headers(headers)

    def _parse_data(
        self, data: bytes | bytearray, *, start: bool
    ) -> tuple[bytes, int, bool]:
        """Extract as much part data as can safely be released.

        :param data: The buffered bytes to inspect.
        :param start: True for the first call of a part, in which case
            the line break that opens the body is skipped.
        :return: A ``(chunk, del_index, more_data)`` tuple: the data to
            emit, how many leading bytes of the buffer are consumed, and
            whether the part continues.

        When the boundary that ends the part is found, ``self.state`` is
        moved to ``State.PART`` or ``State.EPILOGUE``.
        """
        # Body parts must start with CRLF (or CR or LF)
        if start:
            match = LINE_BREAK_RE.match(data)
            data_start = t.cast(t.Match[bytes], match).end()
        else:
            data_start = 0

        boundary = b"--" + self.boundary

        # Search the bytes that were passed in, not self.buffer, so the
        # whole method works on one and the same input.
        if data.find(boundary) == -1:
            # No complete boundary in the buffer, but there may be
            # a partial boundary at the end. As the boundary
            # starts with either a nl or cr find the start of the last
            # line break and return up to that as data.
            data_end = del_index = self.last_newline(data[data_start:]) + data_start
            # If amount of data after last newline is far from
            # possible length of partial boundary, we should
            # assume that there is no partial boundary in the buffer
            # and return all pending data.
            if (len(data) - data_end) > len(b"\n" + boundary):
                data_end = del_index = len(data)
            more_data = True
        else:
            match = self.boundary_re.search(data)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data_end = match.start()
                del_index = match.end()
            else:
                # The boundary text is present but not (yet) as a full
                # delimiter line: hold back everything from the last
                # line break on.
                data_end = del_index = self.last_newline(data[data_start:]) + data_start
            more_data = match is None

        return bytes(data[data_start:data_end]), del_index, more_data

285 

286 

class MultipartEncoder:
    """Encodes multipart events into the bytes of a multipart message.

    Events are passed one at a time to :meth:`send_event`, which returns
    the bytes to write for that event.
    """

    def __init__(self, boundary: bytes) -> None:
        """Set up an encoder.

        :param boundary: The multipart boundary, without the leading
            ``--``.
        """
        self.boundary = boundary
        self.state = State.PREAMBLE

    def send_event(self, event: Event) -> bytes:
        """Return the bytes that represent ``event`` and advance the state.

        :param event: The event to encode.
        :return: The encoded bytes; may be empty.
        :raises ValueError: If ``event`` is not allowed in the current
            state.
        """
        if isinstance(event, Preamble) and self.state == State.PREAMBLE:
            self.state = State.PART
            return event.data
        elif isinstance(event, (Field, File)) and self.state in {
            State.PREAMBLE,
            State.PART,
            State.DATA,
        }:
            data = b"\r\n--" + self.boundary + b"\r\n"
            data += b'Content-Disposition: form-data; name="%s"' % event.name.encode()
            if isinstance(event, File):
                data += b'; filename="%s"' % event.filename.encode()
            data += b"\r\n"
            for name, value in t.cast(Field, event).headers:
                # Content-Disposition was already written from the
                # event's own name and filename.
                if name.lower() != "content-disposition":
                    data += f"{name}: {value}\r\n".encode()
            self.state = State.DATA_START
            return data
        elif isinstance(event, Data) and self.state == State.DATA_START:
            if event.data:
                # The first bytes of a body are preceded by the blank
                # line that separates them from the headers.
                self.state = State.DATA
                return b"\r\n" + event.data
            # Nothing to write yet. Only leave DATA_START when the part
            # is over; otherwise the blank line is still owed to the
            # next chunk. Moving to DATA here would glue that chunk
            # directly onto the header block.
            if not event.more_data:
                self.state = State.DATA
            return event.data
        elif isinstance(event, Data) and self.state == State.DATA:
            return event.data
        elif isinstance(event, Epilogue):
            self.state = State.COMPLETE
            return b"\r\n--" + self.boundary + b"--\r\n" + event.data
        else:
            raise ValueError(f"Cannot generate {event} in state: {self.state}")