Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/django/core/mail/message.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

268 statements  

1import mimetypes 

2from collections import namedtuple 

3from email import charset as Charset 

4from email import encoders as Encoders 

5from email import generator, message_from_string 

6from email.errors import HeaderParseError 

7from email.header import Header 

8from email.headerregistry import Address, parser 

9from email.message import Message 

10from email.mime.base import MIMEBase 

11from email.mime.message import MIMEMessage 

12from email.mime.multipart import MIMEMultipart 

13from email.mime.text import MIMEText 

14from email.utils import formataddr, formatdate, getaddresses, make_msgid 

15from io import BytesIO, StringIO 

16from pathlib import Path 

17 

18from django.conf import settings 

19from django.core.mail.utils import DNS_NAME 

20from django.utils.encoding import force_str, punycode 

21 

22# Don't BASE64-encode UTF-8 messages so that we avoid unwanted attention from 

23# some spam filters. 

24utf8_charset = Charset.Charset("utf-8") 

25utf8_charset.body_encoding = None # Python defaults to BASE64 

26utf8_charset_qp = Charset.Charset("utf-8") 

27utf8_charset_qp.body_encoding = Charset.QP 

28 

29# Default MIME type to use on attachments (if it is not explicitly given 

30# and cannot be guessed). 

31DEFAULT_ATTACHMENT_MIME_TYPE = "application/octet-stream" 

32 

33RFC5322_EMAIL_LINE_LENGTH_LIMIT = 998 

34 

35 

36class BadHeaderError(ValueError): 

37 pass 

38 

39 

40# Header names that contain structured address data (RFC 5322). 

41ADDRESS_HEADERS = { 

42 "from", 

43 "sender", 

44 "reply-to", 

45 "to", 

46 "cc", 

47 "bcc", 

48 "resent-from", 

49 "resent-sender", 

50 "resent-to", 

51 "resent-cc", 

52 "resent-bcc", 

53} 

54 

55 

56def forbid_multi_line_headers(name, val, encoding): 

57 """Forbid multi-line headers to prevent header injection.""" 

58 encoding = encoding or settings.DEFAULT_CHARSET 

59 val = str(val) # val may be lazy 

60 if "\n" in val or "\r" in val: 

61 raise BadHeaderError( 

62 "Header values can't contain newlines (got %r for header %r)" % (val, name) 

63 ) 

64 try: 

65 val.encode("ascii") 

66 except UnicodeEncodeError: 

67 if name.lower() in ADDRESS_HEADERS: 

68 val = ", ".join( 

69 sanitize_address(addr, encoding) for addr in getaddresses((val,)) 

70 ) 

71 else: 

72 val = Header(val, encoding).encode() 

73 else: 

74 if name.lower() == "subject": 

75 val = Header(val).encode() 

76 return name, val 

77 

78 

79def sanitize_address(addr, encoding): 

80 """ 

81 Format a pair of (name, address) or an email address string. 

82 """ 

83 address = None 

84 if not isinstance(addr, tuple): 

85 addr = force_str(addr) 

86 try: 

87 token, rest = parser.get_mailbox(addr) 

88 except (HeaderParseError, ValueError, IndexError): 

89 raise ValueError('Invalid address "%s"' % addr) 

90 else: 

91 if rest: 

92 # The entire email address must be parsed. 

93 raise ValueError( 

94 'Invalid address; only %s could be parsed from "%s"' % (token, addr) 

95 ) 

96 nm = token.display_name or "" 

97 localpart = token.local_part 

98 domain = token.domain or "" 

99 else: 

100 nm, address = addr 

101 if "@" not in address: 

102 raise ValueError(f'Invalid address "{address}"') 

103 localpart, domain = address.rsplit("@", 1) 

104 

105 address_parts = nm + localpart + domain 

106 if "\n" in address_parts or "\r" in address_parts: 

107 raise ValueError("Invalid address; address parts cannot contain newlines.") 

