Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/future/backports/email/header.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

319 statements  

1# Copyright (C) 2002-2007 Python Software Foundation 

2# Author: Ben Gertzfield, Barry Warsaw 

3# Contact: email-sig@python.org 

4 

5"""Header encoding and decoding functionality.""" 

6from __future__ import unicode_literals 

7from __future__ import division 

8from __future__ import absolute_import 

9from future.builtins import bytes, range, str, super, zip 

10 

11__all__ = [ 

12 'Header', 

13 'decode_header', 

14 'make_header', 

15 ] 

16 

17import re 

18import binascii 

19 

20from future.backports import email 

21from future.backports.email import base64mime 

22from future.backports.email.errors import HeaderParseError 

23import future.backports.email.charset as _charset 

24 

25# Helpers 

26from future.backports.email.quoprimime import _max_append, header_decode 

27 

28Charset = _charset.Charset 

29 

30NL = '\n' 

31SPACE = ' ' 

32BSPACE = b' ' 

33SPACE8 = ' ' * 8 

34EMPTYSTRING = '' 

35MAXLINELEN = 78 

36FWS = ' \t' 

37 

38USASCII = Charset('us-ascii') 

39UTF8 = Charset('utf-8') 

40 

41# Match encoded-word strings in the form =?charset?q?Hello_World?= 

42ecre = re.compile(r''' 

43 =\? # literal =? 

44 (?P<charset>[^?]*?) # non-greedy up to the next ? is the charset 

45 \? # literal ? 

46 (?P<encoding>[qb]) # either a "q" or a "b", case insensitive 

47 \? # literal ? 

48 (?P<encoded>.*?) # non-greedy up to the next ?= is the encoded string 

49 \?= # literal ?= 

50 ''', re.VERBOSE | re.IGNORECASE | re.MULTILINE) 

51 

52# Field name regexp, including trailing colon, but not separating whitespace, 

53# according to RFC 2822. Character range is from exclamation mark to tilde. 

54# For use with .match() 

55fcre = re.compile(r'[\041-\176]+:$') 

56 

57# Find what looks like a header (a newline, then text without spaces or tabs, 

58# then a colon) embedded in a header value. Used to check for header injection. 

59_embeded_header = re.compile(r'\n[^ \t]+:') 

60 

61 

