1from __future__ import annotations
2
3import re
4import typing as t
5from dataclasses import dataclass
6from enum import auto
7from enum import Enum
8
9from ..datastructures import Headers
10from ..exceptions import RequestEntityTooLarge
11from ..http import parse_options_header
12
13
class Event:
    """Common base class for everything the multipart decoder emits
    and the multipart encoder accepts.

    It carries no data of its own; it exists so events can be typed
    and checked with ``isinstance``.
    """
16
17
@dataclass(frozen=True)
class Preamble(Event):
    """Bytes that precede the first boundary of the message.

    ``MultipartDecoder.next_event`` emits this when it locates the
    first boundary in the buffered input.
    """

    # Everything that was buffered before the first boundary match.
    data: bytes
21
22
@dataclass(frozen=True)
class Field(Event):
    """Start of a part whose ``Content-Disposition`` header has no
    ``filename`` option.

    The part's body follows as one or more ``Data`` events.
    """

    # Value of the ``name`` option of the Content-Disposition header.
    name: str
    # All headers parsed from the part.
    headers: Headers
27
28
@dataclass(frozen=True)
class File(Event):
    """Start of a part whose ``Content-Disposition`` header carries a
    ``filename`` option.

    The part's body follows as one or more ``Data`` events.
    """

    # Value of the ``name`` option of the Content-Disposition header.
    name: str
    # Value of the ``filename`` option of the Content-Disposition header.
    filename: str
    # All headers parsed from the part.
    headers: Headers
34
35
@dataclass(frozen=True)
class Data(Event):
    """A chunk of the body of the part announced by the preceding
    ``Field`` or ``File`` event.
    """

    # The body bytes contained in this chunk (may be empty).
    data: bytes
    # True while the boundary that ends the part has not been found yet,
    # i.e. further ``Data`` events for the same part will follow.
    more_data: bool
40
41
@dataclass(frozen=True)
class Epilogue(Event):
    """Bytes that follow the closing boundary of the message.

    The decoder emits this only after the end of the input has been
    signalled, using whatever is left in its buffer.
    """

    # Everything that remained in the buffer after the closing boundary.
    data: bytes
45
46
class NeedData(Event):
    """Returned by ``MultipartDecoder.next_event`` when the buffered
    input is not sufficient to produce another event.
    """

    pass


# Shared instance used by the decoder as its "nothing to report" event.
NEED_DATA = NeedData()
52
53
class State(Enum):
    """Positions within a multipart message, shared by the decoder
    and the encoder state machines.
    """

    # Before the first boundary (the initial state).
    PREAMBLE = auto()
    # After a boundary line; the headers of a part come next.
    PART = auto()
    # Inside the body of a part, after its first chunk.
    DATA = auto()
    # Headers are done; the first chunk of the part body comes next.
    DATA_START = auto()
    # After the closing boundary; only trailing bytes remain.
    EPILOGUE = auto()
    # The epilogue has been handled; the message is finished.
    COMPLETE = auto()
