Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/email.py: 19%

113 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import collections.abc 

21import logging 

22import os 

23import re 

24import smtplib 

25import warnings 

26from email.mime.application import MIMEApplication 

27from email.mime.multipart import MIMEMultipart 

28from email.mime.text import MIMEText 

29from email.utils import formatdate 

30from typing import Any, Iterable 

31 

32from airflow.configuration import conf 

33from airflow.exceptions import AirflowConfigException, AirflowException, RemovedInAirflow3Warning 

34 

35log = logging.getLogger(__name__) 

36 

37 

38def send_email( 

39 to: list[str] | Iterable[str], 

40 subject: str, 

41 html_content: str, 

42 files: list[str] | None = None, 

43 dryrun: bool = False, 

44 cc: str | Iterable[str] | None = None, 

45 bcc: str | Iterable[str] | None = None, 

46 mime_subtype: str = "mixed", 

47 mime_charset: str = "utf-8", 

48 conn_id: str | None = None, 

49 custom_headers: dict[str, Any] | None = None, 

50 **kwargs, 

51) -> None: 

52 """ 

53 Send an email using the backend specified in the *EMAIL_BACKEND* configuration option. 

54 

55 :param to: A list or iterable of email addresses to send the email to. 

56 :param subject: The subject of the email. 

57 :param html_content: The content of the email in HTML format. 

58 :param files: A list of paths to files to attach to the email. 

59 :param dryrun: If *True*, the email will not actually be sent. Default: *False*. 

60 :param cc: A string or iterable of strings containing email addresses to send a copy of the email to. 

61 :param bcc: A string or iterable of strings containing email addresses to send a 

62 blind carbon copy of the email to. 

63 :param mime_subtype: The subtype of the MIME message. Default: "mixed". 

64 :param mime_charset: The charset of the email. Default: "utf-8". 

65 :param conn_id: The connection ID to use for the backend. If not provided, the default connection 

66 specified in the *EMAIL_CONN_ID* configuration option will be used. 

67 :param custom_headers: A dictionary of additional headers to add to the MIME message. 

68 No validations are run on these values, and they should be able to be encoded. 

69 :param kwargs: Additional keyword arguments to pass to the backend. 

70 """ 

71 backend = conf.getimport("email", "EMAIL_BACKEND") 

72 backend_conn_id = conn_id or conf.get("email", "EMAIL_CONN_ID") 

73 from_email = conf.get("email", "from_email", fallback=None) 

74 

75 to_list = get_email_address_list(to) 

76 to_comma_separated = ", ".join(to_list) 

77 

78 return backend( 

79 to_comma_separated, 

80 subject, 

81 html_content, 

82 files=files, 

83 dryrun=dryrun, 

84 cc=cc, 

85 bcc=bcc, 

86 mime_subtype=mime_subtype, 

87 mime_charset=mime_charset, 

88 conn_id=backend_conn_id, 

89 from_email=from_email, 

90 custom_headers=custom_headers, 

91 **kwargs, 

92 ) 

93 

94 

95def send_email_smtp( 

96 to: str | Iterable[str], 

97 subject: str, 

98 html_content: str, 

99 files: list[str] | None = None, 

100 dryrun: bool = False, 

101 cc: str | Iterable[str] | None = None, 

102 bcc: str | Iterable[str] | None = None, 

103 mime_subtype: str = "mixed", 

104 mime_charset: str = "utf-8", 

105 conn_id: str = "smtp_default", 

106 from_email: str | None = None, 

107 custom_headers: dict[str, Any] | None = None, 

108 **kwargs, 

109) -> None: 

110 """Send an email with html content. 

111 

112 :param to: Recipient email address or list of addresses. 

113 :param subject: Email subject. 

114 :param html_content: Email body in HTML format. 

115 :param files: List of file paths to attach to the email. 

116 :param dryrun: If True, the email will not be sent, but all other actions will be performed. 

117 :param cc: Carbon copy recipient email address or list of addresses. 

118 :param bcc: Blind carbon copy recipient email address or list of addresses. 

119 :param mime_subtype: MIME subtype of the email. 

120 :param mime_charset: MIME charset of the email. 

121 :param conn_id: Connection ID of the SMTP server. 

122 :param from_email: Sender email address. 

123 :param custom_headers: Dictionary of custom headers to include in the email. 

124 :param kwargs: Additional keyword arguments. 

125 

126 >>> send_email('test@example.com', 'foo', '<b>Foo</b> bar', ['/dev/null'], dryrun=True) 

127 """ 