def decode_header(header):
    """Decode a message header value without converting charset.

    Returns a list of (string, charset) pairs containing each of the decoded
    parts of the header.  Charset is None for non-encoded parts of the header,
    otherwise a lower-case string containing the name of the character set
    specified in the encoded string.

    header may be a string that may or may not contain RFC2047 encoded words,
    or it may be a Header object.

    When header is a string without any encoded word it is returned unchanged
    as ``[(header, None)]``.  Otherwise every str part is converted to bytes
    (with the 'raw-unicode-escape' codec) before being returned, and
    consecutive parts sharing a charset are merged into one pair.

    An email.errors.HeaderParseError may be raised when certain decoding error
    occurs (e.g. a base64 decoding exception).
    """
    # If it is a Header object, we can just return the encoded chunks.
    if hasattr(header, '_chunks'):
        return [(_charset._encode(string, str(charset)), str(charset))
                for string, charset in header._chunks]
    # If no encoding, just return the header with no charset.
    if not ecre.search(header):
        return [(header, None)]
    # First step is to parse all the encoded parts into triplets of the form
    # (encoded_string, encoding, charset).  For unencoded strings, the last
    # two parts will be None.
    words = []
    for line in header.splitlines():
        # ecre has three groups, so split() yields
        # [text, charset, encoding, encoded, text, ...].
        parts = ecre.split(line)
        first = True
        while parts:
            unencoded = parts.pop(0)
            if first:
                # Only the start of each line has its leading whitespace
                # removed.
                unencoded = unencoded.lstrip()
                first = False
            if unencoded:
                words.append((unencoded, None, None))
            if parts:
                charset = parts.pop(0).lower()
                encoding = parts.pop(0).lower()
                encoded = parts.pop(0)
                words.append((encoded, encoding, charset))
    # Now loop over words and remove words that consist of whitespace
    # between two encoded strings.
    droplist = []
    for n, w in enumerate(words):
        if n>1 and w[1] and words[n-2][1] and words[n-1][0].isspace():
            droplist.append(n-1)
    # Delete from the end so the earlier indexes stay valid.
    for d in reversed(droplist):
        del words[d]

    # The next step is to decode each encoded word by applying the reverse
    # base64 or quopri transformation.  decoded_words is now a list of the
    # form (decoded_word, charset).
    decoded_words = []
    for encoded_string, encoding, charset in words:
        if encoding is None:
            # This is an unencoded word.
            decoded_words.append((encoded_string, charset))
        elif encoding == 'q':
            word = header_decode(encoded_string)
            decoded_words.append((word, charset))
        elif encoding == 'b':
            paderr = len(encoded_string) % 4   # Postel's law: add missing padding
            if paderr:
                encoded_string += '==='[:4 - paderr]
            try:
                word = base64mime.decode(encoded_string)
            except binascii.Error:
                raise HeaderParseError('Base64 decoding error')
            else:
                decoded_words.append((word, charset))
        else:
            # ecre only accepts "q" or "b", so this branch is not expected.
            raise AssertionError('Unexpected encoding: ' + encoding)
    # Now convert all words to bytes and collapse consecutive runs of
    # similarly encoded words.
    collapsed = []
    last_word = last_charset = None
    for word, charset in decoded_words:
        if isinstance(word, str):
            word = bytes(word, 'raw-unicode-escape')
        if last_word is None:
            last_word = word
            last_charset = charset
        elif charset != last_charset:
            collapsed.append((last_word, last_charset))
            last_word = word
            last_charset = charset
        elif last_charset is None:
            # Adjacent unencoded words are re-joined with a single space.
            last_word += BSPACE + word
        else:
            last_word += word
    collapsed.append((last_word, last_charset))
    return collapsed

155 

156 

def make_header(decoded_seq, maxlinelen=None, header_name=None,
                continuation_ws=' '):
    """Build a Header from (decoded_string, charset) pairs.

    decoded_seq is a sequence of pairs in the format returned by
    decode_header(): each pair holds a decoded string and either None, a
    character set name, or a Charset instance.

    Optional maxlinelen, header_name, and continuation_ws are handed to the
    Header constructor unchanged.  Returns the populated Header instance.
    """
    header = Header(maxlinelen=maxlinelen, header_name=header_name,
                    continuation_ws=continuation_ws)
    for text, charset in decoded_seq:
        # A charset of None is passed straight through to append(); anything
        # else that is not already a Charset is wrapped in one first.
        is_ready = charset is None or isinstance(charset, Charset)
        if not is_ready:
            charset = Charset(charset)
        header.append(text, charset)
    return header

177 

178 