108 

109 # Avoid UTF-8 encode, if it's possible. 

110 try: 

111 nm.encode("ascii") 

112 nm = Header(nm).encode() 

113 except UnicodeEncodeError: 

114 nm = Header(nm, encoding).encode() 

115 try: 

116 localpart.encode("ascii") 

117 except UnicodeEncodeError: 

118 localpart = Header(localpart, encoding).encode() 

119 domain = punycode(domain) 

120 

121 parsed_address = Address(username=localpart, domain=domain) 

122 return formataddr((nm, parsed_address.addr_spec)) 

123 

124 

125class MIMEMixin: 

126 def as_string(self, unixfrom=False, linesep="\n"): 

127 """Return the entire formatted message as a string. 

128 Optional `unixfrom' when True, means include the Unix From_ envelope 

129 header. 

130 

131 This overrides the default as_string() implementation to not mangle 

132 lines that begin with 'From '. See bug #13433 for details. 

133 """ 

134 fp = StringIO() 

135 g = generator.Generator(fp, mangle_from_=False) 

136 g.flatten(self, unixfrom=unixfrom, linesep=linesep) 

137 return fp.getvalue() 

138 

139 def as_bytes(self, unixfrom=False, linesep="\n"): 

140 """Return the entire formatted message as bytes. 

141 Optional `unixfrom' when True, means include the Unix From_ envelope 

142 header. 

143 

144 This overrides the default as_bytes() implementation to not mangle 

145 lines that begin with 'From '. See bug #13433 for details. 

146 """ 

147 fp = BytesIO() 

148 g = generator.BytesGenerator(fp, mangle_from_=False) 

149 g.flatten(self, unixfrom=unixfrom, linesep=linesep) 

150 return fp.getvalue() 

151 

152 

153class SafeMIMEMessage(MIMEMixin, MIMEMessage): 

154 def __setitem__(self, name, val): 

155 # message/rfc822 attachments must be ASCII 

156 name, val = forbid_multi_line_headers(name, val, "ascii") 

157 MIMEMessage.__setitem__(self, name, val) 

158 

159 

160class SafeMIMEText(MIMEMixin, MIMEText): 

161 def __init__(self, _text, _subtype="plain", _charset=None): 

162 self.encoding = _charset 

163 MIMEText.__init__(self, _text, _subtype=_subtype, _charset=_charset) 

164 

165 def __setitem__(self, name, val): 

166 name, val = forbid_multi_line_headers(name, val, self.encoding) 

167 MIMEText.__setitem__(self, name, val) 

168 

169 def set_payload(self, payload, charset=None): 

170 if charset == "utf-8" and not isinstance(charset, Charset.Charset): 

171 has_long_lines = any( 

172 len(line.encode(errors="surrogateescape")) 

173 > RFC5322_EMAIL_LINE_LENGTH_LIMIT 

174 for line in payload.splitlines() 

175 ) 

176 # Quoted-Printable encoding has the side effect of shortening long 

177 # lines, if any (#22561). 

178 charset = utf8_charset_qp if has_long_lines else utf8_charset 

179 MIMEText.set_payload(self, payload, charset=charset) 

180 

181 

182class SafeMIMEMultipart(MIMEMixin, MIMEMultipart): 

183 def __init__( 

184 self, _subtype="mixed", boundary=None, _subparts=None, encoding=None, **_params 

185 ): 

186 self.encoding = encoding 

187 MIMEMultipart.__init__(self, _subtype, boundary, _subparts, **_params) 

188 

189 def __setitem__(self, name, val): 

190 name, val = forbid_multi_line_headers(name, val, self.encoding) 

191 MIMEMultipart.__setitem__(self, name, val) 

192 

193 

194EmailAlternative = namedtuple("Alternative", ["content", "mimetype"]) 

195EmailAttachment = namedtuple("Attachment", ["filename", "content", "mimetype"]) 

196 