128 smtp_mail_from = conf.get("smtp", "SMTP_MAIL_FROM") 

129 

130 if smtp_mail_from is not None: 

131 mail_from = smtp_mail_from 

132 else: 

133 if from_email is None: 

134 raise Exception( 

135 "You should set from email - either by smtp/smtp_mail_from config or `from_email` parameter" 

136 ) 

137 mail_from = from_email 

138 

139 msg, recipients = build_mime_message( 

140 mail_from=mail_from, 

141 to=to, 

142 subject=subject, 

143 html_content=html_content, 

144 files=files, 

145 cc=cc, 

146 bcc=bcc, 

147 mime_subtype=mime_subtype, 

148 mime_charset=mime_charset, 

149 custom_headers=custom_headers, 

150 ) 

151 

152 send_mime_email(e_from=mail_from, e_to=recipients, mime_msg=msg, conn_id=conn_id, dryrun=dryrun) 

153 

154 

155def build_mime_message( 

156 mail_from: str | None, 

157 to: str | Iterable[str], 

158 subject: str, 

159 html_content: str, 

160 files: list[str] | None = None, 

161 cc: str | Iterable[str] | None = None, 

162 bcc: str | Iterable[str] | None = None, 

163 mime_subtype: str = "mixed", 

164 mime_charset: str = "utf-8", 

165 custom_headers: dict[str, Any] | None = None, 

166) -> tuple[MIMEMultipart, list[str]]: 

167 """ 

168 Build a MIME message that can be used to send an email and returns a full list of recipients. 

169 

170 :param mail_from: Email address to set as the email's "From" field. 

171 :param to: A string or iterable of strings containing email addresses to set as the email's "To" field. 

172 :param subject: The subject of the email. 

173 :param html_content: The content of the email in HTML format. 

174 :param files: A list of paths to files to be attached to the email. 

175 :param cc: A string or iterable of strings containing email addresses to set as the email's "CC" field. 

176 :param bcc: A string or iterable of strings containing email addresses to set as the email's "BCC" field. 

177 :param mime_subtype: The subtype of the MIME message. Default: "mixed". 

178 :param mime_charset: The charset of the email. Default: "utf-8". 

179 :param custom_headers: Additional headers to add to the MIME message. No validations are run on these 

180 values, and they should be able to be encoded. 

181 :return: A tuple containing the email as a MIMEMultipart object and a list of recipient email addresses. 

182 """ 

183 to = get_email_address_list(to) 

184 

185 msg = MIMEMultipart(mime_subtype) 

186 msg["Subject"] = subject 

187 msg["From"] = mail_from 

188 msg["To"] = ", ".join(to) 

189 recipients = to 

190 if cc: 

191 cc = get_email_address_list(cc) 

192 msg["CC"] = ", ".join(cc) 

193 recipients += cc 

194 

195 if bcc: 

196 # don't add bcc in header 

197 bcc = get_email_address_list(bcc) 

198 recipients += bcc 

199 

200 msg["Date"] = formatdate(localtime=True) 

201 mime_text = MIMEText(html_content, "html", mime_charset) 

202 msg.attach(mime_text) 

203 

204 for fname in files or []: 

205 basename = os.path.basename(fname) 

206 with open(fname, "rb") as file: 

207 part = MIMEApplication(file.read(), Name=basename) 

208 part["Content-Disposition"] = f'attachment; filename="{basename}"' 

209 part["Content-ID"] = f"<{basename}>" 

210 msg.attach(part) 

211 

212 if custom_headers: 

213 for header_key, header_value in custom_headers.items(): 

214 msg[header_key] = header_value 

215 

216 return msg, recipients 

217 

218 

219def send_mime_email( 

220 e_from: str, 

221 e_to: str | list[str], 

222 mime_msg: MIMEMultipart, 

223 conn_id: str = "smtp_default", 

224 dryrun: bool = False, 

225) -> None: 

226 """ 

227 Send a MIME email. 

228 

229 :param e_from: The email address of the sender. 

230 :param e_to: The email address or a list of email addresses of the recipient(s). 

231 :param mime_msg: The MIME message to send. 

232 :param conn_id: The ID of the SMTP connection to use. 

233 :param dryrun: If True, the email will not be sent, but a log message will be generated. 

234 """ 

