1from __future__ import annotations
2
3import re
4import typing as t
5from dataclasses import dataclass
6from enum import auto
7from enum import Enum
8
9from ..datastructures import Headers
10from ..exceptions import RequestEntityTooLarge
11from ..http import parse_options_header
12
13
class Event:
    """Base class for every event produced or consumed by the multipart codec."""
16
17
@dataclass(frozen=True)
class Preamble(Event):
    """Event carrying the bytes that precede the first boundary."""

    # Raw bytes found in front of the first boundary line.
    data: bytes
21
22
@dataclass(frozen=True)
class Field(Event):
    """Event marking the start of a part that has no ``filename`` parameter."""

    # Value of the ``name`` parameter of the Content-Disposition header.
    name: str
    # All headers of the part.
    headers: Headers
27
28
@dataclass(frozen=True)
class File(Event):
    """Event marking the start of a part that carries a ``filename`` parameter."""

    # Value of the ``name`` parameter of the Content-Disposition header.
    name: str
    # Value of the ``filename`` parameter of the Content-Disposition header.
    filename: str
    # All headers of the part.
    headers: Headers
34
35
@dataclass(frozen=True)
class Data(Event):
    """Event carrying one chunk of the body of the current part."""

    # The body bytes contained in this chunk (may be empty).
    data: bytes
    # True while the end of the part has not been reached yet, i.e. further
    # ``Data`` events for the same part will follow.
    more_data: bool
40
41
@dataclass(frozen=True)
class Epilogue(Event):
    """Event carrying the bytes that follow the closing boundary."""

    # Raw bytes left over after the final ``--boundary--`` marker.
    data: bytes
45
46
class NeedData(Event):
    """Event signalling that no further progress is possible without more input."""


# Shared instance handed out by the decoder whenever it needs more bytes.
NEED_DATA = NeedData()
52
53
class State(Enum):
    """Positions of the multipart encoder/decoder state machines."""

    # Before the first boundary has been seen / written.
    PREAMBLE = auto()
    # At the headers of a part.
    PART = auto()
    # Inside the body of a part, after its first chunk.
    DATA = auto()
    # Right after the headers of a part, before any body chunk.
    DATA_START = auto()
    # After the closing boundary.
    EPILOGUE = auto()
    # The whole message has been processed.
    COMPLETE = auto()
