1from __future__ import annotations
2
3import re
4import typing as t
5from dataclasses import dataclass
6from enum import auto
7from enum import Enum
8
9from ..datastructures import Headers
10from ..exceptions import RequestEntityTooLarge
11from ..http import parse_options_header
12
13
class Event:
    """Common base class of all multipart events handled by this module.

    It carries no data of its own; the concrete event classes derive
    from it so that they can be told apart with ``isinstance``.
    """
16
17
@dataclass(frozen=True)
class Preamble(Event):
    """Event for the bytes that come before the first boundary."""

    # Raw bytes found in front of the first boundary line (may be empty).
    data: bytes
21
22
@dataclass(frozen=True)
class Field(Event):
    """Event marking the start of a part that has no ``filename`` option
    in its Content-Disposition header."""

    # Value of the ``name`` option of the Content-Disposition header.
    # The decoder only casts this to ``str``; it is ``None`` at runtime
    # when the header carries no ``name`` option.
    name: str
    # All headers parsed from the part's header section.
    headers: Headers
27
28
@dataclass(frozen=True)
class File(Event):
    """Event marking the start of a part whose Content-Disposition
    header carries a ``filename`` option."""

    # Value of the ``name`` option of the Content-Disposition header.
    # The decoder only casts this to ``str``; it is ``None`` at runtime
    # when the header carries no ``name`` option.
    name: str
    # Value of the ``filename`` option of the Content-Disposition header.
    filename: str
    # All headers parsed from the part's header section.
    headers: Headers
34
35
@dataclass(frozen=True)
class Data(Event):
    """Event carrying a chunk of the body of the current part."""

    # The bytes of this chunk (may be empty).
    data: bytes
    # ``True`` while the boundary that ends the part has not been seen,
    # i.e. further ``Data`` events for the same part will follow.
    more_data: bool
40
41
@dataclass(frozen=True)
class Epilogue(Event):
    """Event for the bytes that follow the closing boundary."""

    # Raw bytes found after the closing boundary (may be empty).
    data: bytes
45
46
class NeedData(Event):
    """Event signalling that no further event can be produced from the
    bytes received so far; the caller has to supply more data."""
49
50
# Shared instance used by ``MultipartDecoder.next_event`` as its default
# result when the buffered bytes do not allow another event to be built.
NEED_DATA = NeedData()
52
53
class State(Enum):
    """Position of a decoder or encoder within a multipart message."""

    # Before the first boundary.
    PREAMBLE = auto()
    # At the header section of a part.
    PART = auto()
    # Inside the body of a part, after its first chunk was handled.
    DATA = auto()
    # Directly after a part's headers; the line break that separates
    # headers and body still has to be dealt with.
    DATA_START = auto()
    # After the closing boundary.
    EPILOGUE = auto()
    # The whole message has been processed.
    COMPLETE = auto()