class Header(object):
    """A header value stored as a list of (string, Charset) chunks.

    Chunks are added with append().  __str__() joins them into one unicode
    string; encode() feeds them through a _ValueFormatter to produce the
    folded, encoded header text.
    """

    def __init__(self, s=None, charset=None,
                 maxlinelen=None, header_name=None,
                 continuation_ws=' ', errors='strict'):
        """Create a MIME-compliant header that can contain many character sets.

        Optional s is the initial header value.  If None, the initial header
        value is not set.  You can later append to the header with .append()
        method calls.  s may be a byte string or a Unicode string, but see the
        .append() documentation for semantics.

        Optional charset serves two purposes: it has the same meaning as the
        charset argument to the .append() method.  It also sets the default
        character set for all subsequent .append() calls that omit the charset
        argument.  If charset is not provided in the constructor, the us-ascii
        charset is used both as s's initial charset and as the default for
        subsequent .append() calls.

        The maximum line length can be specified explicitly via maxlinelen. For
        splitting the first line to a shorter value (to account for the field
        header which isn't included in s, e.g. `Subject') pass in the name of
        the field in header_name.  The default maxlinelen is 78 as recommended
        by RFC 2822.

        continuation_ws must be RFC 2822 compliant folding whitespace (usually
        either a space or a hard tab) which will be prepended to continuation
        lines.

        errors is passed through to the .append() call.
        """
        if charset is None:
            charset = USASCII
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        self._charset = charset
        self._continuation_ws = continuation_ws
        self._chunks = []
        if s is not None:
            self.append(s, charset, errors)
        if maxlinelen is None:
            maxlinelen = MAXLINELEN
        self._maxlinelen = maxlinelen
        if header_name is None:
            self._headerlen = 0
        else:
            # Take the separating colon and space into account.
            self._headerlen = len(header_name) + 2

    def __str__(self):
        """Return the string value of the header."""
        self._normalize()
        uchunks = []
        lastcs = None
        lastspace = None
        for string, charset in self._chunks:
            # We must preserve spaces between encoded and non-encoded word
            # boundaries, which means for us we need to add a space when we go
            # from a charset to None/us-ascii, or from None/us-ascii to a
            # charset.  Only do this for the second and subsequent chunks.
            # Don't add a space if the None/us-ascii string already has
            # a space (trailing or leading depending on transition)
            nextcs = charset
            if nextcs == _charset.UNKNOWN8BIT:
                # Round-trip through ASCII so surrogate-escaped bytes come
                # out as replacement characters.
                original_bytes = string.encode('ascii', 'surrogateescape')
                string = original_bytes.decode('ascii', 'replace')
            if uchunks:
                hasspace = string and self._nonctext(string[0])
                if lastcs not in (None, 'us-ascii'):
                    if nextcs in (None, 'us-ascii') and not hasspace:
                        uchunks.append(SPACE)
                        nextcs = None
                elif nextcs not in (None, 'us-ascii') and not lastspace:
                    uchunks.append(SPACE)
            lastspace = string and self._nonctext(string[-1])
            lastcs = nextcs
            uchunks.append(string)
        return EMPTYSTRING.join(uchunks)

    # Rich comparison operators for equality only.  BAW: does it make sense to
    # have or explicitly disable <, <=, >, >= operators?
    def __eq__(self, other):
        """Return the result of comparing other with str(self)."""
        # other may be a Header or a string.  Both are fine so coerce
        # ourselves to a unicode (of the unencoded header value), swap the
        # args and do another comparison.
        return other == str(self)

    def __ne__(self, other):
        """Return the negation of ``self == other``."""
        return not self == other

    def append(self, s, charset=None, errors='strict'):
        """Append a string to the MIME header.

        Optional charset, if given, should be a Charset instance or the name
        of a character set (which will be converted to a Charset instance).  A
        value of None (the default) means that the charset given in the
        constructor is used.

        s may be a byte string or a Unicode string.  If it is a byte string
        (i.e. isinstance(s, str) is false), then charset is the encoding of
        that byte string, and a UnicodeError will be raised if the string
        cannot be decoded with that charset.  If s is a Unicode string, then
        charset is a hint specifying the character set of the characters in
        the string.  In either case, when producing an RFC 2822 compliant
        header using RFC 2047 rules, the string will be encoded using the
        output codec of the charset.  If the string cannot be encoded to the
        output codec, a UnicodeError will be raised, except when the output
        codec is us-ascii: the chunk is then stored with the utf-8 charset
        instead.

        Optional `errors' is passed as the errors argument to the decode
        call if s is a byte string.
        """
        if charset is None:
            charset = self._charset
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        if not isinstance(s, str):
            input_charset = charset.input_codec or 'us-ascii'
            if input_charset == _charset.UNKNOWN8BIT:
                s = s.decode('us-ascii', 'surrogateescape')
            else:
                s = s.decode(input_charset, errors)
        # Ensure that the bytes we're storing can be decoded to the output
        # character set, otherwise an early error is raised.
        output_charset = charset.output_codec or 'us-ascii'
        if output_charset != _charset.UNKNOWN8BIT:
            try:
                s.encode(output_charset, errors)
            except UnicodeEncodeError:
                if output_charset!='us-ascii':
                    raise
                # Non-ASCII text given with an ASCII charset: keep the text
                # and switch the chunk to utf-8.
                charset = UTF8
        self._chunks.append((s, charset))

    def _nonctext(self, s):
        """True if string s is not a ctext character of RFC822.
        """
        return s.isspace() or s in ('(', ')', '\\')

    def encode(self, splitchars=';, \t', maxlinelen=None, linesep='\n'):
        r"""Encode a message header into an RFC-compliant format.

        There are many issues involved in converting a given string for use in
        an email header.  Only certain character sets are readable in most
        email clients, and as header strings can only contain a subset of
        7-bit ASCII, care must be taken to properly convert and encode (with
        Base64 or quoted-printable) header strings.  In addition, there is a
        75-character length limit on any given encoded header field, so
        line-wrapping must be performed, even with double-byte character sets.

        Optional maxlinelen specifies the maximum length of each generated
        line, exclusive of the linesep string.  Individual lines may be longer
        than maxlinelen if a folding point cannot be found.  The first line
        will be shorter by the length of the header name plus ": " if a header
        name was specified at Header construction time.  The default value for
        maxlinelen is determined at header construction time.

        Optional splitchars is a string containing characters which should be
        given extra weight by the splitting algorithm during normal header
        wrapping.  This is in very rough support of RFC 2822's `higher level
        syntactic breaks':  split points preceded by a splitchar are preferred
        during line splitting, with the characters preferred in the order in
        which they appear in the string.  Space and tab may be included in the
        string to indicate whether preference should be given to one over the
        other as a split point when other split chars do not appear in the line
        being split.  Splitchars does not affect RFC 2047 encoded lines.

        Optional linesep is a string to be used to separate the lines of
        the value.  The default value is the most useful for typical
        Python applications, but it can be set to \r\n to produce RFC-compliant
        line separators when needed.

        Raises HeaderParseError if the formatted value contains a newline
        followed by what looks like a header name and colon.
        """
        self._normalize()
        if maxlinelen is None:
            maxlinelen = self._maxlinelen
        # A maxlinelen of 0 means don't wrap.  For all practical purposes,
        # choosing a huge number here accomplishes that and makes the
        # _ValueFormatter algorithm much simpler.
        if maxlinelen == 0:
            maxlinelen = 1000000
        formatter = _ValueFormatter(self._headerlen, maxlinelen,
                                    self._continuation_ws, splitchars)
        lastcs = None
        hasspace = lastspace = None
        for string, charset in self._chunks:
            # hasspace is None only before the first chunk, so transitions
            # are considered from the second chunk onwards.
            if hasspace is not None:
                hasspace = string and self._nonctext(string[0])
                if lastcs not in (None, 'us-ascii'):
                    if not hasspace or charset not in (None, 'us-ascii'):
                        formatter.add_transition()
                elif charset not in (None, 'us-ascii') and not lastspace:
                    formatter.add_transition()
            lastspace = string and self._nonctext(string[-1])
            lastcs = charset
            hasspace = False
            lines = string.splitlines()
            if lines:
                formatter.feed('', lines[0], charset)
            else:
                formatter.feed('', '', charset)
            for line in lines[1:]:
                formatter.newline()
                if charset.header_encoding is not None:
                    formatter.feed(self._continuation_ws, ' ' + line.lstrip(),
                                   charset)
                else:
                    # Keep the line's own leading whitespace as its folding
                    # whitespace.
                    sline = line.lstrip()
                    fws = line[:len(line)-len(sline)]
                    formatter.feed(fws, sline, charset)
            if len(lines) > 1:
                formatter.newline()
        if self._chunks:
            formatter.add_transition()
        value = formatter._str(linesep)
        if _embeded_header.search(value):
            raise HeaderParseError("header value appears to contain "
                "an embedded header: {!r}".format(value))
        return value

    def _normalize(self):
        """Merge adjacent chunks that share a charset, in place."""
        # Step 1: Normalize the chunks so that all runs of identical charsets
        # get collapsed into a single unicode string.
        chunks = []
        last_charset = None
        last_chunk = []
        for string, charset in self._chunks:
            if charset == last_charset:
                last_chunk.append(string)
            else:
                if last_charset is not None:
                    chunks.append((SPACE.join(last_chunk), last_charset))
                last_chunk = [string]
                last_charset = charset
        if last_chunk:
            chunks.append((SPACE.join(last_chunk), last_charset))
        self._chunks = chunks

