Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/sansio/multipart.py: 36%

155 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:03 +0000

1import re 

2from dataclasses import dataclass 

3from enum import auto 

4from enum import Enum 

5from typing import cast 

6from typing import List 

7from typing import Optional 

8from typing import Tuple 

9 

10from .._internal import _to_bytes 

11from .._internal import _to_str 

12from ..datastructures import Headers 

13from ..exceptions import RequestEntityTooLarge 

14from ..http import parse_options_header 

15 

16 

class Event:
    """Base class for every event exchanged with the multipart decoder and encoder."""

19 

20 

@dataclass(frozen=True)
class Preamble(Event):
    """Event carrying the bytes that precede the first boundary of the message."""

    # Raw bytes found in the buffer before the first boundary match.
    data: bytes

24 

25 

@dataclass(frozen=True)
class Field(Event):
    """Event marking the start of a part whose Content-Disposition has no filename."""

    # Value of the ``name`` option of the part's Content-Disposition header.
    name: str
    # All headers parsed for this part.
    headers: Headers

30 

31 

@dataclass(frozen=True)
class File(Event):
    """Event marking the start of a part whose Content-Disposition has a filename."""

    # Value of the ``name`` option of the part's Content-Disposition header.
    name: str
    # Value of the ``filename`` option of the part's Content-Disposition header.
    filename: str
    # All headers parsed for this part.
    headers: Headers

37 

38 

@dataclass(frozen=True)
class Data(Event):
    """Event carrying a chunk of the body of the current part."""

    # The chunk of body bytes.
    data: bytes
    # True while the end of the part (a boundary) has not been reached yet;
    # False on the chunk that is terminated by a boundary.
    more_data: bool

43 

44 

@dataclass(frozen=True)
class Epilogue(Event):
    """Event carrying the bytes that follow the closing boundary of the message."""

    # Raw bytes left in the buffer after the closing boundary.
    data: bytes

48 

49 

class NeedData(Event):
    """Event signalling that no other event can be produced from the buffered data."""


# Shared instance used as the default "nothing to report yet" event.
NEED_DATA = NeedData()

55 

56 

class State(Enum):
    """Parsing/encoding position within a multipart message."""

    # Before the first boundary has been seen.
    PREAMBLE = auto()
    # At the start of a part, expecting its header block.
    PART = auto()
    # Inside the body of a part.
    DATA = auto()
    # After the closing boundary.
    EPILOGUE = auto()
    # The epilogue has been handled; the message is finished.
    COMPLETE = auto()

63 

64 

# Multipart line breaks MUST be CRLF (\r\n) by RFC-7578, except that
# many implementations break this and either use CR or LF alone.
LINE_BREAK = b"(?:\r\n|\n|\r)"
# Two consecutive line breaks of the same kind: the separator between a
# part's header block and its body.
BLANK_LINE_RE = re.compile(b"(?:\r\n\r\n|\r\r|\n\n)", re.MULTILINE)
LINE_BREAK_RE = re.compile(LINE_BREAK, re.MULTILINE)
# Header values can be continued via a space or tab after the linebreak, as
# per RFC2231
HEADER_CONTINUATION_RE = re.compile(b"%s[ \t]" % LINE_BREAK, re.MULTILINE)

73 

74 

