Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/email.py: 18%

116 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import collections.abc 

21import logging 

22import os 

23import smtplib 

24import warnings 

25from email.mime.application import MIMEApplication 

26from email.mime.multipart import MIMEMultipart 

27from email.mime.text import MIMEText 

28from email.utils import formatdate 

29from typing import Any, Iterable 

30 

31from airflow.configuration import conf 

32from airflow.exceptions import AirflowConfigException, AirflowException, RemovedInAirflow3Warning 

33 

34log = logging.getLogger(__name__) 

35 

36 

37def send_email( 

38 to: list[str] | Iterable[str], 

39 subject: str, 

40 html_content: str, 

41 files: list[str] | None = None, 

42 dryrun: bool = False, 

43 cc: str | Iterable[str] | None = None, 

44 bcc: str | Iterable[str] | None = None, 

45 mime_subtype: str = "mixed", 

46 mime_charset: str = "utf-8", 

47 conn_id: str | None = None, 

48 custom_headers: dict[str, Any] | None = None, 

49 **kwargs, 

50): 

51 """Send email using backend specified in EMAIL_BACKEND.""" 

52 backend = conf.getimport("email", "EMAIL_BACKEND") 

53 backend_conn_id = conn_id or conf.get("email", "EMAIL_CONN_ID") 

54 from_email = conf.get("email", "from_email", fallback=None) 

55 

56 to_list = get_email_address_list(to) 

57 to_comma_separated = ", ".join(to_list) 

58 

59 return backend( 

60 to_comma_separated, 

61 subject, 

62 html_content, 

63 files=files, 

64 dryrun=dryrun, 

65 cc=cc, 

66 bcc=bcc, 

67 mime_subtype=mime_subtype, 

68 mime_charset=mime_charset, 

69 conn_id=backend_conn_id, 

70 from_email=from_email, 

71 custom_headers=custom_headers, 

72 **kwargs, 

73 ) 

74 

75 

76def send_email_smtp( 

77 to: str | Iterable[str], 

78 subject: str, 

79 html_content: str, 

80 files: list[str] | None = None, 

81 dryrun: bool = False, 

82 cc: str | Iterable[str] | None = None, 

83 bcc: str | Iterable[str] | None = None, 

84 mime_subtype: str = "mixed", 

85 mime_charset: str = "utf-8", 

86 conn_id: str = "smtp_default", 

87 from_email: str | None = None, 

88 custom_headers: dict[str, Any] | None = None, 

89 **kwargs, 

90): 

91 """ 

92 Send an email with html content 

93 

94 >>> send_email('test@example.com', 'foo', '<b>Foo</b> bar', ['/dev/null'], dryrun=True) 

95 """ 

96 smtp_mail_from = conf.get("smtp", "SMTP_MAIL_FROM") 

97 

98 if smtp_mail_from is not None: 

99 mail_from = smtp_mail_from 

100 else: 

101 if from_email is None: 

102 raise Exception( 

103 "You should set from email - either by smtp/smtp_mail_from config or `from_email` parameter" 

104 ) 

105 mail_from = from_email 

106 

107 msg, recipients = build_mime_message( 

108 mail_from=mail_from, 

109 to=to, 

110 subject=subject, 

111 html_content=html_content, 

112 files=files, 

113 cc=cc, 

114 bcc=bcc, 

115 mime_subtype=mime_subtype, 

116 mime_charset=mime_charset, 

117 custom_headers=custom_headers, 

118 ) 

119 

120 send_mime_email(e_from=mail_from, e_to=recipients, mime_msg=msg, conn_id=conn_id, dryrun=dryrun) 

121 

122 

123def build_mime_message( 

124 mail_from: str | None, 

125 to: str | Iterable[str], 

126 subject: str, 

127 html_content: str, 

128 files: list[str] | None = None, 

129 cc: str | Iterable[str] | None = None, 

130 bcc: str | Iterable[str] | None = None, 

131 mime_subtype: str = "mixed", 

132 mime_charset: str = "utf-8", 

133 custom_headers: dict[str, Any] | None = None, 

134) -> tuple[MIMEMultipart, list[str]]: 

135 """ 

136 Build a MIME message that can be used to send an email and 

137 returns full list of recipients. 

138 

139 :param mail_from: Email address to set as email's from 

140 :param to: List of email addresses to set as email's to 

141 :param subject: Email's subject 

142 :param html_content: Content of email in HTML format 

143 :param files: List of paths of files to be attached 

144 :param cc: List of email addresses to set as email's CC 

145 :param bcc: List of email addresses to set as email's BCC 

146 :param mime_subtype: Can be used to specify the subtype of the message. Default = mixed 

147 :param mime_charset: Email's charset. Default = UTF-8. 

148 :param custom_headers: Additional headers to add to the MIME message. 

149 No validations are run on these values and they should be able to be encoded. 

150 :return: Email as MIMEMultipart and list of recipients' addresses. 

151 """ 

152 to = get_email_address_list(to) 

153 