61
62
# RFC-7578 requires multipart line breaks to be CRLF (\r\n). Many
# implementations get this wrong and send a lone CR or a lone LF, so all
# three spellings are accepted.
LINE_BREAK = b"(?:\r\n|\n|\r)"
LINE_BREAK_RE = re.compile(LINE_BREAK, re.MULTILINE)
# Two identical line breaks in a row, i.e. the blank line that separates
# the headers of a part from its body.
BLANK_LINE_RE = re.compile(b"(?:\r\n\r\n|\r\r|\n\n)", re.MULTILINE)
# A header value may be continued on the next line when that line starts
# with a space or a tab, as per RFC2231.
HEADER_CONTINUATION_RE = re.compile(LINE_BREAK + b"[ \t]", re.MULTILINE)
# Number of already-searched bytes that are searched again on the next
# attempt. It must be long enough to contain any line breaks plus any
# additional boundary markers (--) so that a target split across two
# chunks is still found by the subsequent search.
SEARCH_EXTRA_LENGTH = 8
75
76
class MultipartDecoder:
    """Decodes a multipart message as bytes into Python events.

    The part data is returned as available to allow the caller to save
    the data from memory to disk, if desired.

    Bytes are fed in with :meth:`receive_data` (``None`` marks the end of
    the input) and events are pulled out with :meth:`next_event`, which
    returns ``NEED_DATA`` when nothing more can be decoded from the bytes
    buffered so far.
    """

    def __init__(
        self,
        boundary: bytes,
        max_form_memory_size: int | None = None,
        *,
        max_parts: int | None = None,
    ) -> None:
        """Create a decoder for messages delimited by ``boundary``.

        :param boundary: The boundary without the leading ``--``.
        :param max_form_memory_size: Upper limit for the number of
            buffered bytes; ``None`` disables the check.
        :param max_parts: Upper limit for the number of parts that may
            be decoded; ``None`` disables the check.
        """
        self.buffer = bytearray()
        self.complete = False
        self.max_form_memory_size = max_form_memory_size
        self.max_parts = max_parts
        self.state = State.PREAMBLE
        self.boundary = boundary

        # Note in the below \h i.e. horizontal whitespace is used
        # as [^\S\n\r] as \h isn't supported in python.

        # The preamble must end with a boundary where the boundary is
        # prefixed by a line break, RFC2046. Except that many
        # implementations including Werkzeug's tests omit the line
        # break prefix. In addition the first boundary could be the
        # epilogue boundary (for empty form-data) hence the matching
        # group to understand if it is an epilogue boundary.
        self.preamble_re = re.compile(
            rb"%s?--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # A boundary must include a line break prefix and suffix, and
        # may include trailing whitespace. In addition the boundary
        # could be the epilogue boundary hence the matching group to
        # understand if it is an epilogue boundary.
        self.boundary_re = re.compile(
            rb"%s--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # Offset at which the next regex search of the buffer starts.
        self._search_position = 0
        # Number of part headers decoded so far, checked against max_parts.
        self._parts_decoded = 0

    def last_newline(self, data: bytes | bytearray) -> int:
        """Return the smaller of the last ``\\n`` and last ``\\r`` index.

        A character that does not occur in ``data`` counts as
        ``len(data)``, so ``len(data)`` is returned when ``data`` holds
        no line break character at all.
        """
        last_nl = data.rfind(b"\n")
        if last_nl == -1:
            last_nl = len(data)

        last_cr = data.rfind(b"\r")
        if last_cr == -1:
            last_cr = len(data)

        return min(last_nl, last_cr)

    def receive_data(self, data: bytes | None) -> None:
        """Append ``data`` to the buffer, or mark the input as complete.

        :param data: The next chunk of the message, or ``None`` once the
            whole message has been received.
        :raises RequestEntityTooLarge: If buffering ``data`` would exceed
            ``max_form_memory_size``.
        """
        if data is None:
            self.complete = True
        elif (
            self.max_form_memory_size is not None
            and len(self.buffer) + len(data) > self.max_form_memory_size
        ):
            # Ensure that data within single event does not exceed limit.
            # Also checked across accumulated events in MultiPartParser.
            raise RequestEntityTooLarge()
        else:
            self.buffer.extend(data)

    def next_event(self) -> Event:
        """Decode and return the next event from the buffered bytes.

        :return: The decoded event, or ``NEED_DATA`` when the buffer does
            not yet hold enough bytes to produce one.
        :raises ValueError: If a part has no Content-Disposition header,
            or if the input is complete but no further event can be
            decoded.
        :raises RequestEntityTooLarge: If more than ``max_parts`` parts
            have been decoded.
        """
        event: Event = NEED_DATA

        if self.state == State.PREAMBLE:
            match = self.preamble_re.search(self.buffer, self._search_position)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data = bytes(self.buffer[: match.start()])
                del self.buffer[: match.end()]
                event = Preamble(data=data)
                self._search_position = 0
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(
                    0, len(self.buffer) - len(self.boundary) - SEARCH_EXTRA_LENGTH
                )

        elif self.state == State.PART:
            match = BLANK_LINE_RE.search(self.buffer, self._search_position)
            if match is not None:
                headers = self._parse_headers(self.buffer[: match.start()])
                # The final header ends with a single CRLF, however a
                # blank line indicates the start of the
                # body. Therefore the end is after the first CRLF.
                headers_end = (match.start() + match.end()) // 2
                del self.buffer[:headers_end]

                if "content-disposition" not in headers:
                    raise ValueError("Missing Content-Disposition header")

                disposition, extra = parse_options_header(
                    headers["content-disposition"]
                )
                name = t.cast(str, extra.get("name"))
                filename = extra.get("filename")
                if filename is not None:
                    event = File(
                        filename=filename,
                        headers=headers,
                        name=name,
                    )
                else:
                    event = Field(
                        headers=headers,
                        name=name,
                    )
                self.state = State.DATA_START
                self._search_position = 0
                self._parts_decoded += 1

                if self.max_parts is not None and self._parts_decoded > self.max_parts:
                    raise RequestEntityTooLarge()
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(0, len(self.buffer) - SEARCH_EXTRA_LENGTH)

        elif self.state == State.DATA_START:
            # The first chunk of a part is always reported, even if empty.
            # _parse_data moves the state on itself when it finds the
            # boundary that ends the part.
            data, del_index, more_data = self._parse_data(self.buffer, start=True)
            del self.buffer[:del_index]
            event = Data(data=data, more_data=more_data)
            if more_data:
                self.state = State.DATA

        elif self.state == State.DATA:
            data, del_index, more_data = self._parse_data(self.buffer, start=False)
            del self.buffer[:del_index]
            # An empty chunk is only reported when it ends the part.
            if data or not more_data:
                event = Data(data=data, more_data=more_data)

        elif self.state == State.EPILOGUE and self.complete:
            event = Epilogue(data=bytes(self.buffer))
            del self.buffer[:]
            self.state = State.COMPLETE

        if self.complete and isinstance(event, NeedData):
            raise ValueError(f"Invalid form-data cannot parse beyond {self.state}")

        return event

    def _parse_headers(self, data: bytes | bytearray) -> Headers:
        """Parse the raw header block of a part into a ``Headers`` object."""
        headers: list[tuple[str, str]] = []
        # Merge the continued headers into one line
        data = HEADER_CONTINUATION_RE.sub(b" ", data)
        # Now there is one header per line
        for line in data.splitlines():
            line = line.strip()

            if line != b"":
                name, _, value = line.decode().partition(":")
                headers.append((name.strip(), value.strip()))
        return Headers(headers)

    def _partial_boundary_index(
        self, data: bytes | bytearray, data_start: int, max_partial: int
    ) -> int:
        """Return the index from which ``data`` must be held back.

        Used when ``data`` holds no complete boundary marker. A partial
        boundary can then only sit at the very end of ``data`` and has to
        begin with ``\\r`` or ``\\n``. The last ``\\n`` and the last ``\\r``
        at or after ``data_start`` are checked, earliest first, and the
        first one that lies at most ``max_partial`` bytes before the end
        is returned. ``len(data)`` is returned when neither qualifies,
        i.e. when everything can be handed out as data.
        """
        candidates = sorted(
            index
            for index in (data.rfind(b"\n", data_start), data.rfind(b"\r", data_start))
            if index != -1
        )

        for index in candidates:
            if len(data) - index <= max_partial:
                return index

        return len(data)

    def _parse_data(
        self, data: bytes | bytearray, *, start: bool
    ) -> tuple[bytes, int, bool]:
        """Extract the next chunk of part data from ``data``.

        Sets ``self.state`` to ``State.PART`` or ``State.EPILOGUE`` when
        the boundary that ends the part is found.

        :param data: The buffered bytes, beginning at the current part
            body.
        :param start: Whether this is the first chunk of the part, in
            which case the leading line break is skipped.
        :return: A tuple of the chunk, the number of leading bytes of
            ``data`` that are consumed and should be deleted, and whether
            more data for the same part is expected.
        """
        # Body parts must start with CRLF (or CR or LF)
        if start:
            # next_event only gets here right after cutting the buffer in
            # the middle of the blank line that ends the headers, so the
            # buffer starts with a line break.
            match = LINE_BREAK_RE.match(data)
            data_start = t.cast(t.Match[bytes], match).end()
        else:
            data_start = 0

        boundary = b"--" + self.boundary

        if data.find(boundary) == -1:
            # No complete boundary in the buffer, but there may be a
            # partial boundary at the end. As the boundary starts with
            # either a nl or cr, hold back everything from the earliest
            # line break that is close enough to the end to begin one.
            # A line break further away than the length of a partial
            # boundary cannot begin one, so data up to the next
            # candidate (or all pending data) is returned. A line break
            # at the very end is never returned as data merely because
            # an unrelated line break exists far earlier in the buffer.
            data_end = del_index = self._partial_boundary_index(
                data, data_start, len(b"\n" + boundary)
            )
            more_data = True
        else:
            match = self.boundary_re.search(data)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data_end = match.start()
                del_index = match.end()
            else:
                # The boundary text is present but not (yet) as a complete
                # boundary line, return the data up to the last line break.
                data_end = del_index = self.last_newline(data[data_start:]) + data_start
            more_data = match is None

        return bytes(data[data_start:data_end]), del_index, more_data
285
286
class MultipartEncoder:
    """Encodes multipart events into the bytes of a multipart message.

    Events are passed one at a time to :meth:`send_event`, which returns
    the bytes to write for that event.
    """

    def __init__(self, boundary: bytes) -> None:
        """Create an encoder that delimits parts with ``boundary``.

        :param boundary: The boundary without the leading ``--``.
        """
        self.boundary = boundary
        self.state = State.PREAMBLE
        # True while the blank line between the headers of the current
        # part and its body has not been written yet.
        self._separator_pending = False

    def send_event(self, event: Event) -> bytes:
        """Return the bytes that represent ``event`` at the current position.

        :param event: The event to encode.
        :return: The bytes to append to the message.
        :raises ValueError: If ``event`` is not valid in the current state.
        """
        if isinstance(event, Preamble) and self.state == State.PREAMBLE:
            self.state = State.PART
            return event.data
        elif isinstance(event, (Field, File)) and self.state in {
            State.PREAMBLE,
            State.PART,
            State.DATA,
        }:
            data = b"\r\n--" + self.boundary + b"\r\n"
            data += b'Content-Disposition: form-data; name="%s"' % event.name.encode()
            if isinstance(event, File):
                data += b'; filename="%s"' % event.filename.encode()
            data += b"\r\n"
            for name, value in t.cast(Field, event).headers:
                # The Content-Disposition header was generated above.
                if name.lower() != "content-disposition":
                    data += f"{name}: {value}\r\n".encode()
            self.state = State.DATA_START
            return data
        elif isinstance(event, Data) and self.state in {
            State.DATA_START,
            State.DATA,
        }:
            if self.state == State.DATA_START:
                # First chunk after the headers, the separating blank
                # line is still owed.
                self._separator_pending = True
            self.state = State.DATA

            # The blank line is written together with the first
            # non-empty chunk. An empty first chunk must not swallow it,
            # otherwise the following chunk would be glued to the headers.
            # For a part without any data the line break in front of the
            # next boundary completes the blank line instead.
            if self._separator_pending and len(event.data) > 0:
                self._separator_pending = False
                return b"\r\n" + event.data

            return event.data
        elif isinstance(event, Epilogue):
            self.state = State.COMPLETE
            return b"\r\n--" + self.boundary + b"--\r\n" + event.data
        else:
            raise ValueError(f"Cannot generate {event} in state: {self.state}")