197 

198class EmailMessage: 

199 """A container for email information.""" 

200 

201 content_subtype = "plain" 

202 mixed_subtype = "mixed" 

203 encoding = None # None => use settings default 

204 

205 def __init__( 

206 self, 

207 subject="", 

208 body="", 

209 from_email=None, 

210 to=None, 

211 bcc=None, 

212 connection=None, 

213 attachments=None, 

214 headers=None, 

215 cc=None, 

216 reply_to=None, 

217 ): 

218 """ 

219 Initialize a single email message (which can be sent to multiple 

220 recipients). 

221 """ 

222 if to: 

223 if isinstance(to, str): 

224 raise TypeError('"to" argument must be a list or tuple') 

225 self.to = list(to) 

226 else: 

227 self.to = [] 

228 if cc: 

229 if isinstance(cc, str): 

230 raise TypeError('"cc" argument must be a list or tuple') 

231 self.cc = list(cc) 

232 else: 

233 self.cc = [] 

234 if bcc: 

235 if isinstance(bcc, str): 

236 raise TypeError('"bcc" argument must be a list or tuple') 

237 self.bcc = list(bcc) 

238 else: 

239 self.bcc = [] 

240 if reply_to: 

241 if isinstance(reply_to, str): 

242 raise TypeError('"reply_to" argument must be a list or tuple') 

243 self.reply_to = list(reply_to) 

244 else: 

245 self.reply_to = [] 

246 self.from_email = from_email or settings.DEFAULT_FROM_EMAIL 

247 self.subject = subject 

248 self.body = body or "" 

249 self.attachments = [] 

250 if attachments: 

251 for attachment in attachments: 

252 if isinstance(attachment, MIMEBase): 

253 self.attach(attachment) 

254 else: 

255 self.attach(*attachment) 

256 self.extra_headers = headers or {} 

257 self.connection = connection 

258 

259 def get_connection(self, fail_silently=False): 

260 from django.core.mail import get_connection 

261 

262 if not self.connection: 

263 self.connection = get_connection(fail_silently=fail_silently) 

264 return self.connection 

265 

266 def message(self): 

267 encoding = self.encoding or settings.DEFAULT_CHARSET 

268 msg = SafeMIMEText(self.body, self.content_subtype, encoding) 

269 msg = self._create_message(msg) 

270 msg["Subject"] = self.subject 

271 msg["From"] = self.extra_headers.get("From", self.from_email) 

272 self._set_list_header_if_not_empty(msg, "To", self.to) 

273 self._set_list_header_if_not_empty(msg, "Cc", self.cc) 

274 self._set_list_header_if_not_empty(msg, "Reply-To", self.reply_to) 

275 

276 # Email header names are case-insensitive (RFC 2045), so we have to 

277 # accommodate that when doing comparisons. 

278 header_names = [key.lower() for key in self.extra_headers] 

279 if "date" not in header_names: 

280 # formatdate() uses stdlib methods to format the date, which use 

281 # the stdlib/OS concept of a timezone, however, Django sets the 

282 # TZ environment variable based on the TIME_ZONE setting which 

283 # will get picked up by formatdate(). 

284 msg["Date"] = formatdate(localtime=settings.EMAIL_USE_LOCALTIME) 

285 if "message-id" not in header_names: 

286 # Use cached DNS_NAME for performance 

287 msg["Message-ID"] = make_msgid(domain=DNS_NAME) 

288 for name, value in self.extra_headers.items(): 

289 # Avoid headers handled above. 

290 if name.lower() not in {"from", "to", "cc", "reply-to"}: 

291 msg[name] = value 

292 return msg 

293 

294 def recipients(self): 

295 """ 

296 Return a list of all recipients of the email (includes direct 

297 addressees as well as Cc and Bcc entries). 

298 """ 

299 return [email for email in (self.to + self.cc + self.bcc) if email] 

300 

301 def send(self, fail_silently=False): 

