Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/sansio/multipart.py: 34%

166 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1import re 

2from dataclasses import dataclass 

3from enum import auto 

4from enum import Enum 

5from typing import cast 

6from typing import List 

7from typing import Optional 

8from typing import Tuple 

9 

10from .._internal import _to_bytes 

11from .._internal import _to_str 

12from ..datastructures import Headers 

13from ..exceptions import RequestEntityTooLarge 

14from ..http import parse_options_header 

15 

16 

class Event:
    """Base class for every event produced by the decoder or fed to the encoder.

    It carries no data itself; the concrete subclasses in this module
    describe the individual pieces of a multipart message.
    """

19 

20 

@dataclass(frozen=True)
class Preamble(Event):
    """Event carrying the bytes that precede the first boundary."""

    # Raw bytes found before the first boundary (may be empty).
    data: bytes

24 

25 

@dataclass(frozen=True)
class Field(Event):
    """Event marking the start of a part that has no ``filename`` option."""

    # Value of the ``name`` option of the part's Content-Disposition header.
    name: str
    # All headers parsed for this part.
    headers: Headers

30 

31 

@dataclass(frozen=True)
class File(Event):
    """Event marking the start of a part that carries a ``filename`` option."""

    # Value of the ``name`` option of the part's Content-Disposition header.
    name: str
    # Value of the ``filename`` option of the same header.
    filename: str
    # All headers parsed for this part.
    headers: Headers

37 

38 

@dataclass(frozen=True)
class Data(Event):
    """Event carrying a chunk of the body of the current part."""

    # The body bytes of this chunk.
    data: bytes
    # True while further chunks of the same part will follow; the decoder
    # sets it to False on the chunk that ends at a boundary.
    more_data: bool

43 

44 

@dataclass(frozen=True)
class Epilogue(Event):
    """Event carrying the bytes that follow the closing boundary."""

    # Raw bytes left after the closing boundary (may be empty).
    data: bytes

48 

49 

class NeedData(Event):
    """Event signalling that the decoder cannot produce anything yet.

    The decoder returns an instance of this class when the buffered
    bytes do not contain enough information to emit the next event.
    """

52 

53 

# Shared instance used as the "no event available yet" result, so callers
# can test against a single well-known object.
NEED_DATA = NeedData()

55 

56 

class State(Enum):
    """Positions within a multipart message tracked by the codec classes."""

    # Before the first boundary.
    PREAMBLE = auto()
    # Reading (or about to write) the headers of a part.
    PART = auto()
    # Inside the body of a part.
    DATA = auto()
    # After the closing boundary.
    EPILOGUE = auto()
    # The whole message has been processed.
    COMPLETE = auto()

63 

64 

# Multipart line breaks MUST be CRLF (\r\n) by RFC-7578, except that
# many implementations break this and either use CR or LF alone.
LINE_BREAK = b"(?:\r\n|\n|\r)"
# Two consecutive line breaks of the same kind: the separator between the
# headers of a part and its body.
BLANK_LINE_RE = re.compile(b"(?:\r\n\r\n|\r\r|\n\n)", re.MULTILINE)
LINE_BREAK_RE = re.compile(LINE_BREAK, re.MULTILINE)
# Header values can be continued ("folded") onto the next line when the
# line break is followed by a space or tab (RFC 5322, section 2.2.3).
HEADER_CONTINUATION_RE = re.compile(b"%s[ \t]" % LINE_BREAK, re.MULTILINE)
# This must be long enough to contain any line breaks plus any
# additional boundary markers (--) such that they will be found in a
# subsequent search
SEARCH_EXTRA_LENGTH = 8

77 

78 