154 msg = MIMEMultipart(mime_subtype) 

155 msg["Subject"] = subject 

156 msg["From"] = mail_from 

157 msg["To"] = ", ".join(to) 

158 recipients = to 

159 if cc: 

160 cc = get_email_address_list(cc) 

161 msg["CC"] = ", ".join(cc) 

162 recipients = recipients + cc 

163 

164 if bcc: 

165 # don't add bcc in header 

166 bcc = get_email_address_list(bcc) 

167 recipients = recipients + bcc 

168 

169 msg["Date"] = formatdate(localtime=True) 

170 mime_text = MIMEText(html_content, "html", mime_charset) 

171 msg.attach(mime_text) 

172 

173 for fname in files or []: 

174 basename = os.path.basename(fname) 

175 with open(fname, "rb") as file: 

176 part = MIMEApplication(file.read(), Name=basename) 

177 part["Content-Disposition"] = f'attachment; filename="{basename}"' 

178 part["Content-ID"] = f"<{basename}>" 

179 msg.attach(part) 

180 

181 if custom_headers: 

182 for header_key, header_value in custom_headers.items(): 

183 msg[header_key] = header_value 

184 

185 return msg, recipients 

186 

187 

188def send_mime_email( 

189 e_from: str, 

190 e_to: str | list[str], 

191 mime_msg: MIMEMultipart, 

192 conn_id: str = "smtp_default", 

193 dryrun: bool = False, 

194) -> None: 

195 """Send MIME email.""" 

196 smtp_host = conf.get_mandatory_value("smtp", "SMTP_HOST") 

197 smtp_port = conf.getint("smtp", "SMTP_PORT") 

198 smtp_starttls = conf.getboolean("smtp", "SMTP_STARTTLS") 

199 smtp_ssl = conf.getboolean("smtp", "SMTP_SSL") 

200 smtp_retry_limit = conf.getint("smtp", "SMTP_RETRY_LIMIT") 

201 smtp_timeout = conf.getint("smtp", "SMTP_TIMEOUT") 

202 smtp_user = None 

203 smtp_password = None 

204 

205 if conn_id is not None: 

206 try: 

207 from airflow.hooks.base import BaseHook 

208 

209 airflow_conn = BaseHook.get_connection(conn_id) 

210 smtp_user = airflow_conn.login 

211 smtp_password = airflow_conn.password 

212 except AirflowException: 

213 pass 

214 if smtp_user is None or smtp_password is None: 

215 warnings.warn( 

216 "Fetching SMTP credentials from configuration variables will be deprecated in a future " 

217 "release. Please set credentials using a connection instead.", 

218 RemovedInAirflow3Warning, 

219 stacklevel=2, 

220 ) 

221 try: 

222 smtp_user = conf.get("smtp", "SMTP_USER") 

223 smtp_password = conf.get("smtp", "SMTP_PASSWORD") 

224 except AirflowConfigException: 

225 log.debug("No user/password found for SMTP, so logging in with no authentication.") 

226 

227 if not dryrun: 

228 for attempt in range(1, smtp_retry_limit + 1): 

229 log.info("Email alerting: attempt %s", str(attempt)) 

230 try: 

231 smtp_conn = _get_smtp_connection(smtp_host, smtp_port, smtp_timeout, smtp_ssl) 

232 except smtplib.SMTPServerDisconnected: 

233 if attempt < smtp_retry_limit: 

234 continue 

235 raise 

236 

237 if smtp_starttls: 

238 smtp_conn.starttls() 

239 if smtp_user and smtp_password: 

240 smtp_conn.login(smtp_user, smtp_password) 

241 log.info("Sent an alert email to %s", e_to) 

242 smtp_conn.sendmail(e_from, e_to, mime_msg.as_string()) 

243 smtp_conn.quit() 

244 break 

245 

246 

247def get_email_address_list(addresses: str | Iterable[str]) -> list[str]: 

248 """Get list of email addresses.""" 

249 if isinstance(addresses, str): 

250 return _get_email_list_from_str(addresses) 

251 

252 elif isinstance(addresses, collections.abc.Iterable): 

253 if not all(isinstance(item, str) for item in addresses): 

254 raise TypeError("The items in your iterable must be strings.") 

255 return list(addresses) 

256 

257 received_type = type(addresses).__name__ 

258 raise TypeError(f"Unexpected argument type: Received '{received_type}'.") 

259 

260 

261def _get_smtp_connection(host: str, port: int, timeout: int, with_ssl: bool) -> smtplib.SMTP: 

262 return ( 

263 smtplib.SMTP_SSL(host=host, port=port, timeout=timeout) 

264 if with_ssl 

265 else smtplib.SMTP(host=host, port=port, timeout=timeout) 

266 ) 

267 

268 

269def _get_email_list_from_str(addresses: str) -> list[str]: 

270 delimiters = [",", ";"] 

271 for delimiter in delimiters: 

272 if delimiter in addresses: 

273 return [address.strip() for address in addresses.split(delimiter)] 

274 return [addresses]