302 """Send the email message.""" 

303 if not self.recipients(): 

304 # Don't bother creating the network connection if there's nobody to 

305 # send to. 

306 return 0 

307 return self.get_connection(fail_silently).send_messages([self]) 

308 

309 def attach(self, filename=None, content=None, mimetype=None): 

310 """ 

311 Attach a file with the given filename and content. The filename can 

312 be omitted and the mimetype is guessed, if not provided. 

313 

314 If the first parameter is a MIMEBase subclass, insert it directly 

315 into the resulting message attachments. 

316 

317 For a text/* mimetype (guessed or specified), when a bytes object is 

318 specified as content, decode it as UTF-8. If that fails, set the 

319 mimetype to DEFAULT_ATTACHMENT_MIME_TYPE and don't decode the content. 

320 """ 

321 if isinstance(filename, MIMEBase): 

322 if content is not None or mimetype is not None: 

323 raise ValueError( 

324 "content and mimetype must not be given when a MIMEBase " 

325 "instance is provided." 

326 ) 

327 self.attachments.append(filename) 

328 elif content is None: 

329 raise ValueError("content must be provided.") 

330 else: 

331 mimetype = ( 

332 mimetype 

333 or mimetypes.guess_type(filename)[0] 

334 or DEFAULT_ATTACHMENT_MIME_TYPE 

335 ) 

336 basetype, subtype = mimetype.split("/", 1) 

337 

338 if basetype == "text": 

339 if isinstance(content, bytes): 

340 try: 

341 content = content.decode() 

342 except UnicodeDecodeError: 

343 # If mimetype suggests the file is text but it's 

344 # actually binary, read() raises a UnicodeDecodeError. 

345 mimetype = DEFAULT_ATTACHMENT_MIME_TYPE 

346 

347 self.attachments.append(EmailAttachment(filename, content, mimetype)) 

348 

349 def attach_file(self, path, mimetype=None): 

350 """ 

351 Attach a file from the filesystem. 

352 

353 Set the mimetype to DEFAULT_ATTACHMENT_MIME_TYPE if it isn't specified 

354 and cannot be guessed. 

355 

356 For a text/* mimetype (guessed or specified), decode the file's content 

357 as UTF-8. If that fails, set the mimetype to 

358 DEFAULT_ATTACHMENT_MIME_TYPE and don't decode the content. 

359 """ 

360 path = Path(path) 

361 with path.open("rb") as file: 

362 content = file.read() 

363 self.attach(path.name, content, mimetype) 

364 

365 def _create_message(self, msg): 

366 return self._create_attachments(msg) 

367 

368 def _create_attachments(self, msg): 

369 if self.attachments: 

370 encoding = self.encoding or settings.DEFAULT_CHARSET 

371 body_msg = msg 

372 msg = SafeMIMEMultipart(_subtype=self.mixed_subtype, encoding=encoding) 

373 if self.body or body_msg.is_multipart(): 

374 msg.attach(body_msg) 

375 for attachment in self.attachments: 

376 if isinstance(attachment, MIMEBase): 

377 msg.attach(attachment) 

378 else: 

379 msg.attach(self._create_attachment(*attachment)) 

380 return msg 

381 

382 def _create_mime_attachment(self, content, mimetype): 

383 """ 

384 Convert the content, mimetype pair into a MIME attachment object. 

385 

386 If the mimetype is message/rfc822, content may be an 

387 email.Message or EmailMessage object, as well as a str. 

388 """ 

389 basetype, subtype = mimetype.split("/", 1) 

390 if basetype == "text": 

391 encoding = self.encoding or settings.DEFAULT_CHARSET 

392 attachment = SafeMIMEText(content, subtype, encoding) 

393 elif basetype == "message" and subtype == "rfc822": 

394 # Bug #18967: Per RFC 2046 Section 5.2.1, message/rfc822 

395 # attachments must not be base64 encoded. 