61
62
# Multipart line breaks MUST be CRLF (\r\n) by RFC-7578, except that
# many implementations break this and either use CR or LF alone.
LINE_BREAK = b"(?:\r\n|\n|\r)"
# A blank line, i.e. the same kind of line break twice in a row. It
# separates the headers of a part from its body.
BLANK_LINE_RE = re.compile(b"(?:\r\n\r\n|\r\r|\n\n)", re.MULTILINE)
# A single line break of any of the accepted kinds.
LINE_BREAK_RE = re.compile(LINE_BREAK, re.MULTILINE)
# Header values can be continued via a space or tab after the linebreak, as
# per RFC2231
HEADER_CONTINUATION_RE = re.compile(b"%s[ \t]" % LINE_BREAK, re.MULTILINE)
# This must be long enough to contain any line breaks plus any
# additional boundary markers (--) such that they will be found in a
# subsequent search
SEARCH_EXTRA_LENGTH = 8
75
76
class MultipartDecoder:
    """Decodes a multipart message as bytes into Python events.

    The part data is returned as available to allow the caller to save
    the data from memory to disk, if desired.

    Bytes are fed in with :meth:`receive_data` (``None`` signals the
    end of the input) and events are pulled with :meth:`next_event`,
    which returns ``NEED_DATA`` whenever more input is required.

    .. versionchanged:: 3.1.4
        Handle chunks that split a ``\\r\\n`` sequence.
    """

    def __init__(
        self,
        boundary: bytes,
        max_form_memory_size: int | None = None,
        *,
        max_parts: int | None = None,
    ) -> None:
        """
        :param boundary: The multipart boundary, without the leading
            ``--``.
        :param max_form_memory_size: Maximum number of bytes that may
            be held in the internal buffer, or ``None`` for no limit.
        :param max_parts: Maximum number of parts that may be decoded,
            or ``None`` for no limit.
        """
        self.buffer = bytearray()
        self.complete = False
        self.max_form_memory_size = max_form_memory_size
        self.max_parts = max_parts
        self.state = State.PREAMBLE
        self.boundary = boundary

        # Note in the below \h i.e. horizontal whitespace is used
        # as [^\S\n\r] as \h isn't supported in python.

        # The preamble must end with a boundary where the boundary is
        # prefixed by a line break, RFC2046. Except that many
        # implementations including Werkzeug's tests omit the line
        # break prefix. In addition the first boundary could be the
        # epilogue boundary (for empty form-data) hence the matching
        # group to understand if it is an epilogue boundary.
        self.preamble_re = re.compile(
            rb"%s?--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # A boundary must include a line break prefix and suffix, and
        # may include trailing whitespace. In addition the boundary
        # could be the epilogue boundary hence the matching group to
        # understand if it is an epilogue boundary.
        self.boundary_re = re.compile(
            rb"%s--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        self._search_position = 0
        self._parts_decoded = 0

    def receive_data(self, data: bytes | None) -> None:
        """Append ``data`` to the internal buffer.

        :param data: The next chunk of the message, or ``None`` to
            signal that the input has ended.
        :raises RequestEntityTooLarge: If buffering ``data`` would
            exceed ``max_form_memory_size``.
        """
        if data is None:
            self.complete = True
        elif (
            self.max_form_memory_size is not None
            and len(self.buffer) + len(data) > self.max_form_memory_size
        ):
            # Ensure that data within single event does not exceed limit.
            # Also checked across accumulated events in MultiPartParser.
            raise RequestEntityTooLarge()
        else:
            self.buffer.extend(data)

    def next_event(self) -> Event:
        """Return the next event that can be decoded from the buffer.

        :return: A ``Preamble``, ``Field``, ``File``, ``Data`` or
            ``Epilogue`` event, or ``NEED_DATA`` if the buffered input
            is not sufficient to produce one.
        :raises ValueError: If a part has no ``Content-Disposition``
            header, or if the input has ended and no further event can
            be produced.
        :raises RequestEntityTooLarge: If more than ``max_parts`` parts
            have been decoded.
        """
        event: Event = NEED_DATA
        if self.state == State.PREAMBLE:
            match = self.preamble_re.search(self.buffer, self._search_position)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data = bytes(self.buffer[: match.start()])
                del self.buffer[: match.end()]
                event = Preamble(data=data)
                self._search_position = 0
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(
                    0, len(self.buffer) - len(self.boundary) - SEARCH_EXTRA_LENGTH
                )

        elif self.state == State.PART:
            match = BLANK_LINE_RE.search(self.buffer, self._search_position)
            if match is not None:
                headers = self._parse_headers(self.buffer[: match.start()])
                # The final header ends with a single CRLF, however a
                # blank line indicates the start of the
                # body. Therefore the end is after the first CRLF.
                headers_end = (match.start() + match.end()) // 2
                del self.buffer[:headers_end]

                if "content-disposition" not in headers:
                    raise ValueError("Missing Content-Disposition header")

                disposition, extra = parse_options_header(
                    headers["content-disposition"]
                )
                name = t.cast(str, extra.get("name"))
                filename = extra.get("filename")
                if filename is not None:
                    event = File(
                        filename=filename,
                        headers=headers,
                        name=name,
                    )
                else:
                    event = Field(
                        headers=headers,
                        name=name,
                    )
                self.state = State.DATA_START
                self._search_position = 0
                self._parts_decoded += 1

                if self.max_parts is not None and self._parts_decoded > self.max_parts:
                    raise RequestEntityTooLarge()
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(0, len(self.buffer) - SEARCH_EXTRA_LENGTH)

        elif self.state == State.DATA_START:
            # The first chunk is always reported, even when it is empty.
            data, del_index, more_data = self._parse_data(self.buffer, start=True)
            del self.buffer[:del_index]
            event = Data(data=data, more_data=more_data)
            if more_data:
                self.state = State.DATA

        elif self.state == State.DATA:
            data, del_index, more_data = self._parse_data(self.buffer, start=False)
            del self.buffer[:del_index]
            # Empty follow-up chunks are only reported when they end the part.
            if data or not more_data:
                event = Data(data=data, more_data=more_data)

        elif self.state == State.EPILOGUE and self.complete:
            event = Epilogue(data=bytes(self.buffer))
            del self.buffer[:]
            self.state = State.COMPLETE

        if self.complete and isinstance(event, NeedData):
            raise ValueError(f"Invalid form-data cannot parse beyond {self.state}")

        return event

    def _parse_headers(self, data: bytes | bytearray) -> Headers:
        """Parse the raw header block of a part into ``Headers``."""
        headers: list[tuple[str, str]] = []
        # Merge the continued headers into one line
        data = HEADER_CONTINUATION_RE.sub(b" ", data)
        # Now there is one header per line
        for line in data.splitlines():
            line = line.strip()

            if line != b"":
                name, _, value = line.decode().partition(":")
                headers.append((name.strip(), value.strip()))
        return Headers(headers)

    def _parse_data(
        self, data: bytes | bytearray, *, start: bool
    ) -> tuple[bytes, int, bool]:
        """Extract as much part body as is safe from ``data``.

        :param data: The buffered bytes to inspect.
        :param start: Whether this is the first chunk of the part, in
            which case ``data`` begins with the line break that ends
            the blank line after the headers.
        :return: A tuple of the body bytes, the number of leading bytes
            of ``data`` that have been consumed, and whether more data
            for this part is expected. When the boundary ending the
            part is found, ``self.state`` is advanced accordingly.
        """
        # Body parts must start with CRLF (or CR or LF)
        if start:
            match = LINE_BREAK_RE.match(data)
            data_start = t.cast(t.Match[bytes], match).end()
        else:
            data_start = 0

        if data.find(b"--" + self.boundary) == -1:
            # No complete boundary in the buffer, but there may be
            # a partial boundary at the end.
            data_end = del_index = (
                self._last_partial_boundary_index(data[data_start:]) + data_start
            )
            more_data = True
        else:
            match = self.boundary_re.search(data)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data_end = match.start()
                del_index = match.end()
            else:
                # The delimiter text is present but no complete boundary
                # line matched. Either the chunk ended before the
                # boundary line was terminated, in which case the whole
                # line has to stay in the buffer, or the text merely
                # occurs inside the body.
                body = data[data_start:]
                keep_from = self._unterminated_boundary_index(body)
                if keep_from == -1:
                    keep_from = self._last_partial_boundary_index(body)
                data_end = del_index = keep_from + data_start
            more_data = match is None
        return bytes(data[data_start:data_end]), del_index, more_data

    def _unterminated_boundary_index(self, data: bytes | bytearray) -> int:
        """Find a boundary line at the end of ``data`` that is still
        waiting for its terminator.

        ``boundary_re`` only matches once the delimiter is followed by
        ``--`` or by a line break. If a chunk ends right after the
        delimiter, after trailing horizontal whitespace, or after the
        first dash of the closing ``--``, the line break in front of
        the delimiter is further from the end than
        ``_last_partial_boundary_index`` looks, so it would be emitted
        as part data.

        :return: The index of the line break that starts such a line,
            or ``-1`` if ``data`` does not end with one.
        """
        delimiter = b"--" + self.boundary
        delimiter_index = data.rfind(delimiter)
        if delimiter_index <= 0:
            # Not present, or no room for the mandatory line break prefix.
            return -1

        tail = data[delimiter_index + len(delimiter) :]
        # Only the first dash of a closing "--" or horizontal whitespace
        # (the bytes matched by [^\S\n\r]) may follow the delimiter.
        if tail != b"-" and tail.strip(b" \t\f\v"):
            return -1

        # CRLF is tested first so that its CR is not left behind.
        for line_break in (b"\r\n", b"\n", b"\r"):
            if data.endswith(line_break, 0, delimiter_index):
                return delimiter_index - len(line_break)
        return -1

    def _last_partial_boundary_index(self, data: bytes | bytearray) -> int:
        """Return the index up to which ``data`` can not be part of a
        boundary that is only partially buffered.
        """
        # Find the last index following which a partial boundary
        # could be present in the data. This will be the earliest
        # position of a LF or a CR, unless that position is more
        # than a complete boundary from the end in which case there
        # is no partial boundary.
        complete_boundary_index = len(data) - len(b"\r\n--" + self.boundary)
        try:
            last_nl = data.rindex(b"\n")
        except ValueError:
            last_nl = len(data)
        else:
            if last_nl < complete_boundary_index:
                last_nl = len(data)
        try:
            last_cr = data.rindex(b"\r")
        except ValueError:
            last_cr = len(data)
        else:
            if last_cr < complete_boundary_index:
                last_cr = len(data)
        return min(last_nl, last_cr)
293
294
class MultipartEncoder:
    """Encodes a sequence of events into the bytes of a multipart
    message.

    Events have to be sent in message order: an optional ``Preamble``,
    then for every part a ``Field`` or ``File`` followed by its
    ``Data`` events, and finally an ``Epilogue``.
    """

    # True while the blank line that separates the headers of the
    # current part from its body has not been written yet because only
    # empty ``Data`` events have been sent for the part so far.
    _blank_line_pending: bool = False

    def __init__(self, boundary: bytes) -> None:
        """
        :param boundary: The multipart boundary, without the leading
            ``--``.
        """
        self.boundary = boundary
        self.state = State.PREAMBLE

    def send_event(self, event: Event) -> bytes:
        """Return the bytes that represent ``event`` in the message.

        :param event: The next event of the message.
        :return: The encoded bytes, which may be empty.
        :raises ValueError: If ``event`` is not valid in the current
            state of the encoder.
        """
        if isinstance(event, Preamble) and self.state == State.PREAMBLE:
            self.state = State.PART
            return event.data
        elif isinstance(event, (Field, File)) and self.state in {
            State.PREAMBLE,
            State.PART,
            State.DATA,
        }:
            # A new part starts, whatever was pending belongs to the old one.
            self._blank_line_pending = False
            chunks = [
                b"\r\n--",
                self.boundary,
                b"\r\n",
                b'Content-Disposition: form-data; name="%s"' % event.name.encode(),
            ]
            if isinstance(event, File):
                chunks.append(b'; filename="%s"' % event.filename.encode())
            chunks.append(b"\r\n")
            # The Content-Disposition header is generated above, so a
            # copy in the event headers is not repeated.
            for name, value in t.cast(Field, event).headers:
                if name.lower() != "content-disposition":
                    chunks.append(f"{name}: {value}\r\n".encode())
            self.state = State.DATA_START
            return b"".join(chunks)
        elif isinstance(event, Data) and self.state == State.DATA_START:
            self.state = State.DATA
            if event.data:
                # The line break completes the blank line after the headers.
                return b"\r\n" + event.data
            # Nothing is written for an empty first chunk. Remember that
            # the blank line is still owed, so that data arriving in a
            # later chunk is not appended directly to the headers.
            self._blank_line_pending = True
            return event.data
        elif isinstance(event, Data) and self.state == State.DATA:
            if self._blank_line_pending and event.data:
                self._blank_line_pending = False
                return b"\r\n" + event.data
            return event.data
        elif isinstance(event, Epilogue):
            self._blank_line_pending = False
            self.state = State.COMPLETE
            return b"\r\n--" + self.boundary + b"--\r\n" + event.data
        else:
            raise ValueError(f"Cannot generate {event} in state: {self.state}")