61
62
# Multipart line breaks MUST be CRLF (\r\n) by RFC-7578, except that
# many implementations break this and either use CR or LF alone.
# This is a regular expression fragment (not a compiled pattern) so that
# it can be embedded in the larger patterns below.
LINE_BREAK = b"(?:\r\n|\n|\r)"
# A blank line, i.e. the same kind of line break twice in a row. It
# separates the headers of a part from its body.
BLANK_LINE_RE = re.compile(b"(?:\r\n\r\n|\r\r|\n\n)", re.MULTILINE)
# A single line break in any of the accepted forms.
LINE_BREAK_RE = re.compile(LINE_BREAK, re.MULTILINE)
# Header values can be continued via a space or tab after the linebreak, as
# per RFC2231
HEADER_CONTINUATION_RE = re.compile(b"%s[ \t]" % LINE_BREAK, re.MULTILINE)
# This must be long enough to contain any line breaks plus any
# additional boundary markers (--) such that they will be found in a
# subsequent search
SEARCH_EXTRA_LENGTH = 8
75
76
class MultipartDecoder:
    """Decodes a multipart message as bytes into Python events.

    The part data is returned as available to allow the caller to save
    the data from memory to disk, if desired.

    Bytes are supplied with :meth:`receive_data` and events are pulled
    with :meth:`next_event`, which returns ``NEED_DATA`` whenever the
    buffered bytes are not sufficient to build another event.

    .. versionchanged:: 3.1.4
        Handle chunks that split a ``\\r\\n`` sequence.
    """

    def __init__(
        self,
        boundary: bytes,
        max_form_memory_size: int | None = None,
        *,
        max_parts: int | None = None,
    ) -> None:
        """Create a decoder for messages delimited by ``boundary``.

        :param boundary: The boundary without the leading ``--``.
        :param max_form_memory_size: Upper limit for the number of
            buffered bytes, ``None`` for no limit.
        :param max_parts: Upper limit for the number of parts,
            ``None`` for no limit.
        """
        self.buffer = bytearray()
        self.complete = False
        self.max_form_memory_size = max_form_memory_size
        self.max_parts = max_parts
        self.state = State.PREAMBLE
        self.boundary = boundary

        # Note in the below \h i.e. horizontal whitespace is used
        # as [^\S\n\r] as \h isn't supported in python.

        # The preamble must end with a boundary where the boundary is
        # prefixed by a line break, RFC2046. Except that many
        # implementations including Werkzeug's tests omit the line
        # break prefix. In addition the first boundary could be the
        # epilogue boundary (for empty form-data) hence the matching
        # group to understand if it is an epilogue boundary.
        self.preamble_re = re.compile(
            rb"%s?--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # A boundary must include a line break prefix and suffix, and
        # may include trailing whitespace. In addition the boundary
        # could be the epilogue boundary hence the matching group to
        # understand if it is an epilogue boundary.
        self.boundary_re = re.compile(
            rb"%s--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # Offset from which the next regex search in the buffer starts,
        # so already searched bytes are not scanned again.
        self._search_position = 0
        # Number of parts whose headers have been decoded so far.
        self._parts_decoded = 0

    def receive_data(self, data: bytes | None) -> None:
        """Append ``data`` to the internal buffer.

        Passing ``None`` marks the end of the input.

        :raises RequestEntityTooLarge: If buffering ``data`` would
            exceed ``max_form_memory_size``.
        """
        if data is None:
            self.complete = True
        elif (
            self.max_form_memory_size is not None
            and len(self.buffer) + len(data) > self.max_form_memory_size
        ):
            # Ensure that data within single event does not exceed limit.
            # Also checked across accumulated events in MultiPartParser.
            raise RequestEntityTooLarge()
        else:
            self.buffer.extend(data)

    def next_event(self) -> Event:
        """Return the next event that can be built from the buffer.

        ``NEED_DATA`` is returned when more bytes are required.

        :raises ValueError: If a part lacks a Content-Disposition
            header, or if the input was marked complete but no further
            event can be produced.
        :raises RequestEntityTooLarge: If more than ``max_parts`` parts
            were decoded.
        """
        event: Event = NEED_DATA

        if self.state == State.PREAMBLE:
            match = self.preamble_re.search(self.buffer, self._search_position)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data = bytes(self.buffer[: match.start()])
                del self.buffer[: match.end()]
                event = Preamble(data=data)
                self._search_position = 0
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(
                    0, len(self.buffer) - len(self.boundary) - SEARCH_EXTRA_LENGTH
                )

        elif self.state == State.PART:
            match = BLANK_LINE_RE.search(self.buffer, self._search_position)
            if match is not None:
                headers = self._parse_headers(self.buffer[: match.start()])
                # The final header ends with a single CRLF, however a
                # blank line indicates the start of the
                # body. Therefore the end is after the first CRLF.
                headers_end = (match.start() + match.end()) // 2
                del self.buffer[:headers_end]

                if "content-disposition" not in headers:
                    raise ValueError("Missing Content-Disposition header")

                disposition, extra = parse_options_header(
                    headers["content-disposition"]
                )
                name = t.cast(str, extra.get("name"))
                filename = extra.get("filename")
                if filename is not None:
                    event = File(
                        filename=filename,
                        headers=headers,
                        name=name,
                    )
                else:
                    event = Field(
                        headers=headers,
                        name=name,
                    )
                self.state = State.DATA_START
                self._search_position = 0
                self._parts_decoded += 1

                if self.max_parts is not None and self._parts_decoded > self.max_parts:
                    raise RequestEntityTooLarge()
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(0, len(self.buffer) - SEARCH_EXTRA_LENGTH)

        elif self.state == State.DATA_START:
            data, del_index, more_data = self._parse_data(self.buffer, start=True)
            del self.buffer[:del_index]
            if not more_data or del_index > 0:
                event = Data(data=data, more_data=more_data)
                if more_data:
                    self.state = State.DATA
            # Otherwise nothing was consumed: the line break that follows
            # the headers is still in the buffer and cannot be classified
            # yet (it may be the start of the next boundary). Stay in
            # DATA_START and ask for more data; moving on to DATA here
            # would later emit that line break as part of the body.

        elif self.state == State.DATA:
            data, del_index, more_data = self._parse_data(self.buffer, start=False)
            del self.buffer[:del_index]
            if data or not more_data:
                event = Data(data=data, more_data=more_data)

        elif self.state == State.EPILOGUE and self.complete:
            event = Epilogue(data=bytes(self.buffer))
            del self.buffer[:]
            self.state = State.COMPLETE

        if self.complete and isinstance(event, NeedData):
            raise ValueError(f"Invalid form-data cannot parse beyond {self.state}")

        return event

    def _parse_headers(self, data: bytes | bytearray) -> Headers:
        """Parse the header section of a part into a ``Headers`` object."""
        headers: list[tuple[str, str]] = []
        # Merge the continued headers into one line
        data = HEADER_CONTINUATION_RE.sub(b" ", data)
        # Now there is one header per line
        for line in data.splitlines():
            line = line.strip()

            if line != b"":
                name, _, value = line.decode().partition(":")
                headers.append((name.strip(), value.strip()))
        return Headers(headers)

    def _parse_data(
        self, data: bytes | bytearray, *, start: bool
    ) -> tuple[bytes, int, bool]:
        """Extract as much part body as possible from ``data``.

        :param data: The buffered bytes, beginning at the current part
            body (including the line break after the headers when
            ``start`` is true).
        :param start: Whether this is the first call for the part, in
            which case the leading line break is skipped.
        :return: A tuple ``(body, del_index, more_data)``. ``body`` is
            the extracted chunk, ``del_index`` the number of leading
            bytes of ``data`` that have been dealt with and can be
            dropped, and ``more_data`` is false once the boundary that
            ends the part was found. With ``start`` true the result
            ``(b"", 0, True)`` means that nothing could be consumed yet,
            not even the leading line break.

        When a boundary is found ``self.state`` is set to ``PART`` or
        ``EPILOGUE`` accordingly.
        """
        # Body parts must start with CRLF (or CR or LF)
        if start:
            line_break = LINE_BREAK_RE.match(data)
            data_start = t.cast(t.Match[bytes], line_break).end()
        else:
            data_start = 0

        # The plain substring search is a cheap pre-check that avoids
        # running the regex when no boundary text is present at all.
        match = None
        if data.find(b"--" + self.boundary) != -1:
            match = self.boundary_re.search(data)

        if match is not None:
            if match.group(1).startswith(b"--"):
                self.state = State.EPILOGUE
            else:
                self.state = State.PART
            # For an empty body the boundary starts with the very line
            # break that follows the headers, so match.start() can be
            # smaller than data_start; the slice is then empty.
            return bytes(data[data_start : match.start()]), match.end(), False

        # No complete boundary in the buffer, but there may be a
        # partial boundary at the end which must be kept back.
        data_end = self._last_partial_boundary_index(data)

        if data_end < data_start:
            # Only possible with ``start``: the candidate for a partial
            # boundary begins inside the line break that follows the
            # headers. Consuming any of it now would either break the
            # detection of a boundary that directly follows the headers
            # or leak the line break into the body later on, so leave
            # the buffer untouched until more data has arrived.
            return b"", 0, True

        return bytes(data[data_start:data_end]), data_end, True

    def _last_partial_boundary_index(self, data: bytes | bytearray) -> int:
        """Return the index up to which ``data`` is certainly not part
        of a boundary that is still incomplete."""
        # Find the last index following which a partial boundary
        # could be present in the data. This will be the earliest
        # position of a LF or a CR, unless that position is more
        # than a complete boundary from the end in which case there
        # is no partial boundary.
        complete_boundary_index = len(data) - len(b"\r\n--" + self.boundary)
        try:
            last_nl = data.rindex(b"\n")
        except ValueError:
            last_nl = len(data)
        else:
            if last_nl < complete_boundary_index:
                last_nl = len(data)
        try:
            last_cr = data.rindex(b"\r")
        except ValueError:
            last_cr = len(data)
        else:
            if last_cr < complete_boundary_index:
                last_cr = len(data)
        return min(last_nl, last_cr)
291
292
class MultipartEncoder:
    """Encodes multipart events into the bytes of a multipart message.

    Events are passed one at a time to :meth:`send_event`, which
    returns the bytes to write for that event.
    """

    def __init__(self, boundary: bytes) -> None:
        """Create an encoder for messages delimited by ``boundary``.

        :param boundary: The boundary without the leading ``--``.
        """
        self.boundary = boundary
        self.state = State.PREAMBLE
        # True while the blank line between the headers and the body of
        # the current part has not been written because only empty data
        # chunks were sent so far.
        self._separator_pending = False

    def send_event(self, event: Event) -> bytes:
        """Return the bytes that represent ``event``.

        :raises ValueError: If ``event`` is not valid in the current
            state.
        """
        if isinstance(event, Preamble) and self.state == State.PREAMBLE:
            self.state = State.PART
            return event.data
        elif isinstance(event, (Field, File)) and self.state in {
            State.PREAMBLE,
            State.PART,
            State.DATA,
        }:
            chunks = [
                b"\r\n--",
                self.boundary,
                b"\r\n",
                b'Content-Disposition: form-data; name="%s"' % event.name.encode(),
            ]
            if isinstance(event, File):
                chunks.append(b'; filename="%s"' % event.filename.encode())
            chunks.append(b"\r\n")
            for name, value in t.cast(Field, event).headers:
                # Content-Disposition has been generated above already.
                if name.lower() != "content-disposition":
                    chunks.append(f"{name}: {value}\r\n".encode())
            self.state = State.DATA_START
            self._separator_pending = False
            return b"".join(chunks)
        elif isinstance(event, Data) and self.state == State.DATA_START:
            self.state = State.DATA
            if event.data:
                # The line break completes the blank line that ends the
                # header section.
                return b"\r\n" + event.data
            # For an empty chunk nothing is written: if the body stays
            # empty, the line break in front of the next boundary ends
            # the header section. Remember that the separator is still
            # owed in case a non-empty chunk follows.
            self._separator_pending = True
            return event.data
        elif isinstance(event, Data) and self.state == State.DATA:
            if self._separator_pending and event.data:
                # First non-empty chunk after one or more empty ones:
                # write the blank line now, otherwise the body would be
                # appended directly to the headers.
                self._separator_pending = False
                return b"\r\n" + event.data
            return event.data
        elif isinstance(event, Epilogue):
            self.state = State.COMPLETE
            return b"\r\n--" + self.boundary + b"--\r\n" + event.data
        else:
            raise ValueError(f"Cannot generate {event} in state: {self.state}")