1from __future__ import annotations
2
3import re
4import typing as t
5from dataclasses import dataclass
6from enum import auto
7from enum import Enum
8
9from ..datastructures import Headers
10from ..exceptions import RequestEntityTooLarge
11from ..http import parse_options_header
12
13
class Event:
    """Base class shared by every event handled in this module.

    ``MultipartDecoder.next_event`` returns instances of its subclasses
    and ``MultipartEncoder.send_event`` accepts them.
    """
16
17
@dataclass(frozen=True)
class Preamble(Event):
    """Event carrying the bytes found before the first boundary."""

    # Raw bytes that precede the first boundary line.
    data: bytes
21
22
@dataclass(frozen=True)
class Field(Event):
    """Event marking the start of a part that has no ``filename`` option."""

    # Value of the ``name`` option of the part's Content-Disposition header.
    name: str
    # All headers parsed for the part.
    headers: Headers
27
28
@dataclass(frozen=True)
class File(Event):
    """Event marking the start of a part that has a ``filename`` option."""

    # Value of the ``name`` option of the part's Content-Disposition header.
    name: str
    # Value of the ``filename`` option of the same header.
    filename: str
    # All headers parsed for the part.
    headers: Headers
34
35
@dataclass(frozen=True)
class Data(Event):
    """Event carrying a chunk of the body of the current part."""

    # The body bytes contained in this chunk (may be empty).
    data: bytes
    # True while no boundary terminating the part has been seen yet, i.e.
    # further ``Data`` events for the same part are expected.
    more_data: bool
40
41
@dataclass(frozen=True)
class Epilogue(Event):
    """Event carrying the bytes found after the closing boundary."""

    # Raw bytes that follow the closing (``--`` suffixed) boundary.
    data: bytes
45
46
class NeedData(Event):
    """Event signalling that no other event can be produced from the
    bytes buffered so far."""


# Shared instance used as the default result of
# ``MultipartDecoder.next_event``.
NEED_DATA = NeedData()
52
53
class State(Enum):
    """Position of a decoder or encoder within a multipart message."""

    # Before the first boundary has been seen or written.
    PREAMBLE = auto()
    # At the headers of a part.
    PART = auto()
    # Inside the body of a part, after its first chunk.
    DATA = auto()
    # Headers are done; the first chunk of the body comes next.
    DATA_START = auto()
    # After the closing boundary.
    EPILOGUE = auto()
    # The whole message has been processed.
    COMPLETE = auto()