414 

415 

class _ValueFormatter(object):
    """Collect header text into a list of folded output lines.

    Text is pushed onto the current line (an _Accumulator) as
    (fws, string) pairs; when the current line grows past maxlen it is split
    and the finished part is moved to the list of completed lines.
    """

    def __init__(self, headerlen, maxlen, continuation_ws, splitchars):
        """Set up an empty formatter.

        headerlen is the initial size of the first line, maxlen the line
        length limit, continuation_ws the whitespace put in front of
        continuation lines of encoded text, and splitchars the preferred
        split characters, in order of preference.
        """
        self._maxlen = maxlen
        self._continuation_ws = continuation_ws
        self._continuation_ws_len = len(continuation_ws)
        self._splitchars = splitchars
        self._lines = []
        self._current_line = _Accumulator(headerlen)

    def _str(self, linesep):
        """Finish the current line and return all lines joined by linesep."""
        self.newline()
        return linesep.join(self._lines)

    def __str__(self):
        return self._str(NL)

    def newline(self):
        """Move the current line to the list of completed lines."""
        # A trailing (' ', '') pair is a pending transition marker; it is
        # dropped rather than emitted at the end of a line.
        end_of_line = self._current_line.pop()
        if end_of_line != (' ', ''):
            self._current_line.push(*end_of_line)
        if len(self._current_line) > 0:
            # Whitespace-only content is glued onto the previous line, but
            # only when there is a previous line; otherwise indexing
            # self._lines[-1] would raise IndexError.
            if self._current_line.is_onlyws() and self._lines:
                self._lines[-1] += str(self._current_line)
            else:
                self._lines.append(str(self._current_line))
        self._current_line.reset()

    def add_transition(self):
        """Push a (' ', '') transition marker onto the current line."""
        self._current_line.push(' ', '')

    def feed(self, fws, string, charset):
        """Add string, preceded by the whitespace fws, using charset's rules."""
        # If the charset has no header encoding (i.e. it is an ASCII encoding)
        # then we must split the header at the "highest level syntactic break"
        # possible.  Note that we don't have a lot of smarts about field
        # syntax; we just try to break on semi-colons, then commas, then
        # whitespace.  Eventually, this should be pluggable.
        if charset.header_encoding is None:
            self._ascii_split(fws, string, self._splitchars)
            return
        # Otherwise, we're doing either a Base64 or a quoted-printable
        # encoding which means we don't need to split the line on syntactic
        # breaks.  We can basically just find enough characters to fit on the
        # current line, minus the RFC 2047 chrome.  What makes this trickier
        # though is that we have to split at octet boundaries, not character
        # boundaries but it's only safe to split at character boundaries so at
        # best we can only get close.
        encoded_lines = charset.header_encode_lines(string, self._maxlengths())
        # The first element extends the current line, but if it's None then
        # nothing more fit on the current line so start a new line.
        try:
            first_line = encoded_lines.pop(0)
        except IndexError:
            # There are no encoded lines, so we're done.
            return
        if first_line is not None:
            self._append_chunk(fws, first_line)
        try:
            last_line = encoded_lines.pop()
        except IndexError:
            # There was only one line.
            return
        self.newline()
        self._current_line.push(self._continuation_ws, last_line)
        # Everything else are full lines in themselves.
        for line in encoded_lines:
            self._lines.append(self._continuation_ws + line)

    def _maxlengths(self):
        """Yield the room left on the current line, then on each later line."""
        # The first line's length.
        yield self._maxlen - len(self._current_line)
        while True:
            yield self._maxlen - self._continuation_ws_len

    def _ascii_split(self, fws, string, splitchars):
        """Feed unencoded text to the current line as (fws, part) pairs."""
        # The RFC 2822 header folding algorithm is simple in principle but
        # complex in practice.  Lines may be folded any place where "folding
        # white space" appears by inserting a linesep character in front of the
        # FWS.  The complication is that not all spaces or tabs qualify as FWS,
        # and we are also supposed to prefer to break at "higher level
        # syntactic breaks".  We can't do either of these without intimate
        # knowledge of the structure of structured headers, which we don't have
        # here.  So the best we can do here is prefer to break at the specified
        # splitchars, and hope that we don't choose any spaces or tabs that
        # aren't legal FWS.  (This is at least better than the old algorithm,
        # where we would sometimes *introduce* FWS after a splitchar, or the
        # algorithm before that, where we would turn all white space runs into
        # single spaces or tabs.)
        parts = re.split("(["+FWS+"]+)", fws+string)
        # Make the list start with a whitespace element so that it can be
        # consumed as alternating (fws, part) pairs.
        if parts[0]:
            parts[:0] = ['']
        else:
            parts.pop(0)
        for fws, part in zip(*[iter(parts)]*2):
            self._append_chunk(fws, part)

    def _append_chunk(self, fws, string):
        """Push (fws, string) and fold the current line if it is too long."""
        self._current_line.push(fws, string)
        if len(self._current_line) > self._maxlen:
            # Find the best split point, working backward from the end.
            # There might be none, on a long first line.
            for ch in self._splitchars:
                for i in range(self._current_line.part_count()-1, 0, -1):
                    if ch.isspace():
                        fws = self._current_line[i][0]
                        if fws and fws[0]==ch:
                            break
                    prevpart = self._current_line[i-1][1]
                    if prevpart and prevpart[-1]==ch:
                        break
                else:
                    # No split point for this character; try the next one.
                    continue
                break
            else:
                # No split point at all: keep the chunk whole.
                fws, part = self._current_line.pop()
                if self._current_line._initial_size > 0:
                    # There will be a header, so leave it on a line by itself.
                    self.newline()
                    if not fws:
                        # We don't use continuation_ws here because the whitespace
                        # after a header should always be a space.
                        fws = ' '
                self._current_line.push(fws, part)
                return
            # Split at index i: everything from i onwards starts the next line.
            remainder = self._current_line.pop_from(i)
            self._lines.append(str(self._current_line))
            self._current_line.reset(remainder)