396 if isinstance(content, EmailMessage): 

397 # convert content into an email.Message first 

398 content = content.message() 

399 elif not isinstance(content, Message): 

400 # For compatibility with existing code, parse the message 

401 # into an email.Message object if it is not one already. 

402 content = message_from_string(force_str(content)) 

403 

404 attachment = SafeMIMEMessage(content, subtype) 

405 else: 

406 # Encode non-text attachments with base64. 

407 attachment = MIMEBase(basetype, subtype) 

408 attachment.set_payload(content) 

409 Encoders.encode_base64(attachment) 

410 return attachment 

411 

412 def _create_attachment(self, filename, content, mimetype=None): 

413 """ 

414 Convert the filename, content, mimetype triple into a MIME attachment 

415 object. 

416 """ 

417 attachment = self._create_mime_attachment(content, mimetype) 

418 if filename: 

419 try: 

420 filename.encode("ascii") 

421 except UnicodeEncodeError: 

422 filename = ("utf-8", "", filename) 

423 attachment.add_header( 

424 "Content-Disposition", "attachment", filename=filename 

425 ) 

426 return attachment 

427 

428 def _set_list_header_if_not_empty(self, msg, header, values): 

429 """ 

430 Set msg's header, either from self.extra_headers, if present, or from 

431 the values argument if not empty. 

432 """ 

433 try: 

434 msg[header] = self.extra_headers[header] 

435 except KeyError: 

436 if values: 

437 msg[header] = ", ".join(str(v) for v in values) 

438 

439 

440class EmailMultiAlternatives(EmailMessage): 

441 """ 

442 A version of EmailMessage that makes it easy to send multipart/alternative 

443 messages. For example, including text and HTML versions of the text is 

444 made easier. 

445 """ 

446 

447 alternative_subtype = "alternative" 

448 

449 def __init__( 

450 self, 

451 subject="", 

452 body="", 

453 from_email=None, 

454 to=None, 

455 bcc=None, 

456 connection=None, 

457 attachments=None, 

458 headers=None, 

459 alternatives=None, 

460 cc=None, 

461 reply_to=None, 

462 ): 

463 """ 

464 Initialize a single email message (which can be sent to multiple 

465 recipients). 

466 """ 

467 super().__init__( 

468 subject, 

469 body, 

470 from_email, 

471 to, 

472 bcc, 

473 connection, 

474 attachments, 

475 headers, 

476 cc, 

477 reply_to, 

478 ) 

479 self.alternatives = [ 

480 EmailAlternative(*alternative) for alternative in (alternatives or []) 

481 ] 

482 

483 def attach_alternative(self, content, mimetype): 

484 """Attach an alternative content representation.""" 

485 if content is None or mimetype is None: 

486 raise ValueError("Both content and mimetype must be provided.") 

487 self.alternatives.append(EmailAlternative(content, mimetype)) 

488 

489 def _create_message(self, msg): 

490 return self._create_attachments(self._create_alternatives(msg)) 

491 

492 def _create_alternatives(self, msg): 

493 encoding = self.encoding or settings.DEFAULT_CHARSET 

494 if self.alternatives: 

495 body_msg = msg 

496 msg = SafeMIMEMultipart( 

497 _subtype=self.alternative_subtype, encoding=encoding 

498 ) 

499 if self.body: 

500 msg.attach(body_msg) 

501 for alternative in self.alternatives: 

502 msg.attach( 

503 self._create_mime_attachment( 

504 alternative.content, alternative.mimetype 

505 ) 

506 ) 

507 return msg 

508 

509 def body_contains(self, text): 

510 """ 

511 Checks that ``text`` occurs in the email body and in all attached MIME 

512 type text/* alternatives. 

513 """ 

514 if text not in self.body: 

515 return False 

516 

517 for content, mimetype in self.alternatives: 

518 if mimetype.startswith("text/") and text not in content: 

519 return False 

520 return True