235 smtp_host = conf.get_mandatory_value("smtp", "SMTP_HOST") 

236 smtp_port = conf.getint("smtp", "SMTP_PORT") 

237 smtp_starttls = conf.getboolean("smtp", "SMTP_STARTTLS") 

238 smtp_ssl = conf.getboolean("smtp", "SMTP_SSL") 

239 smtp_retry_limit = conf.getint("smtp", "SMTP_RETRY_LIMIT") 

240 smtp_timeout = conf.getint("smtp", "SMTP_TIMEOUT") 

241 smtp_user = None 

242 smtp_password = None 

243 

244 if conn_id is not None: 

245 try: 

246 from airflow.hooks.base import BaseHook 

247 

248 airflow_conn = BaseHook.get_connection(conn_id) 

249 smtp_user = airflow_conn.login 

250 smtp_password = airflow_conn.password 

251 except AirflowException: 

252 pass 

253 if smtp_user is None or smtp_password is None: 

254 warnings.warn( 

255 "Fetching SMTP credentials from configuration variables will be deprecated in a future " 

256 "release. Please set credentials using a connection instead.", 

257 RemovedInAirflow3Warning, 

258 stacklevel=2, 

259 ) 

260 try: 

261 smtp_user = conf.get("smtp", "SMTP_USER") 

262 smtp_password = conf.get("smtp", "SMTP_PASSWORD") 

263 except AirflowConfigException: 

264 log.debug("No user/password found for SMTP, so logging in with no authentication.") 

265 

266 if not dryrun: 

267 for attempt in range(1, smtp_retry_limit + 1): 

268 log.info("Email alerting: attempt %s", str(attempt)) 

269 try: 

270 smtp_conn = _get_smtp_connection(smtp_host, smtp_port, smtp_timeout, smtp_ssl) 

271 except smtplib.SMTPServerDisconnected: 

272 if attempt < smtp_retry_limit: 

273 continue 

274 raise 

275 

276 if smtp_starttls: 

277 smtp_conn.starttls() 

278 if smtp_user and smtp_password: 

279 smtp_conn.login(smtp_user, smtp_password) 

280 log.info("Sent an alert email to %s", e_to) 

281 smtp_conn.sendmail(e_from, e_to, mime_msg.as_string()) 

282 smtp_conn.quit() 

283 break 

284 

285 

286def get_email_address_list(addresses: str | Iterable[str]) -> list[str]: 

287 """ 

288 Returns a list of email addresses from the provided input. 

289 

290 :param addresses: A string or iterable of strings containing email addresses. 

291 :return: A list of email addresses. 

292 :raises TypeError: If the input is not a string or iterable of strings. 

293 """ 

294 if isinstance(addresses, str): 

295 return _get_email_list_from_str(addresses) 

296 elif isinstance(addresses, collections.abc.Iterable): 

297 if not all(isinstance(item, str) for item in addresses): 

298 raise TypeError("The items in your iterable must be strings.") 

299 return list(addresses) 

300 else: 

301 raise TypeError(f"Unexpected argument type: Received '{type(addresses).__name__}'.") 

302 

303 

304def _get_smtp_connection(host: str, port: int, timeout: int, with_ssl: bool) -> smtplib.SMTP: 

305 """ 

306 Returns an SMTP connection to the specified host and port, with optional SSL encryption. 

307 

308 :param host: The hostname or IP address of the SMTP server. 

309 :param port: The port number to connect to on the SMTP server. 

310 :param timeout: The timeout in seconds for the connection. 

311 :param with_ssl: Whether to use SSL encryption for the connection. 

312 :return: An SMTP connection to the specified host and port. 

313 """ 

314 return ( 

315 smtplib.SMTP_SSL(host=host, port=port, timeout=timeout) 

316 if with_ssl 

317 else smtplib.SMTP(host=host, port=port, timeout=timeout) 

318 ) 

319 

320 

321def _get_email_list_from_str(addresses: str) -> list[str]: 

322 """ 

323 Extract a list of email addresses from a string. The string 

324 can contain multiple email addresses separated by 

325 any of the following delimiters: ',' or ';'. 

326 

327 :param addresses: A string containing one or more email addresses. 

328 :return: A list of email addresses. 

329 """ 

330 pattern = r"\s*[,;]\s*" 

331 return [address for address in re.split(pattern, addresses)]