Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/email/header.py: 17%

309 statements  

« prev     ^ index     » next       coverage.py v7.0.5, created at 2023-01-17 06:13 +0000

1# Copyright (C) 2002-2007 Python Software Foundation 

2# Author: Ben Gertzfield, Barry Warsaw 

3# Contact: email-sig@python.org 

4 

5"""Header encoding and decoding functionality.""" 

6 

7__all__ = [ 

8 'Header', 

9 'decode_header', 

10 'make_header', 

11 ] 

12 

13import re 

14import binascii 

15 

16import email.quoprimime 

17import email.base64mime 

18 

19from email.errors import HeaderParseError 

20from email import charset as _charset 

# Module-level alias for the Charset class from email.charset.
Charset = _charset.Charset

# Small text/bytes building blocks shared by the code below.
NL = '\n'
SPACE = ' '
BSPACE = b' '
SPACE8 = SPACE * 8
EMPTYSTRING = ''
# Default maximum line length used by Header when none is given.
MAXLINELEN = 78
# Characters treated as folding white space when splitting ASCII values.
FWS = ' \t'

# Pre-built Charset instances used as defaults/fallbacks by Header.
USASCII = Charset('us-ascii')
UTF8 = Charset('utf-8')

# Match encoded-word strings in the form =?charset?q?Hello_World?=
ecre = re.compile(r'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qQbB])  # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<encoded>.*?)      # non-greedy up to the next ?= is the encoded string
  \?=                   # literal ?=
  ''', re.VERBOSE | re.MULTILINE)

# Field name regexp, including trailing colon, but not separating whitespace,
# according to RFC 2822.  Character range is from exclamation mark (octal 041)
# to tilde (octal 176).  For use with .match()
fcre = re.compile(r'[\041-\176]+:$')

# Find a header embedded in a putative header value.  Used to check for
# header injection attack: a newline followed by non-whitespace characters
# and a colon looks like the start of a new header field.
_embedded_header = re.compile(r'\n[^ \t]+:')



# Helpers
_max_append = email.quoprimime._max_append

58 

59 

60 

def decode_header(header):
    """Decode a message header value without converting charset.

    Returns a list of (string, charset) pairs containing each of the decoded
    parts of the header.  Charset is None for non-encoded parts of the header,
    otherwise a lower-case string containing the name of the character set
    specified in the encoded string.

    header may be a string that may or may not contain RFC2047 encoded words,
    or it may be a Header object.

    When the string contains no encoded words at all it is returned unchanged
    as a single (header, None) pair; otherwise every returned part is a bytes
    object.

    An email.errors.HeaderParseError may be raised when certain decoding error
    occurs (e.g. a base64 decoding exception).  The underlying binascii.Error
    is attached as the exception's __cause__.
    """
    # If it is a Header object, we can just return the encoded chunks.
    if hasattr(header, '_chunks'):
        return [(_charset._encode(string, str(charset)), str(charset))
                for string, charset in header._chunks]
    # If no encoding, just return the header with no charset.
    if not ecre.search(header):
        return [(header, None)]
    # First step is to parse all the encoded parts into triplets of the form
    # (encoded_string, encoding, charset).  For unencoded strings, the last
    # two parts will be None.
    words = []
    for line in header.splitlines():
        # ecre has three groups, so split() yields
        #   [text, charset, encoding, encoded, text, charset, ...., text]
        # i.e. one leading text item followed by groups of four.  Walk it by
        # index instead of repeatedly popping from the front of the list.
        parts = ecre.split(line)
        count = len(parts)
        for start in range(0, count, 4):
            unencoded = parts[start]
            if start == 0:
                # Only the text at the very start of a line is left-stripped.
                unencoded = unencoded.lstrip()
            if unencoded:
                words.append((unencoded, None, None))
            if start + 1 < count:
                charset, encoding, encoded = parts[start + 1:start + 4]
                words.append((encoded, encoding.lower(), charset.lower()))
    # Now loop over words and remove words that consist of whitespace
    # between two encoded strings.
    droplist = []
    for n, w in enumerate(words):
        if n > 1 and w[1] and words[n-2][1] and words[n-1][0].isspace():
            droplist.append(n-1)
    # Delete from the end so earlier indexes stay valid.
    for d in reversed(droplist):
        del words[d]

    # The next step is to decode each encoded word by applying the reverse
    # base64 or quopri transformation.  decoded_words is now a list of the
    # form (decoded_word, charset).
    decoded_words = []
    for encoded_string, encoding, charset in words:
        if encoding is None:
            # This is an unencoded word.
            decoded_words.append((encoded_string, charset))
        elif encoding == 'q':
            word = email.quoprimime.header_decode(encoded_string)
            decoded_words.append((word, charset))
        elif encoding == 'b':
            paderr = len(encoded_string) % 4   # Postel's law: add missing padding
            if paderr:
                encoded_string += '==='[:4 - paderr]
            try:
                word = email.base64mime.decode(encoded_string)
            except binascii.Error as err:
                # Chain explicitly so the low-level cause stays visible.
                raise HeaderParseError('Base64 decoding error') from err
            decoded_words.append((word, charset))
        else:
            # The regexp only admits q/Q/b/B, so this should be unreachable.
            raise AssertionError('Unexpected encoding: ' + encoding)
    # Now convert all words to bytes and collapse consecutive runs of
    # similarly encoded words.
    collapsed = []
    last_word = last_charset = None
    for word, charset in decoded_words:
        if isinstance(word, str):
            word = bytes(word, 'raw-unicode-escape')
        if last_word is None:
            last_word = word
            last_charset = charset
        elif charset != last_charset:
            collapsed.append((last_word, last_charset))
            last_word = word
            last_charset = charset
        elif last_charset is None:
            # Adjacent unencoded pieces come from separate lines; rejoin them
            # with a single space.
            last_word += BSPACE + word
        else:
            last_word += word
    collapsed.append((last_word, last_charset))
    return collapsed

153 

154 

155 

def make_header(decoded_seq, maxlinelen=None, header_name=None,
                continuation_ws=' '):
    """Build a Header from (decoded_string, charset) pairs.

    decoded_seq is a sequence of pairs in the format produced by
    decode_header(): each pair holds a decoded string and its charset, where
    the charset is either None, the name of a character set, or a Charset
    instance.

    Optional maxlinelen, header_name, and continuation_ws are handed to the
    Header constructor unchanged.  The populated Header instance is returned.
    """
    header = Header(
        maxlinelen=maxlinelen,
        header_name=header_name,
        continuation_ws=continuation_ws,
    )
    for text, cs in decoded_seq:
        # A charset of None is passed straight through to append(), which
        # substitutes the header's default; only plain names need wrapping.
        needs_wrapping = not (cs is None or isinstance(cs, Charset))
        if needs_wrapping:
            cs = Charset(cs)
        header.append(text, cs)
    return header

176 

177 

178 

class Header:
    def __init__(self, s=None, charset=None,
                 maxlinelen=None, header_name=None,
                 continuation_ws=' ', errors='strict'):
        """Create a MIME-compliant header that can contain many character sets.

        s, when not None, is the initial header value and is added through
        .append() (so it may be a byte string or a Unicode string; see that
        method for the semantics).  More text can be added later with further
        .append() calls.

        charset is used both as the charset of s and as the default charset
        for later .append() calls that omit one.  It may be a Charset
        instance or a character set name; when None, us-ascii is used.

        maxlinelen is the maximum line length used when encoding; it
        defaults to 78 as recommended by RFC 2822.  header_name, if given,
        is the name of the field (e.g. `Subject') that s does not include;
        its length plus two (for the colon and space) is reserved on the
        first line.

        continuation_ws must be RFC 2822 compliant folding whitespace
        (usually either a space or a hard tab) which will be prepended to
        continuation lines.

        errors is passed through to the .append() call.
        """
        if charset is None:
            charset = USASCII
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        self._charset = charset
        self._continuation_ws = continuation_ws
        self._chunks = []
        if s is not None:
            self.append(s, charset, errors)
        self._maxlinelen = MAXLINELEN if maxlinelen is None else maxlinelen
        # Take the separating colon and space into account.
        self._headerlen = 0 if header_name is None else len(header_name) + 2

    def __str__(self):
        """Return the string value of the header."""
        self._normalize()
        plain = (None, 'us-ascii')
        pieces = []
        prev_cs = None
        prev_ends_ws = None
        for text, cs in self._chunks:
            # Spaces between encoded and non-encoded word boundaries must be
            # preserved, so a space is inserted when moving from a charset to
            # None/us-ascii or the other way round.  This only applies from
            # the second chunk on, and no space is added when the
            # None/us-ascii text already supplies one at the boundary.
            cur_cs = cs
            if cur_cs == _charset.UNKNOWN8BIT:
                # Recover the original bytes and show undecodable ones as
                # replacement characters.
                raw = text.encode('ascii', 'surrogateescape')
                text = raw.decode('ascii', 'replace')
            if pieces:
                starts_ws = text and self._nonctext(text[0])
                if prev_cs not in plain:
                    if cur_cs in plain and not starts_ws:
                        pieces.append(SPACE)
                        cur_cs = None
                elif cur_cs not in plain and not prev_ends_ws:
                    pieces.append(SPACE)
            prev_ends_ws = text and self._nonctext(text[-1])
            prev_cs = cur_cs
            pieces.append(text)
        return EMPTYSTRING.join(pieces)

    # Rich comparison operators for equality only.  BAW: does it make sense to
    # have or explicitly disable <, <=, >, >= operators?
    def __eq__(self, other):
        # other may be a Header or a string.  Convert this header to its
        # unencoded string value and let other perform the comparison with
        # the operands swapped.
        own_value = str(self)
        return other == own_value

    def append(self, s, charset=None, errors='strict'):
        """Append a string to the MIME header.

        charset may be a Charset instance or the name of a character set
        (which is converted to a Charset instance).  When it is None (the
        default) the charset given to the constructor is used.

        s may be a byte string or a Unicode string.  A byte string (i.e.
        isinstance(s, str) is false) is decoded using the charset's input
        codec, and a UnicodeError is raised if that fails.  For a Unicode
        string the charset is a hint naming the character set of its
        characters.  Either way the text must be encodable with the
        charset's output codec, otherwise a UnicodeError is raised -- except
        that text which cannot be encoded as us-ascii is stored with the
        utf-8 charset instead.

        `errors' is passed as the errors argument to the decode call if s
        is a byte string, and to the trial encode call.
        """
        if charset is None:
            charset = self._charset
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        if not isinstance(s, str):
            in_codec = charset.input_codec or 'us-ascii'
            if in_codec == _charset.UNKNOWN8BIT:
                # Keep the unknown bytes around as surrogate escapes.
                s = s.decode('us-ascii', 'surrogateescape')
            else:
                s = s.decode(in_codec, errors)
        # Check now that the text can be encoded to the output character
        # set, so that a problem is reported early.
        out_codec = charset.output_codec or 'us-ascii'
        if out_codec != _charset.UNKNOWN8BIT:
            try:
                s.encode(out_codec, errors)
            except UnicodeEncodeError:
                if out_codec != 'us-ascii':
                    raise
                # Non-ASCII text declared as us-ascii falls back to utf-8.
                charset = UTF8
        self._chunks.append((s, charset))

    def _nonctext(self, s):
        """True if string s is not a ctext character of RFC822.
        """
        if s.isspace():
            return True
        return s in ('(', ')', '\\')

    def encode(self, splitchars=';, \t', maxlinelen=None, linesep='\n'):
        r"""Encode a message header into an RFC-compliant format.

        Header strings can only contain a subset of 7-bit ASCII, so text in
        other character sets is converted and encoded (with Base64 or
        quoted-printable) according to each chunk's charset, and the result
        is wrapped into lines.

        Optional maxlinelen specifies the maximum length of each generated
        line, exclusive of the linesep string.  Individual lines may be longer
        than maxlinelen if a folding point cannot be found.  The first line
        will be shorter by the length of the header name plus ": " if a header
        name was specified at Header construction time.  The default value for
        maxlinelen is determined at header construction time; a value of 0
        disables wrapping.

        Optional splitchars is a string containing characters which should be
        given extra weight by the splitting algorithm during normal header
        wrapping.  This is in very rough support of RFC 2822's `higher level
        syntactic breaks':  split points preceded by a splitchar are preferred
        during line splitting, with the characters preferred in the order in
        which they appear in the string.  Space and tab may be included in the
        string to indicate whether preference should be given to one over the
        other as a split point when other split chars do not appear in the line
        being split.  Splitchars does not affect RFC 2047 encoded lines.

        Optional linesep is a string to be used to separate the lines of
        the value.  The default value is the most useful for typical
        Python applications, but it can be set to \r\n to produce RFC-compliant
        line separators when needed.

        Raises HeaderParseError if the encoded value looks like it contains
        an embedded header.
        """
        self._normalize()
        if maxlinelen is None:
            maxlinelen = self._maxlinelen
        # A maxlinelen of 0 means don't wrap.  Using a huge limit achieves
        # that without special cases in _ValueFormatter.
        if maxlinelen == 0:
            maxlinelen = 1000000
        formatter = _ValueFormatter(self._headerlen, maxlinelen,
                                    self._continuation_ws, splitchars)
        plain = (None, 'us-ascii')
        prev_cs = None
        prev_ends_ws = None
        past_first = False
        for text, cs in self._chunks:
            if past_first:
                # Decide whether a separating space is needed between the
                # previous chunk and this one.
                starts_ws = text and self._nonctext(text[0])
                if prev_cs not in plain:
                    if not starts_ws or cs not in plain:
                        formatter.add_transition()
                elif cs not in plain and not prev_ends_ws:
                    formatter.add_transition()
            prev_ends_ws = text and self._nonctext(text[-1])
            prev_cs = cs
            past_first = True
            lines = text.splitlines()
            formatter.feed('', lines[0] if lines else '', cs)
            for line in lines[1:]:
                formatter.newline()
                if cs.header_encoding is not None:
                    formatter.feed(self._continuation_ws, ' ' + line.lstrip(),
                                   cs)
                else:
                    # Keep the line's own leading whitespace as its folding
                    # white space.
                    stripped = line.lstrip()
                    lead = line[:len(line) - len(stripped)]
                    formatter.feed(lead, stripped, cs)
            if len(lines) > 1:
                formatter.newline()
        if self._chunks:
            formatter.add_transition()
        value = formatter._str(linesep)
        if _embedded_header.search(value):
            raise HeaderParseError("header value appears to contain "
                "an embedded header: {!r}".format(value))
        return value

    def _normalize(self):
        # Collapse every run of consecutive chunks that share a charset into
        # one chunk whose text is the run joined with single spaces.
        merged = []
        run = []
        run_cs = None
        for text, cs in self._chunks:
            if cs == run_cs:
                run.append(text)
                continue
            if run_cs is not None:
                merged.append((SPACE.join(run), run_cs))
            run = [text]
            run_cs = cs
        if run:
            merged.append((SPACE.join(run), run_cs))
        self._chunks = merged

410 

411 

412 

class _ValueFormatter:
    # Accumulates (folding-whitespace, text) pieces into output lines no
    # longer than a maximum length, where a split point can be found.

    def __init__(self, headerlen, maxlen, continuation_ws, splitchars):
        self._maxlen = maxlen
        self._continuation_ws = continuation_ws
        self._continuation_ws_len = len(continuation_ws)
        self._splitchars = splitchars
        # Completed lines, and the line currently being built.  headerlen is
        # counted against the first line only.
        self._lines = []
        self._current_line = _Accumulator(headerlen)

    def _str(self, linesep):
        # Flush whatever is pending, then join the finished lines.
        self.newline()
        return linesep.join(self._lines)

    def __str__(self):
        return self._str(NL)

    def newline(self):
        current = self._current_line
        # Discard a trailing transition marker (a bare space); anything else
        # goes back where it came from.
        tail = current.pop()
        if tail != (' ', ''):
            current.push(*tail)
        if len(current) > 0:
            if current.is_onlyws() and self._lines:
                # Whitespace alone never gets a line to itself; glue it onto
                # the previous line.
                self._lines[-1] += str(current)
            else:
                self._lines.append(str(current))
        current.reset()

    def add_transition(self):
        self._current_line.push(' ', '')

    def feed(self, fws, string, charset):
        # A charset without a header encoding (i.e. an ASCII encoding) is
        # split at the "highest level syntactic break" possible.  There is
        # little knowledge of field syntax here; the split characters are
        # simply tried in order.
        if charset.header_encoding is None:
            self._ascii_split(fws, string, self._splitchars)
            return
        # Otherwise the text is Base64 or quoted-printable encoded, so there
        # is no need to split on syntactic breaks: the charset is asked to
        # produce encoded lines that fit the available widths.
        encoded_lines = charset.header_encode_lines(string, self._maxlengths())
        if not encoded_lines:
            # There are no encoded lines, so we're done.
            return
        # The first element extends the current line; None means nothing
        # more fits on it.
        first_line = encoded_lines.pop(0)
        if first_line is not None:
            self._append_chunk(fws, first_line)
        if not encoded_lines:
            # There was only one line.
            return
        last_line = encoded_lines.pop()
        self.newline()
        # The final line stays open so later text can extend it.
        self._current_line.push(self._continuation_ws, last_line)
        # Everything in between is a complete line in itself.
        self._lines.extend(self._continuation_ws + line
                           for line in encoded_lines)

    def _maxlengths(self):
        # Room left on the current line first, then the room on every
        # continuation line, indefinitely.
        yield self._maxlen - len(self._current_line)
        while True:
            yield self._maxlen - self._continuation_ws_len

    def _ascii_split(self, fws, string, splitchars):
        # RFC 2822 folding is allowed wherever "folding white space"
        # appears, by inserting a line separator in front of it.  Not every
        # space or tab really qualifies as FWS, and "higher level syntactic
        # breaks" are supposed to be preferred, but neither can be decided
        # without knowing the structure of the header.  So the text is cut
        # into (whitespace, word) pairs and _append_chunk picks break points
        # using the configured split characters.
        parts = re.split("(["+FWS+"]+)", fws+string)
        if parts[0]:
            # Text starts with a word: give it an empty whitespace prefix.
            parts.insert(0, '')
        else:
            # Text starts with whitespace: drop the empty leading word.
            del parts[0]
        pieces = iter(parts)
        for lead, word in zip(pieces, pieces):
            self._append_chunk(lead, word)

    def _best_split_index(self):
        # Return the index of the part at which the current line should be
        # broken, or None when no split character offers a break point.
        # Split characters are tried in order; for each one the line is
        # scanned backward from its last part.
        current = self._current_line
        for ch in self._splitchars:
            ch_is_space = ch.isspace()
            for idx in range(current.part_count() - 1, 0, -1):
                if ch_is_space:
                    lead = current[idx][0]
                    if lead and lead[0] == ch:
                        return idx
                before = current[idx - 1][1]
                if before and before[-1] == ch:
                    return idx
        return None

    def _append_chunk(self, fws, string):
        current = self._current_line
        current.push(fws, string)
        if len(current) <= self._maxlen:
            return
        # Find the best split point, working backward from the end.
        # There might be none, on a long first line.
        split_at = self._best_split_index()
        if split_at is None:
            fws, part = current.pop()
            if current._initial_size > 0:
                # There will be a header, so leave it on a line by itself.
                self.newline()
                if not fws:
                    # continuation_ws is not used here because the
                    # whitespace after a header should always be a space.
                    fws = ' '
            current.push(fws, part)
            return
        remainder = current.pop_from(split_at)
        self._lines.append(str(current))
        current.reset(remainder)

539 

540 

class _Accumulator(list):
    # A list of (fws, text) pairs making up one output line.  len() reports
    # the character length of the line (including an initial size reserved
    # by the caller), not the number of pairs; use part_count() for that.

    def __init__(self, initial_size=0):
        self._initial_size = initial_size
        super().__init__()

    def push(self, fws, string):
        # Add one (whitespace, text) pair at the end of the line.
        self.append((fws, string))

    def pop_from(self, i=0):
        # Remove and return every pair from index i onward.
        tail = self[i:]
        del self[i:]
        return tail

    def pop(self):
        # Popping an empty line yields an empty pair instead of raising.
        if self.part_count() == 0:
            return ('', '')
        return super().pop()

    def __len__(self):
        # Character count of all pairs plus the reserved initial size.
        total = self._initial_size
        for fws, part in self:
            total += len(fws) + len(part)
        return total

    def __str__(self):
        # Concatenate each pair's whitespace and text, in order.
        return EMPTYSTRING.join(
            EMPTYSTRING.join(pair) for pair in self)

    def reset(self, startval=None):
        # Replace the contents (emptying the line by default) and forget
        # the initial size, which applies to the first line only.
        self[:] = [] if startval is None else startval
        self._initial_size = 0

    def is_onlyws(self):
        # True when nothing was reserved and the line is empty or consists
        # solely of whitespace.
        if self._initial_size != 0:
            return False
        return not self or str(self).isspace()

    def part_count(self):
        # Number of (fws, text) pairs, bypassing the overridden __len__.
        return super().__len__()