Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/email.py: 18%
116 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20import collections.abc
21import logging
22import os
23import smtplib
24import warnings
25from email.mime.application import MIMEApplication
26from email.mime.multipart import MIMEMultipart
27from email.mime.text import MIMEText
28from email.utils import formatdate
29from typing import Any, Iterable
31from airflow.configuration import conf
32from airflow.exceptions import AirflowConfigException, AirflowException, RemovedInAirflow3Warning
34log = logging.getLogger(__name__)
37def send_email(
38 to: list[str] | Iterable[str],
39 subject: str,
40 html_content: str,
41 files: list[str] | None = None,
42 dryrun: bool = False,
43 cc: str | Iterable[str] | None = None,
44 bcc: str | Iterable[str] | None = None,
45 mime_subtype: str = "mixed",
46 mime_charset: str = "utf-8",
47 conn_id: str | None = None,
48 custom_headers: dict[str, Any] | None = None,
49 **kwargs,
50):
51 """Send email using backend specified in EMAIL_BACKEND."""
52 backend = conf.getimport("email", "EMAIL_BACKEND")
53 backend_conn_id = conn_id or conf.get("email", "EMAIL_CONN_ID")
54 from_email = conf.get("email", "from_email", fallback=None)
56 to_list = get_email_address_list(to)
57 to_comma_separated = ", ".join(to_list)
59 return backend(
60 to_comma_separated,
61 subject,
62 html_content,
63 files=files,
64 dryrun=dryrun,
65 cc=cc,
66 bcc=bcc,
67 mime_subtype=mime_subtype,
68 mime_charset=mime_charset,
69 conn_id=backend_conn_id,
70 from_email=from_email,
71 custom_headers=custom_headers,
72 **kwargs,
73 )
76def send_email_smtp(
77 to: str | Iterable[str],
78 subject: str,
79 html_content: str,
80 files: list[str] | None = None,
81 dryrun: bool = False,
82 cc: str | Iterable[str] | None = None,
83 bcc: str | Iterable[str] | None = None,
84 mime_subtype: str = "mixed",
85 mime_charset: str = "utf-8",
86 conn_id: str = "smtp_default",
87 from_email: str | None = None,
88 custom_headers: dict[str, Any] | None = None,
89 **kwargs,
90):
91 """
92 Send an email with html content
94 >>> send_email('test@example.com', 'foo', '<b>Foo</b> bar', ['/dev/null'], dryrun=True)
95 """
96 smtp_mail_from = conf.get("smtp", "SMTP_MAIL_FROM")
98 if smtp_mail_from is not None:
99 mail_from = smtp_mail_from
100 else:
101 if from_email is None:
102 raise Exception(
103 "You should set from email - either by smtp/smtp_mail_from config or `from_email` parameter"
104 )
105 mail_from = from_email
107 msg, recipients = build_mime_message(
108 mail_from=mail_from,
109 to=to,
110 subject=subject,
111 html_content=html_content,
112 files=files,
113 cc=cc,
114 bcc=bcc,
115 mime_subtype=mime_subtype,
116 mime_charset=mime_charset,
117 custom_headers=custom_headers,
118 )
120 send_mime_email(e_from=mail_from, e_to=recipients, mime_msg=msg, conn_id=conn_id, dryrun=dryrun)
123def build_mime_message(
124 mail_from: str | None,
125 to: str | Iterable[str],
126 subject: str,
127 html_content: str,
128 files: list[str] | None = None,
129 cc: str | Iterable[str] | None = None,
130 bcc: str | Iterable[str] | None = None,
131 mime_subtype: str = "mixed",
132 mime_charset: str = "utf-8",
133 custom_headers: dict[str, Any] | None = None,
134) -> tuple[MIMEMultipart, list[str]]:
135 """
136 Build a MIME message that can be used to send an email and
137 returns full list of recipients.
139 :param mail_from: Email address to set as email's from
140 :param to: List of email addresses to set as email's to
141 :param subject: Email's subject
142 :param html_content: Content of email in HTML format
143 :param files: List of paths of files to be attached
144 :param cc: List of email addresses to set as email's CC
145 :param bcc: List of email addresses to set as email's BCC
146 :param mime_subtype: Can be used to specify the subtype of the message. Default = mixed
147 :param mime_charset: Email's charset. Default = UTF-8.
148 :param custom_headers: Additional headers to add to the MIME message.
149 No validations are run on these values and they should be able to be encoded.
150 :return: Email as MIMEMultipart and list of recipients' addresses.
151 """
152 to = get_email_address_list(to)
154 msg = MIMEMultipart(mime_subtype)
155 msg["Subject"] = subject
156 msg["From"] = mail_from
157 msg["To"] = ", ".join(to)
158 recipients = to
159 if cc:
160 cc = get_email_address_list(cc)
161 msg["CC"] = ", ".join(cc)
162 recipients = recipients + cc
164 if bcc:
165 # don't add bcc in header
166 bcc = get_email_address_list(bcc)
167 recipients = recipients + bcc
169 msg["Date"] = formatdate(localtime=True)
170 mime_text = MIMEText(html_content, "html", mime_charset)
171 msg.attach(mime_text)
173 for fname in files or []:
174 basename = os.path.basename(fname)
175 with open(fname, "rb") as file:
176 part = MIMEApplication(file.read(), Name=basename)
177 part["Content-Disposition"] = f'attachment; filename="{basename}"'
178 part["Content-ID"] = f"<{basename}>"
179 msg.attach(part)
181 if custom_headers:
182 for header_key, header_value in custom_headers.items():
183 msg[header_key] = header_value
185 return msg, recipients
188def send_mime_email(
189 e_from: str,
190 e_to: str | list[str],
191 mime_msg: MIMEMultipart,
192 conn_id: str = "smtp_default",
193 dryrun: bool = False,
194) -> None:
195 """Send MIME email."""
196 smtp_host = conf.get_mandatory_value("smtp", "SMTP_HOST")
197 smtp_port = conf.getint("smtp", "SMTP_PORT")
198 smtp_starttls = conf.getboolean("smtp", "SMTP_STARTTLS")
199 smtp_ssl = conf.getboolean("smtp", "SMTP_SSL")
200 smtp_retry_limit = conf.getint("smtp", "SMTP_RETRY_LIMIT")
201 smtp_timeout = conf.getint("smtp", "SMTP_TIMEOUT")
202 smtp_user = None
203 smtp_password = None
205 if conn_id is not None:
206 try:
207 from airflow.hooks.base import BaseHook
209 airflow_conn = BaseHook.get_connection(conn_id)
210 smtp_user = airflow_conn.login
211 smtp_password = airflow_conn.password
212 except AirflowException:
213 pass
214 if smtp_user is None or smtp_password is None:
215 warnings.warn(
216 "Fetching SMTP credentials from configuration variables will be deprecated in a future "
217 "release. Please set credentials using a connection instead.",
218 RemovedInAirflow3Warning,
219 stacklevel=2,
220 )
221 try:
222 smtp_user = conf.get("smtp", "SMTP_USER")
223 smtp_password = conf.get("smtp", "SMTP_PASSWORD")
224 except AirflowConfigException:
225 log.debug("No user/password found for SMTP, so logging in with no authentication.")
227 if not dryrun:
228 for attempt in range(1, smtp_retry_limit + 1):
229 log.info("Email alerting: attempt %s", str(attempt))
230 try:
231 smtp_conn = _get_smtp_connection(smtp_host, smtp_port, smtp_timeout, smtp_ssl)
232 except smtplib.SMTPServerDisconnected:
233 if attempt < smtp_retry_limit:
234 continue
235 raise
237 if smtp_starttls:
238 smtp_conn.starttls()
239 if smtp_user and smtp_password:
240 smtp_conn.login(smtp_user, smtp_password)
241 log.info("Sent an alert email to %s", e_to)
242 smtp_conn.sendmail(e_from, e_to, mime_msg.as_string())
243 smtp_conn.quit()
244 break
247def get_email_address_list(addresses: str | Iterable[str]) -> list[str]:
248 """Get list of email addresses."""
249 if isinstance(addresses, str):
250 return _get_email_list_from_str(addresses)
252 elif isinstance(addresses, collections.abc.Iterable):
253 if not all(isinstance(item, str) for item in addresses):
254 raise TypeError("The items in your iterable must be strings.")
255 return list(addresses)
257 received_type = type(addresses).__name__
258 raise TypeError(f"Unexpected argument type: Received '{received_type}'.")
261def _get_smtp_connection(host: str, port: int, timeout: int, with_ssl: bool) -> smtplib.SMTP:
262 return (
263 smtplib.SMTP_SSL(host=host, port=port, timeout=timeout)
264 if with_ssl
265 else smtplib.SMTP(host=host, port=port, timeout=timeout)
266 )
269def _get_email_list_from_str(addresses: str) -> list[str]:
270 delimiters = [",", ";"]
271 for delimiter in delimiters:
272 if delimiter in addresses:
273 return [address.strip() for address in addresses.split(delimiter)]
274 return [addresses]