542 

543 

class _Accumulator(list):
    """A list of (fws, string) pairs making up one line being built.

    len() reports the number of characters held plus an initial size, not
    the number of pairs; use part_count() for the latter.
    """

    def __init__(self, initial_size=0):
        self._initial_size = initial_size
        super().__init__()

    def push(self, fws, string):
        """Add one (fws, string) pair at the end."""
        pair = (fws, string)
        self.append(pair)

    def pop_from(self, i=0):
        """Remove and return, as a list, every pair from index i onwards."""
        tail = self[i:]
        del self[i:]
        return tail

    def pop(self):
        """Remove and return the last pair, or ('', '') when there is none."""
        return super().pop() if self.part_count() != 0 else ('', '')

    def __len__(self):
        """Return the initial size plus the length of all held text."""
        total = self._initial_size
        for fws, part in self:
            total += len(fws) + len(part)
        return total

    def __str__(self):
        """Return the text of all pairs concatenated in order."""
        pieces = []
        for fws, part in self:
            pieces.append(EMPTYSTRING.join((fws, part)))
        return EMPTYSTRING.join(pieces)

    def reset(self, startval=None):
        """Replace the contents with startval and zero the initial size."""
        self[:] = [] if startval is None else startval
        self._initial_size = 0

    def is_onlyws(self):
        """True if the initial size is 0 and the text is empty or whitespace."""
        if self._initial_size != 0:
            return False
        if not self:
            return True
        return str(self).isspace()

    def part_count(self):
        """Return the number of pairs held."""
        return list.__len__(self)