61
62
# RFC 7578 requires multipart line breaks to be CRLF (\r\n). Many
# implementations do not comply and send a lone CR or a lone LF, so all
# three forms are accepted.
LINE_BREAK = b"(?:\r\n|\n|\r)"
# Two identical line breaks in a row, i.e. an empty line.
BLANK_LINE_RE = re.compile(b"(?:\r\n\r\n|\r\r|\n\n)", re.MULTILINE)
LINE_BREAK_RE = re.compile(LINE_BREAK, re.MULTILINE)
# A header value is continued on the next line when the line break is
# followed by a space or a tab, as per RFC2231.
HEADER_CONTINUATION_RE = re.compile(LINE_BREAK + b"[ \t]", re.MULTILINE)
# Number of extra bytes to re-scan on the next search. It must be long
# enough to cover any line breaks plus any additional boundary markers
# (--) so that a search target split across chunks is still found.
SEARCH_EXTRA_LENGTH = 8
75
76
class MultipartDecoder:
    r"""Decodes a multipart message as bytes into Python events.

    The part data is returned as available to allow the caller to save
    the data from memory to disk, if desired.

    Bytes are supplied with :meth:`receive_data` and events are pulled
    with :meth:`next_event`.

    :param boundary: The multipart boundary, without the leading ``--``.
    :param max_form_memory_size: If not ``None``, :meth:`receive_data`
        raises ``RequestEntityTooLarge`` when the buffered bytes plus
        the new chunk would exceed this many bytes.
    :param max_parts: If not ``None``, :meth:`next_event` raises
        ``RequestEntityTooLarge`` once more than this many parts have
        been decoded.

    .. versionchanged:: 3.1.4
        Handle chunks that split a ``\r\n`` sequence.
    """

    def __init__(
        self,
        boundary: bytes,
        max_form_memory_size: int | None = None,
        *,
        max_parts: int | None = None,
    ) -> None:
        self.buffer = bytearray()
        self.complete = False
        self.max_form_memory_size = max_form_memory_size
        self.max_parts = max_parts
        self.state = State.PREAMBLE
        self.boundary = boundary

        # Note in the below \h i.e. horizontal whitespace is used
        # as [^\S\n\r] as \h isn't supported in python.

        # The preamble must end with a boundary where the boundary is
        # prefixed by a line break, RFC2046. Except that many
        # implementations including Werkzeug's tests omit the line
        # break prefix. In addition the first boundary could be the
        # epilogue boundary (for empty form-data) hence the matching
        # group to understand if it is an epilogue boundary.
        self.preamble_re = re.compile(
            rb"%s?--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # A boundary must include a line break prefix and suffix, and
        # may include trailing whitespace. In addition the boundary
        # could be the epilogue boundary hence the matching group to
        # understand if it is an epilogue boundary.
        self.boundary_re = re.compile(
            rb"%s--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # Offset in the buffer from which the next search starts.
        self._search_position = 0
        # Number of part headers decoded so far, checked against max_parts.
        self._parts_decoded = 0

    def last_newline(self, data: bytes | bytearray) -> int:
        """Return the smaller of the index of the last ``\\n`` and the
        index of the last ``\\r`` in ``data``.

        A character that does not occur counts as ``len(data)``, so
        ``len(data)`` is returned when ``data`` holds neither.
        """
        try:
            last_nl = data.rindex(b"\n")
        except ValueError:
            last_nl = len(data)
        try:
            last_cr = data.rindex(b"\r")
        except ValueError:
            last_cr = len(data)

        return min(last_nl, last_cr)

    def receive_data(self, data: bytes | None) -> None:
        """Append ``data`` to the internal buffer.

        Passing ``None`` marks the input as complete.

        :raises RequestEntityTooLarge: If ``max_form_memory_size`` is
            set and the buffer would grow beyond it.
        """
        if data is None:
            self.complete = True
        elif (
            self.max_form_memory_size is not None
            and len(self.buffer) + len(data) > self.max_form_memory_size
        ):
            # Ensure that data within single event does not exceed limit.
            # Also checked across accumulated events in MultiPartParser.
            raise RequestEntityTooLarge()
        else:
            self.buffer.extend(data)

    def next_event(self) -> Event:
        """Return the next event that can be decoded from the buffer.

        ``NEED_DATA`` is returned when the buffered bytes do not allow
        another event to be produced yet.

        :raises ValueError: If a part has no Content-Disposition header,
            or if the input was marked complete and no further event can
            be produced.
        :raises RequestEntityTooLarge: If more than ``max_parts`` parts
            have been decoded.
        """
        event: Event = NEED_DATA

        if self.state == State.PREAMBLE:
            match = self.preamble_re.search(self.buffer, self._search_position)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data = bytes(self.buffer[: match.start()])
                del self.buffer[: match.end()]
                event = Preamble(data=data)
                self._search_position = 0
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(
                    0, len(self.buffer) - len(self.boundary) - SEARCH_EXTRA_LENGTH
                )

        elif self.state == State.PART:
            match = BLANK_LINE_RE.search(self.buffer, self._search_position)
            if match is not None:
                headers = self._parse_headers(self.buffer[: match.start()])
                # The final header ends with a single CRLF, however a
                # blank line indicates the start of the
                # body. Therefore the end is after the first CRLF.
                headers_end = (match.start() + match.end()) // 2
                del self.buffer[:headers_end]

                if "content-disposition" not in headers:
                    raise ValueError("Missing Content-Disposition header")

                disposition, extra = parse_options_header(
                    headers["content-disposition"]
                )
                # NOTE(review): ``extra.get`` presumably returns None when
                # the header has no ``name`` option; the cast only affects
                # type checking.
                name = t.cast(str, extra.get("name"))
                filename = extra.get("filename")
                if filename is not None:
                    event = File(
                        filename=filename,
                        headers=headers,
                        name=name,
                    )
                else:
                    event = Field(
                        headers=headers,
                        name=name,
                    )
                self.state = State.DATA_START
                self._search_position = 0
                self._parts_decoded += 1

                if self.max_parts is not None and self._parts_decoded > self.max_parts:
                    raise RequestEntityTooLarge()
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(0, len(self.buffer) - SEARCH_EXTRA_LENGTH)

        elif self.state == State.DATA_START:
            data, del_index, more_data = self._parse_data(self.buffer, start=True)
            del self.buffer[:del_index]
            # The first chunk is always reported, even when it is empty.
            event = Data(data=data, more_data=more_data)
            if more_data:
                self.state = State.DATA

        elif self.state == State.DATA:
            data, del_index, more_data = self._parse_data(self.buffer, start=False)
            del self.buffer[:del_index]
            # An empty chunk is only reported when it terminates the part.
            if data or not more_data:
                event = Data(data=data, more_data=more_data)

        elif self.state == State.EPILOGUE and self.complete:
            event = Epilogue(data=bytes(self.buffer))
            del self.buffer[:]
            self.state = State.COMPLETE

        if self.complete and isinstance(event, NeedData):
            raise ValueError(f"Invalid form-data cannot parse beyond {self.state}")

        return event

    def _parse_headers(self, data: bytes | bytearray) -> Headers:
        """Parse the header block of a part into a ``Headers`` object."""
        headers: list[tuple[str, str]] = []
        # Merge the continued headers into one line
        data = HEADER_CONTINUATION_RE.sub(b" ", data)
        # Now there is one header per line
        for line in data.splitlines():
            line = line.strip()

            if line != b"":
                name, _, value = line.decode().partition(":")
                headers.append((name.strip(), value.strip()))
        return Headers(headers)

    def _parse_data(
        self, data: bytes | bytearray, *, start: bool
    ) -> tuple[bytes, int, bool]:
        """Extract the next chunk of body data from ``data``.

        When a complete boundary is found, ``self.state`` is switched to
        ``State.PART`` or ``State.EPILOGUE``.

        :param data: The buffered bytes to examine.
        :param start: Whether ``data`` begins with the line break that
            separates the headers from the body.
        :return: A tuple of the body bytes, the number of leading bytes
            the caller should delete from the buffer, and whether more
            data is expected for the current part.
        """
        # Body parts must start with CRLF (or CR or LF)
        if start:
            match = LINE_BREAK_RE.match(data)
            data_start = t.cast(t.Match[bytes], match).end()
        else:
            data_start = 0

        boundary = b"--" + self.boundary

        if self.buffer.find(boundary) == -1:
            # No complete boundary in the buffer, but there may be
            # a partial boundary at the end. As the boundary
            # starts with either a nl or cr find the earliest and
            # return up to that as data.
            data_end = del_index = self.last_newline(data[data_start:]) + data_start
            # If amount of data after last newline is far from
            # possible length of partial boundary, we should
            # assume that there is no partial boundary in the buffer
            # and return all pending data.
            if (len(data) - data_end) > len(b"\n" + boundary):
                data_end = del_index = len(data)
            more_data = True
        else:
            match = self.boundary_re.search(data)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data_end = match.start()
                del_index = match.end()
            else:
                data_end = del_index = self.last_newline(data[data_start:]) + data_start
            more_data = match is None

        # Keep \r\n sequence intact rather than splitting across chunks.
        # The trailing CR is only held back while more data is expected,
        # as it may turn out to be the start of the next boundary. Once a
        # boundary has been matched, a CR in front of it is body content
        # and the whole boundary must be consumed; holding it back there
        # would drop the byte and leave part of the boundary in the buffer.
        if more_data and data_end > data_start and data[data_end - 1] == 0x0D:
            data_end -= 1
            del_index -= 1

        return bytes(data[data_start:data_end]), del_index, more_data
295
296
class MultipartEncoder:
    """Encodes events into the bytes of a multipart message.

    :param boundary: The multipart boundary, without the leading ``--``.
    """

    def __init__(self, boundary: bytes) -> None:
        self.boundary = boundary
        self.state = State.PREAMBLE
        # True when the first Data event of the current part was empty,
        # so the blank line between headers and body is still unwritten.
        self._separator_pending = False

    def send_event(self, event: Event) -> bytes:
        """Return the bytes that represent ``event`` at the current
        position in the message and advance the encoder state.

        :raises ValueError: If ``event`` is not valid in the current
            state.
        """
        if isinstance(event, Preamble) and self.state == State.PREAMBLE:
            self.state = State.PART
            return event.data
        elif isinstance(event, (Field, File)) and self.state in {
            State.PREAMBLE,
            State.PART,
            State.DATA,
        }:
            # NOTE(review): name and filename are interpolated as-is; a
            # double quote or line break inside them is not escaped.
            data = b"\r\n--" + self.boundary + b"\r\n"
            data += b'Content-Disposition: form-data; name="%s"' % event.name.encode()
            if isinstance(event, File):
                data += b'; filename="%s"' % event.filename.encode()
            data += b"\r\n"
            for name, value in t.cast(Field, event).headers:
                # Content-Disposition was already written above.
                if name.lower() != "content-disposition":
                    data += f"{name}: {value}\r\n".encode()
            self._separator_pending = False
            self.state = State.DATA_START
            return data
        elif isinstance(event, Data) and self.state == State.DATA_START:
            self.state = State.DATA
            if event.data:
                # The line break completes the blank line after the headers.
                return b"\r\n" + event.data
            # Nothing is written for an empty first chunk. If the part
            # stays empty, the line break in front of the next boundary
            # doubles as the blank line. If data follows, the blank line
            # has to be written first.
            self._separator_pending = True
            return event.data
        elif isinstance(event, Data) and self.state == State.DATA:
            if self._separator_pending and event.data:
                # First non-empty chunk after an empty first chunk: emit
                # the blank line now so the body is not appended to the
                # header block.
                self._separator_pending = False
                return b"\r\n" + event.data
            return event.data
        elif isinstance(event, Epilogue):
            self.state = State.COMPLETE
            return b"\r\n--" + self.boundary + b"--\r\n" + event.data
        else:
            raise ValueError(f"Cannot generate {event} in state: {self.state}")