class MultipartDecoder:
    """Decodes a multipart message as bytes into Python events.

    The part data is returned as available to allow the caller to save
    the data from memory to disk, if desired.

    Feed bytes with :meth:`receive_data` and pull events with
    :meth:`next_event` until it returns ``NEED_DATA`` (more input is
    required) or an ``Epilogue`` (the message is finished).

    :param boundary: The multipart boundary, without the leading ``--``.
    :param max_form_memory_size: If not ``None``, the maximum number of
        bytes that may be held in the internal buffer. Exceeding it in
        :meth:`receive_data` raises ``RequestEntityTooLarge``.
    :param max_parts: If not ``None``, the maximum number of parts
        (fields and files) that may be decoded. Exceeding it in
        :meth:`next_event` raises ``RequestEntityTooLarge``. ``None``
        (the default) keeps the previous unlimited behaviour.
    """

    def __init__(
        self,
        boundary: bytes,
        max_form_memory_size: Optional[int] = None,
        *,
        max_parts: Optional[int] = None,
    ) -> None:
        self.buffer = bytearray()
        self.complete = False
        self.max_form_memory_size = max_form_memory_size
        self.max_parts = max_parts
        self.state = State.PREAMBLE
        self.boundary = boundary
        # Number of part headers decoded so far, checked against max_parts.
        self._parts_decoded = 0

        # Note in the below \h i.e. horizontal whitespace is used
        # as [^\S\n\r] as \h isn't supported in python.

        # The preamble must end with a boundary where the boundary is
        # prefixed by a line break, RFC2046. Except that many
        # implementations including Werkzeug's tests omit the line
        # break prefix. In addition the first boundary could be the
        # epilogue boundary (for empty form-data) hence the matching
        # group to understand if it is an epilogue boundary.
        self.preamble_re = re.compile(
            rb"%s?--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # A boundary must include a line break prefix and suffix, and
        # may include trailing whitespace. In addition the boundary
        # could be the epilogue boundary hence the matching group to
        # understand if it is an epilogue boundary.
        self.boundary_re = re.compile(
            rb"%s--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )

    def last_newline(self) -> int:
        """Return how many leading buffer bytes are safe to emit as data.

        The result is the smaller of the index of the last ``\\n`` and
        the index of the last ``\\r`` in the buffer, where a character
        that does not occur counts as ``len(self.buffer)``. Everything
        from that index on may be the start of a boundary and is kept.
        """
        try:
            last_nl = self.buffer.rindex(b"\n")
        except ValueError:
            last_nl = len(self.buffer)
        try:
            last_cr = self.buffer.rindex(b"\r")
        except ValueError:
            last_cr = len(self.buffer)

        return min(last_nl, last_cr)

    def receive_data(self, data: Optional[bytes]) -> None:
        """Append ``data`` to the buffer, or mark the input as complete.

        :param data: The next chunk of the message, or ``None`` to
            signal that no more data will arrive.
        :raises RequestEntityTooLarge: If ``max_form_memory_size`` is set
            and the buffer would grow beyond it.
        """
        if data is None:
            self.complete = True
        elif (
            self.max_form_memory_size is not None
            and len(self.buffer) + len(data) > self.max_form_memory_size
        ):
            raise RequestEntityTooLarge()
        else:
            self.buffer.extend(data)

    def next_event(self) -> Event:
        """Return the next event that can be decoded from the buffer.

        :return: A ``Preamble``, ``Field``, ``File``, ``Data`` or
            ``Epilogue`` event, or ``NEED_DATA`` when the buffered bytes
            are not sufficient to produce one.
        :raises ValueError: If a part has no Content-Disposition header,
            or if the input is complete but no event can be produced.
        :raises RequestEntityTooLarge: If ``max_parts`` is set and more
            parts than that have been decoded.
        """
        event: Event = NEED_DATA

        if self.state == State.PREAMBLE:
            match = self.preamble_re.search(self.buffer)
            if match is not None:
                # Group 1 starts with "--" only for the closing boundary.
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data = bytes(self.buffer[: match.start()])
                del self.buffer[: match.end()]
                event = Preamble(data=data)

        elif self.state == State.PART:
            # Wait until the whole header block (ended by a blank line)
            # is buffered before parsing it.
            match = BLANK_LINE_RE.search(self.buffer)
            if match is not None:
                headers = self._parse_headers(self.buffer[: match.start()])
                del self.buffer[: match.end()]

                if "content-disposition" not in headers:
                    raise ValueError("Missing Content-Disposition header")

                _, extra = parse_options_header(headers["content-disposition"])
                name = cast(str, extra.get("name"))
                filename = extra.get("filename")
                if filename is not None:
                    event = File(
                        filename=filename,
                        headers=headers,
                        name=name,
                    )
                else:
                    event = Field(
                        headers=headers,
                        name=name,
                    )
                self.state = State.DATA

                # Bound the number of parts so that a message made of a
                # huge number of tiny parts cannot consume unbounded
                # resources.
                self._parts_decoded += 1
                if self.max_parts is not None and self._parts_decoded > self.max_parts:
                    raise RequestEntityTooLarge()

        elif self.state == State.DATA:
            if self.buffer.find(b"--" + self.boundary) == -1:
                # No complete boundary in the buffer, but there may be
                # a partial boundary at the end. As the boundary
                # starts with either a nl or cr find the earliest and
                # return up to that as data.
                data_length = del_index = self.last_newline()
                more_data = True
            else:
                match = self.boundary_re.search(self.buffer)
                if match is not None:
                    if match.group(1).startswith(b"--"):
                        self.state = State.EPILOGUE
                    else:
                        self.state = State.PART
                    # Emit the bytes before the boundary and drop the
                    # boundary itself from the buffer.
                    data_length = match.start()
                    del_index = match.end()
                else:
                    # The boundary text is present but not (yet) as a
                    # complete boundary line; treat it like the
                    # no-boundary case.
                    data_length = del_index = self.last_newline()
                more_data = match is None

            data = bytes(self.buffer[:data_length])
            del self.buffer[:del_index]
            # An empty chunk is only reported when it terminates the part.
            if data or not more_data:
                event = Data(data=data, more_data=more_data)

        elif self.state == State.EPILOGUE and self.complete:
            event = Epilogue(data=bytes(self.buffer))
            del self.buffer[:]
            self.state = State.COMPLETE

        if self.complete and isinstance(event, NeedData):
            raise ValueError(f"Invalid form-data cannot parse beyond {self.state}")

        return event

    def _parse_headers(self, data: bytes) -> Headers:
        """Parse the header block of a part into a ``Headers`` object.

        :param data: The raw header block, without the terminating
            blank line.
        :raises ValueError: If a non-blank line contains no ``:``
            (raised by the tuple unpacking of the split result).
        """
        headers: List[Tuple[str, str]] = []
        # Merge the continued headers into one line
        data = HEADER_CONTINUATION_RE.sub(b" ", data)
        # Now there is one header per line
        for line in data.splitlines():
            if line.strip() != b"":
                name, value = _to_str(line).strip().split(":", 1)
                headers.append((name.strip(), value.strip()))
        return Headers(headers)

227 

228 

class MultipartEncoder:
    """Encodes a sequence of events into the bytes of a multipart message.

    :param boundary: The multipart boundary, without the leading ``--``.
    """

    def __init__(self, boundary: bytes) -> None:
        self.boundary = boundary
        self.state = State.PREAMBLE

    def send_event(self, event: Event) -> bytes:
        """Return the bytes that represent ``event`` and advance the state.

        :param event: A ``Preamble``, ``Field``, ``File``, ``Data`` or
            ``Epilogue`` event.
        :return: The encoded bytes for the event.
        :raises ValueError: If the event cannot be generated in the
            current state.
        """
        if isinstance(event, Preamble) and self.state == State.PREAMBLE:
            self.state = State.PART
            return event.data
        elif isinstance(event, (Field, File)) and self.state in {
            State.PREAMBLE,
            State.PART,
            State.DATA,
        }:
            self.state = State.DATA
            # Every part starts with a boundary line followed by its
            # Content-Disposition header.
            data = b"\r\n--" + self.boundary + b"\r\n"
            data += b'Content-Disposition: form-data; name="%s"' % _to_bytes(event.name)
            if isinstance(event, File):
                data += b'; filename="%s"' % _to_bytes(event.filename)
            data += b"\r\n"
            for name, value in cast(Field, event).headers:
                # Content-Disposition has already been written above.
                if name.lower() != "content-disposition":
                    data += _to_bytes(f"{name}: {value}\r\n")
            # Blank line separating the headers from the part body.
            data += b"\r\n"
            return data
        elif isinstance(event, Data) and self.state == State.DATA:
            return event.data
        elif isinstance(event, Epilogue):
            self.state = State.COMPLETE
            return b"\r\n--" + self.boundary + b"--\r\n" + event.data
        else:
            raise ValueError(f"Cannot generate {event} in state: {self.state}")

260 raise ValueError(f"Cannot generate {event} in state: {self.state}")