class MultipartDecoder:
    """Decodes a multipart message as bytes into Python events.

    The part data is returned as available to allow the caller to save
    the data from memory to disk, if desired.

    Bytes are supplied with :meth:`receive_data` and events are pulled
    with :meth:`next_event`; ``NEED_DATA`` is returned whenever the
    buffered bytes are not sufficient to produce the next event.
    """

    def __init__(
        self,
        boundary: bytes,
        max_form_memory_size: Optional[int] = None,
        *,
        max_parts: Optional[int] = None,
    ) -> None:
        """Create a decoder for messages delimited by ``boundary``.

        :param boundary: The boundary, without the leading ``--``.
        :param max_form_memory_size: Upper bound for the number of
            buffered bytes; ``None`` disables the check.
        :param max_parts: Upper bound for the number of parts;
            ``None`` disables the check.
        """
        self.buffer = bytearray()
        self.complete = False
        self.max_form_memory_size = max_form_memory_size
        self.max_parts = max_parts
        self.state = State.PREAMBLE
        self.boundary = boundary

        # Note in the below \h i.e. horizontal whitespace is used
        # as [^\S\n\r] as \h isn't supported in python.

        # The preamble must end with a boundary where the boundary is
        # prefixed by a line break, RFC2046. Except that many
        # implementations including Werkzeug's tests omit the line
        # break prefix. In addition the first boundary could be the
        # epilogue boundary (for empty form-data) hence the matching
        # group to understand if it is an epilogue boundary.
        self.preamble_re = re.compile(
            rb"%s?--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # A boundary must include a line break prefix and suffix, and
        # may include trailing whitespace. In addition the boundary
        # could be the epilogue boundary hence the matching group to
        # understand if it is an epilogue boundary.
        self.boundary_re = re.compile(
            rb"%s--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        self._search_position = 0
        self._parts_decoded = 0

    def last_newline(self) -> int:
        """Return the smaller of the last ``\\n`` and last ``\\r`` indexes.

        A character that does not occur in the buffer counts as the
        buffer length, so the result is ``len(self.buffer)`` when the
        buffer holds neither character.
        """
        try:
            last_nl = self.buffer.rindex(b"\n")
        except ValueError:
            last_nl = len(self.buffer)
        try:
            last_cr = self.buffer.rindex(b"\r")
        except ValueError:
            last_cr = len(self.buffer)

        return min(last_nl, last_cr)

    def _partial_delimiter_start(self) -> int:
        """Return the number of leading buffer bytes that are safe to emit.

        Only meaningful while ``b"--" + self.boundary`` does not occur
        in the buffer. In that situation a delimiter can only be
        present as a truncated prefix at the very end of the buffer,
        and such a prefix is shorter than ``b"\\r\\n--" + self.boundary``.
        Everything before the first line break inside that trailing
        window therefore cannot belong to a delimiter.
        """
        window = len(self.boundary) + 4  # len(b"\r\n--" + boundary)
        tail_start = max(0, len(self.buffer) - window)
        match = LINE_BREAK_RE.search(self.buffer, tail_start)
        if match is None:
            return len(self.buffer)
        return match.start()

    def receive_data(self, data: Optional[bytes]) -> None:
        """Append ``data`` to the buffer, or mark the input as finished.

        :param data: The next chunk of the message, or ``None`` to
            signal that no more data will arrive.
        :raises RequestEntityTooLarge: If buffering ``data`` would
            exceed ``max_form_memory_size``.
        """
        if data is None:
            self.complete = True
        elif (
            self.max_form_memory_size is not None
            and len(self.buffer) + len(data) > self.max_form_memory_size
        ):
            raise RequestEntityTooLarge()
        else:
            self.buffer.extend(data)

    def next_event(self) -> Event:
        """Return the next event that can be decoded from the buffer.

        :return: A ``Preamble``, ``Field``, ``File``, ``Data`` or
            ``Epilogue`` event, or ``NEED_DATA`` when more bytes are
            required first.
        :raises ValueError: If a part has no Content-Disposition
            header, or if the input was marked complete and no event
            can be produced.
        :raises RequestEntityTooLarge: If more than ``max_parts`` parts
            have been decoded.
        """
        event: Event = NEED_DATA

        if self.state == State.PREAMBLE:
            match = self.preamble_re.search(self.buffer, self._search_position)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data = bytes(self.buffer[: match.start()])
                del self.buffer[: match.end()]
                event = Preamble(data=data)
                self._search_position = 0
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(
                    0, len(self.buffer) - len(self.boundary) - SEARCH_EXTRA_LENGTH
                )

        elif self.state == State.PART:
            match = BLANK_LINE_RE.search(self.buffer, self._search_position)
            if match is not None:
                headers = self._parse_headers(self.buffer[: match.start()])
                del self.buffer[: match.end()]

                if "content-disposition" not in headers:
                    raise ValueError("Missing Content-Disposition header")

                disposition, extra = parse_options_header(
                    headers["content-disposition"]
                )
                name = cast(str, extra.get("name"))
                filename = extra.get("filename")
                if filename is not None:
                    event = File(
                        filename=filename,
                        headers=headers,
                        name=name,
                    )
                else:
                    event = Field(
                        headers=headers,
                        name=name,
                    )
                self.state = State.DATA
                self._search_position = 0
                self._parts_decoded += 1

                if self.max_parts is not None and self._parts_decoded > self.max_parts:
                    raise RequestEntityTooLarge()
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(0, len(self.buffer) - SEARCH_EXTRA_LENGTH)

        elif self.state == State.DATA:
            if self.buffer.find(b"--" + self.boundary) == -1:
                # No complete boundary in the buffer, but there may be
                # a partial boundary at the end. Only the trailing
                # window that could hold such a partial boundary is
                # kept back; everything before it is returned as data.
                # Holding back from the last line break of the whole
                # buffer instead would let a body with a single early
                # line break accumulate without ever being emitted.
                data_length = del_index = self._partial_delimiter_start()
                more_data = True
            else:
                match = self.boundary_re.search(self.buffer)
                if match is not None:
                    if match.group(1).startswith(b"--"):
                        self.state = State.EPILOGUE
                    else:
                        self.state = State.PART
                    data_length = match.start()
                    del_index = match.end()
                else:
                    data_length = del_index = self.last_newline()
                more_data = match is None

            data = bytes(self.buffer[:data_length])
            del self.buffer[:del_index]
            # An empty chunk is only worth reporting when it closes the part.
            if data or not more_data:
                event = Data(data=data, more_data=more_data)

        elif self.state == State.EPILOGUE and self.complete:
            event = Epilogue(data=bytes(self.buffer))
            del self.buffer[:]
            self.state = State.COMPLETE

        if self.complete and isinstance(event, NeedData):
            raise ValueError(f"Invalid form-data cannot parse beyond {self.state}")

        return event

    def _parse_headers(self, data: bytes) -> Headers:
        """Parse the header block of a part into a ``Headers`` object.

        Continued header lines are joined with a single space before
        each non-blank line is split at its first ``:``.

        :raises ValueError: If a non-blank line contains no ``:``
            (raised by the tuple unpacking).
        """
        headers: List[Tuple[str, str]] = []
        # Merge the continued headers into one line
        data = HEADER_CONTINUATION_RE.sub(b" ", data)
        # Now there is one header per line
        for line in data.splitlines():
            if line.strip() != b"":
                name, value = _to_str(line).strip().split(":", 1)
                headers.append((name.strip(), value.strip()))
        return Headers(headers)

254 

255 

class MultipartEncoder:
    """Encodes a sequence of events into the bytes of a multipart message.

    The encoder tracks a ``State`` so that events supplied in an order
    it does not accept are rejected with ``ValueError``.
    """

    def __init__(self, boundary: bytes) -> None:
        """Create an encoder that delimits parts with ``boundary``.

        :param boundary: The boundary, without the leading ``--``.
        """
        self.boundary = boundary
        self.state = State.PREAMBLE

    def send_event(self, event: Event) -> bytes:
        """Return the bytes that represent ``event`` in the message.

        :param event: A ``Preamble``, ``Field``, ``File``, ``Data`` or
            ``Epilogue`` event.
        :return: The encoded bytes for the event.
        :raises ValueError: If the event is not accepted in the
            encoder's current state.
        """
        if isinstance(event, Preamble) and self.state == State.PREAMBLE:
            self.state = State.PART
            return event.data

        if isinstance(event, (Field, File)) and self.state in {
            State.PREAMBLE,
            State.PART,
            State.DATA,
        }:
            self.state = State.DATA
            data = b"\r\n--" + self.boundary + b"\r\n"
            data += b'Content-Disposition: form-data; name="%s"' % _to_bytes(event.name)
            if isinstance(event, File):
                data += b'; filename="%s"' % _to_bytes(event.filename)
            data += b"\r\n"
            for name, value in cast(Field, event).headers:
                # The Content-Disposition line was already written above.
                if name.lower() != "content-disposition":
                    data += _to_bytes(f"{name}: {value}\r\n")
            # Blank line separating the headers from the body.
            data += b"\r\n"
            return data

        if isinstance(event, Data) and self.state == State.DATA:
            return event.data

        if isinstance(event, Epilogue):
            self.state = State.COMPLETE
            return b"\r\n--" + self.boundary + b"--\r\n" + event.data

        raise ValueError(f"Cannot generate {event} in state: {self.state}")