Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/tests/security/test_kerberos.py: 0%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20import logging
21import shlex
22from unittest import mock
24import pytest
26from airflow.security import kerberos
27from airflow.security.kerberos import get_kerberos_principle, renew_from_kt
28from tests.test_utils.config import conf_vars
30pytestmark = pytest.mark.db_test
33class TestKerberos:
34 @pytest.mark.parametrize(
35 "kerberos_config, expected_cmd",
36 [
37 (
38 {("kerberos", "reinit_frequency"): "42"},
39 [
40 "kinit",
41 "-f",
42 "-a",
43 "-r",
44 "42m",
45 "-k",
46 "-t",
47 "keytab",
48 "-c",
49 "/tmp/airflow_krb5_ccache",
50 "test-principal",
51 ],
52 ),
53 (
54 {("kerberos", "forwardable"): "True", ("kerberos", "include_ip"): "True"},
55 [
56 "kinit",
57 "-f",
58 "-a",
59 "-r",
60 "3600m",
61 "-k",
62 "-t",
63 "keytab",
64 "-c",
65 "/tmp/airflow_krb5_ccache",
66 "test-principal",
67 ],
68 ),
69 (
70 {("kerberos", "forwardable"): "False", ("kerberos", "include_ip"): "False"},
71 [
72 "kinit",
73 "-F",
74 "-A",
75 "-r",
76 "3600m",
77 "-k",
78 "-t",
79 "keytab",
80 "-c",
81 "/tmp/airflow_krb5_ccache",
82 "test-principal",
83 ],
84 ),
85 ],
86 )
87 @mock.patch("time.sleep", return_value=None)
88 @mock.patch("airflow.security.kerberos.open", mock.mock_open(read_data=b"X-CACHECONF:"))
89 @mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
90 @mock.patch("airflow.security.kerberos.subprocess")
91 def test_renew_from_kt(self, mock_subprocess, mock_sleep, kerberos_config, expected_cmd, caplog):
92 expected_cmd_text = " ".join(shlex.quote(f) for f in expected_cmd)
94 with conf_vars(kerberos_config), caplog.at_level(logging.INFO, logger=kerberos.log.name):
95 caplog.clear()
96 mock_subprocess.Popen.return_value.__enter__.return_value.returncode = 0
97 mock_subprocess.call.return_value = 0
98 renew_from_kt(principal="test-principal", keytab="keytab")
100 assert caplog.messages == [
101 f"Re-initialising kerberos from keytab: {expected_cmd_text}",
102 "Renewing kerberos ticket to work around kerberos 1.8.1: kinit -c /tmp/airflow_krb5_ccache -R",
103 ]
105 assert mock_subprocess.Popen.call_args.args[0] == expected_cmd
106 assert mock_subprocess.mock_calls == [
107 mock.call.Popen(
108 expected_cmd,
109 bufsize=-1,
110 close_fds=True,
111 stderr=mock_subprocess.PIPE,
112 stdout=mock_subprocess.PIPE,
113 universal_newlines=True,
114 ),
115 mock.call.Popen().__enter__(),
116 mock.call.Popen().__enter__().wait(),
117 mock.call.Popen().__exit__(None, None, None),
118 mock.call.call(["kinit", "-c", "/tmp/airflow_krb5_ccache", "-R"], close_fds=True),
119 ]
121 @mock.patch("airflow.security.kerberos.subprocess")
122 @mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
123 @mock.patch("airflow.security.kerberos.open", mock.mock_open(read_data=b""))
124 def test_renew_from_kt_without_workaround(self, mock_subprocess, caplog):
125 mock_subprocess.Popen.return_value.__enter__.return_value.returncode = 0
126 mock_subprocess.call.return_value = 0
128 with caplog.at_level(logging.INFO, logger=kerberos.log.name):
129 caplog.clear()
130 renew_from_kt(principal="test-principal", keytab="keytab")
131 assert caplog.messages == [
132 "Re-initialising kerberos from keytab: "
133 "kinit -f -a -r 3600m -k -t keytab -c /tmp/airflow_krb5_ccache test-principal"
134 ]
136 assert mock_subprocess.mock_calls == [
137 mock.call.Popen(
138 [
139 "kinit",
140 "-f",
141 "-a",
142 "-r",
143 "3600m",
144 "-k",
145 "-t",
146 "keytab",
147 "-c",
148 "/tmp/airflow_krb5_ccache",
149 "test-principal",
150 ],
151 bufsize=-1,
152 close_fds=True,
153 stderr=mock_subprocess.PIPE,
154 stdout=mock_subprocess.PIPE,
155 universal_newlines=True,
156 ),
157 mock.call.Popen().__enter__(),
158 mock.call.Popen().__enter__().wait(),
159 mock.call.Popen().__exit__(None, None, None),
160 ]
162 @mock.patch("airflow.security.kerberos.subprocess")
163 @mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
164 def test_renew_from_kt_failed(self, mock_subprocess, caplog):
165 mock_subp = mock_subprocess.Popen.return_value.__enter__.return_value
166 mock_subp.returncode = 1
167 mock_subp.stdout = mock.MagicMock(name="stdout", **{"readlines.return_value": ["STDOUT"]})
168 mock_subp.stderr = mock.MagicMock(name="stderr", **{"readlines.return_value": ["STDERR"]})
170 caplog.clear()
171 with pytest.raises(SystemExit) as ctx:
172 renew_from_kt(principal="test-principal", keytab="keytab")
173 assert ctx.value.code == 1
175 log_records = [record for record in caplog.record_tuples if record[0] == kerberos.log.name]
176 assert len(log_records) == 2, log_records
177 assert [lr[1] for lr in log_records] == [logging.INFO, logging.ERROR]
178 assert [lr[2] for lr in log_records] == [
179 "Re-initialising kerberos from keytab: "
180 "kinit -f -a -r 3600m -k -t keytab -c /tmp/airflow_krb5_ccache test-principal",
181 "Couldn't reinit from keytab! `kinit` exited with 1.\nSTDOUT\nSTDERR",
182 ]
184 assert mock_subprocess.mock_calls == [
185 mock.call.Popen(
186 [
187 "kinit",
188 "-f",
189 "-a",
190 "-r",
191 "3600m",
192 "-k",
193 "-t",
194 "keytab",
195 "-c",
196 "/tmp/airflow_krb5_ccache",
197 "test-principal",
198 ],
199 bufsize=-1,
200 close_fds=True,
201 stderr=mock_subprocess.PIPE,
202 stdout=mock_subprocess.PIPE,
203 universal_newlines=True,
204 ),
205 mock.call.Popen().__enter__(),
206 mock.call.Popen().__enter__().wait(),
207 mock.call.Popen().__exit__(mock.ANY, mock.ANY, mock.ANY),
208 ]
210 @mock.patch("airflow.security.kerberos.subprocess")
211 @mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
212 @mock.patch("airflow.security.kerberos.open", mock.mock_open(read_data=b"X-CACHECONF:"))
213 @mock.patch("airflow.security.kerberos.get_hostname", return_value="HOST")
214 @mock.patch("time.sleep", return_value=None)
215 def test_renew_from_kt_failed_workaround(self, mock_sleep, mock_getfqdn, mock_subprocess, caplog):
216 mock_subprocess.Popen.return_value.__enter__.return_value.returncode = 0
217 mock_subprocess.call.return_value = 1
219 caplog.clear()
220 with pytest.raises(SystemExit) as ctx:
221 renew_from_kt(principal="test-principal", keytab="keytab")
222 assert ctx.value.code == 1
224 log_records = [record for record in caplog.record_tuples if record[0] == kerberos.log.name]
225 assert len(log_records) == 3, log_records
226 assert [lr[1] for lr in log_records] == [logging.INFO, logging.INFO, logging.ERROR]
227 assert [lr[2] for lr in log_records] == [
228 "Re-initialising kerberos from keytab: "
229 "kinit -f -a -r 3600m -k -t keytab -c /tmp/airflow_krb5_ccache test-principal",
230 "Renewing kerberos ticket to work around kerberos 1.8.1: kinit -c /tmp/airflow_krb5_ccache -R",
231 "Couldn't renew kerberos ticket in order to work around "
232 "Kerberos 1.8.1 issue. Please check that the ticket for 'test-principal/HOST' is still "
233 "renewable:\n $ kinit -f -c /tmp/airflow_krb5_ccache\n"
234 "If the 'renew until' date is the same as the 'valid starting' date, the ticket cannot be "
235 "renewed. Please check your KDC configuration, and the ticket renewal policy (maxrenewlife) for "
236 "the 'test-principal/HOST' and `krbtgt' principals.",
237 ]
239 assert mock_subprocess.mock_calls == [
240 mock.call.Popen(
241 [
242 "kinit",
243 "-f",
244 "-a",
245 "-r",
246 "3600m",
247 "-k",
248 "-t",
249 "keytab",
250 "-c",
251 "/tmp/airflow_krb5_ccache",
252 "test-principal",
253 ],
254 bufsize=-1,
255 close_fds=True,
256 stderr=mock_subprocess.PIPE,
257 stdout=mock_subprocess.PIPE,
258 universal_newlines=True,
259 ),
260 mock.call.Popen().__enter__(),
261 mock.call.Popen().__enter__().wait(),
262 mock.call.Popen().__exit__(None, None, None),
263 mock.call.call(["kinit", "-c", "/tmp/airflow_krb5_ccache", "-R"], close_fds=True),
264 ]
266 def test_run_without_keytab(self, caplog):
267 with caplog.at_level(logging.WARNING, logger=kerberos.log.name):
268 caplog.clear()
269 with pytest.raises(SystemExit) as ctx:
270 kerberos.run(principal="test-principal", keytab=None)
271 assert ctx.value.code == 0
272 assert caplog.messages == ["Keytab renewer not starting, no keytab configured"]
274 @mock.patch("airflow.security.kerberos.renew_from_kt")
275 @mock.patch("time.sleep", return_value=None)
276 def test_run(self, mock_sleep, mock_renew_from_kt):
277 mock_renew_from_kt.side_effect = [1, 1, SystemExit(42)]
278 with pytest.raises(SystemExit) as ctx:
279 kerberos.run(principal="test-principal", keytab="/tmp/keytab")
280 assert ctx.value.code == 42
281 assert mock_renew_from_kt.mock_calls == [
282 mock.call("test-principal", "/tmp/keytab"),
283 mock.call("test-principal", "/tmp/keytab"),
284 mock.call("test-principal", "/tmp/keytab"),
285 ]
287 def test_get_kerberos_principle(self):
288 expected_principal = "test-principal"
289 principal = get_kerberos_principle(expected_principal)
290 assert principal == expected_principal
292 @mock.patch("airflow.security.kerberos.get_hostname", return_value="REPLACEMENT_HOST")
293 @mock.patch("airflow.security.kerberos.conf.get_mandatory_value", return_value="test-principal/_HOST")
294 def test_get_kerberos_principle_resolve_null_principal(self, get_madantory_value_mock, get_hostname_mock):
295 principal = get_kerberos_principle(principal=None)
296 assert principal == "test-principal/REPLACEMENT_HOST"