Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/tests/security/test_kerberos.py: 0%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

93 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import logging 

21import shlex 

22from unittest import mock 

23 

24import pytest 

25 

26from airflow.security import kerberos 

27from airflow.security.kerberos import get_kerberos_principle, renew_from_kt 

28from tests.test_utils.config import conf_vars 

29 

30pytestmark = pytest.mark.db_test 

31 

32 

class TestKerberos:
    """Tests for the ticket-renewal helpers imported from ``airflow.security.kerberos``.

    The ``renew_from_kt`` tests patch the module's ``subprocess`` reference with a mock and
    then compare the recorded calls and the records logged on ``kerberos.log`` with the
    expected ``kinit`` invocations.
    """

    # ``kinit`` argument vector the tests expect when no ``[kerberos]`` option is overridden.
    _DEFAULT_KINIT_CMD = (
        "kinit",
        "-f",
        "-a",
        "-r",
        "3600m",
        "-k",
        "-t",
        "keytab",
        "-c",
        "/tmp/airflow_krb5_ccache",
        "test-principal",
    )

    # INFO message expected for the default command above.
    _DEFAULT_REINIT_LOG = (
        "Re-initialising kerberos from keytab: "
        "kinit -f -a -r 3600m -k -t keytab -c /tmp/airflow_krb5_ccache test-principal"
    )

    # INFO message expected when the follow-up ``kinit -R`` renewal is attempted.
    _RENEW_LOG = "Renewing kerberos ticket to work around kerberos 1.8.1: kinit -c /tmp/airflow_krb5_ccache -R"

    @staticmethod
    def _expected_popen_calls(subprocess_mock, kinit_cmd, exit_args=(None, None, None)):
        """Build the ``mock_calls`` entries expected for one ``Popen`` used as a context manager.

        :param subprocess_mock: the mock patched in for ``airflow.security.kerberos.subprocess``
        :param kinit_cmd: list expected as the first positional argument of ``Popen``
        :param exit_args: the three arguments expected to be handed to ``__exit__``
        :return: four ``mock.call`` objects: construction, ``__enter__``, ``wait`` and ``__exit__``
        """
        popen = mock.call.Popen
        return [
            popen(
                kinit_cmd,
                bufsize=-1,
                close_fds=True,
                stderr=subprocess_mock.PIPE,
                stdout=subprocess_mock.PIPE,
                universal_newlines=True,
            ),
            popen().__enter__(),
            popen().__enter__().wait(),
            popen().__exit__(*exit_args),
        ]

    @pytest.mark.parametrize(
        "kerberos_config, expected_cmd",
        [
            (
                {("kerberos", "reinit_frequency"): "42"},
                [
                    "kinit", "-f", "-a", "-r", "42m",
                    "-k", "-t", "keytab", "-c", "/tmp/airflow_krb5_ccache", "test-principal",
                ],
            ),
            (
                {("kerberos", "forwardable"): "True", ("kerberos", "include_ip"): "True"},
                [
                    "kinit", "-f", "-a", "-r", "3600m",
                    "-k", "-t", "keytab", "-c", "/tmp/airflow_krb5_ccache", "test-principal",
                ],
            ),
            (
                {("kerberos", "forwardable"): "False", ("kerberos", "include_ip"): "False"},
                [
                    "kinit", "-F", "-A", "-r", "3600m",
                    "-k", "-t", "keytab", "-c", "/tmp/airflow_krb5_ccache", "test-principal",
                ],
            ),
        ],
    )
    @mock.patch("time.sleep", return_value=None)
    @mock.patch("airflow.security.kerberos.open", mock.mock_open(read_data=b"X-CACHECONF:"))
    @mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
    @mock.patch("airflow.security.kerberos.subprocess")
    def test_renew_from_kt(self, mock_subprocess, mock_sleep, kerberos_config, expected_cmd, caplog):
        """Check the ``kinit`` command built for each ``[kerberos]`` configuration override.

        The patched ``open`` yields ``b"X-CACHECONF:"``; with that content the test also
        expects the follow-up ``kinit -R`` renewal to be logged and executed.
        """
        # Both the keytab login and the renewal report success.
        kinit_proc = mock_subprocess.Popen.return_value.__enter__.return_value
        kinit_proc.returncode = 0
        mock_subprocess.call.return_value = 0

        with conf_vars(kerberos_config), caplog.at_level(logging.INFO, logger=kerberos.log.name):
            caplog.clear()
            renew_from_kt(principal="test-principal", keytab="keytab")

        # The log line carries the shell-quoted form of the argument list.
        quoted_cmd = " ".join(map(shlex.quote, expected_cmd))
        assert caplog.messages == [
            f"Re-initialising kerberos from keytab: {quoted_cmd}",
            self._RENEW_LOG,
        ]

        assert mock_subprocess.Popen.call_args.args[0] == expected_cmd
        assert mock_subprocess.mock_calls == [
            *self._expected_popen_calls(mock_subprocess, expected_cmd),
            mock.call.call(["kinit", "-c", "/tmp/airflow_krb5_ccache", "-R"], close_fds=True),
        ]

    @mock.patch("airflow.security.kerberos.subprocess")
    @mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
    @mock.patch("airflow.security.kerberos.open", mock.mock_open(read_data=b""))
    def test_renew_from_kt_without_workaround(self, mock_subprocess, caplog):
        """With an empty mocked cache file only the keytab ``kinit`` is expected, no renewal."""
        kinit_proc = mock_subprocess.Popen.return_value.__enter__.return_value
        kinit_proc.returncode = 0
        mock_subprocess.call.return_value = 0

        with caplog.at_level(logging.INFO, logger=kerberos.log.name):
            caplog.clear()
            renew_from_kt(principal="test-principal", keytab="keytab")

        assert caplog.messages == [self._DEFAULT_REINIT_LOG]
        assert mock_subprocess.mock_calls == self._expected_popen_calls(
            mock_subprocess, list(self._DEFAULT_KINIT_CMD)
        )

    @mock.patch("airflow.security.kerberos.subprocess")
    @mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
    def test_renew_from_kt_failed(self, mock_subprocess, caplog):
        """A non-zero ``kinit`` return code must exit with code 1 and log stdout/stderr as an error."""
        failing_proc = mock_subprocess.Popen.return_value.__enter__.return_value
        failing_proc.returncode = 1
        # NOTE(review): per the unittest.mock docs, mocks created with a name are not attached
        # as children; presumably that is what keeps their ``readlines()`` calls out of
        # ``mock_subprocess.mock_calls`` below - confirm before dropping ``name=``.
        failing_proc.stdout = mock.MagicMock(name="stdout", **{"readlines.return_value": ["STDOUT"]})
        failing_proc.stderr = mock.MagicMock(name="stderr", **{"readlines.return_value": ["STDERR"]})

        caplog.clear()
        with pytest.raises(SystemExit) as ctx:
            renew_from_kt(principal="test-principal", keytab="keytab")
        assert ctx.value.code == 1

        # Only records emitted by the kerberos module logger are of interest.
        kerberos_records = [rec for rec in caplog.record_tuples if rec[0] == kerberos.log.name]
        assert len(kerberos_records) == 2, kerberos_records
        assert [level for _, level, _ in kerberos_records] == [logging.INFO, logging.ERROR]
        assert [message for _, _, message in kerberos_records] == [
            self._DEFAULT_REINIT_LOG,
            "Couldn't reinit from keytab! `kinit` exited with 1.\nSTDOUT\nSTDERR",
        ]

        # ``__exit__`` arguments are not pinned down in this scenario.
        assert mock_subprocess.mock_calls == self._expected_popen_calls(
            mock_subprocess,
            list(self._DEFAULT_KINIT_CMD),
            exit_args=(mock.ANY, mock.ANY, mock.ANY),
        )

    @mock.patch("airflow.security.kerberos.subprocess")
    @mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
    @mock.patch("airflow.security.kerberos.open", mock.mock_open(read_data=b"X-CACHECONF:"))
    @mock.patch("airflow.security.kerberos.get_hostname", return_value="HOST")
    @mock.patch("time.sleep", return_value=None)
    def test_renew_from_kt_failed_workaround(self, mock_sleep, mock_getfqdn, mock_subprocess, caplog):
        """A failing ``kinit -R`` renewal must exit with code 1 and log the troubleshooting hint.

        ``get_hostname`` is patched to return ``"HOST"``, which is the host part the expected
        error message shows in ``'test-principal/HOST'``.
        """
        # The keytab login succeeds, the subsequent renewal does not.
        mock_subprocess.Popen.return_value.__enter__.return_value.returncode = 0
        mock_subprocess.call.return_value = 1

        caplog.clear()
        with pytest.raises(SystemExit) as ctx:
            renew_from_kt(principal="test-principal", keytab="keytab")
        assert ctx.value.code == 1

        kerberos_records = [rec for rec in caplog.record_tuples if rec[0] == kerberos.log.name]
        assert len(kerberos_records) == 3, kerberos_records
        assert [level for _, level, _ in kerberos_records] == [logging.INFO, logging.INFO, logging.ERROR]
        assert [message for _, _, message in kerberos_records] == [
            self._DEFAULT_REINIT_LOG,
            self._RENEW_LOG,
            "Couldn't renew kerberos ticket in order to work around "
            "Kerberos 1.8.1 issue. Please check that the ticket for 'test-principal/HOST' is still "
            "renewable:\n $ kinit -f -c /tmp/airflow_krb5_ccache\n"
            "If the 'renew until' date is the same as the 'valid starting' date, the ticket cannot be "
            "renewed. Please check your KDC configuration, and the ticket renewal policy (maxrenewlife) for "
            "the 'test-principal/HOST' and `krbtgt' principals.",
        ]

        assert mock_subprocess.mock_calls == [
            *self._expected_popen_calls(mock_subprocess, list(self._DEFAULT_KINIT_CMD)),
            mock.call.call(["kinit", "-c", "/tmp/airflow_krb5_ccache", "-R"], close_fds=True),
        ]

    def test_run_without_keytab(self, caplog):
        """``kerberos.run`` without a keytab must log a warning and exit with code 0."""
        with caplog.at_level(logging.WARNING, logger=kerberos.log.name):
            caplog.clear()
            with pytest.raises(SystemExit) as ctx:
                kerberos.run(principal="test-principal", keytab=None)

        assert ctx.value.code == 0
        assert caplog.messages == ["Keytab renewer not starting, no keytab configured"]

    @mock.patch("airflow.security.kerberos.renew_from_kt")
    @mock.patch("time.sleep", return_value=None)
    def test_run(self, mock_sleep, mock_renew_from_kt):
        """``kerberos.run`` must keep calling ``renew_from_kt`` until it raises ``SystemExit``."""
        # Two ordinary returns, then the exit that is expected to propagate out of ``run``.
        mock_renew_from_kt.side_effect = [1, 1, SystemExit(42)]

        with pytest.raises(SystemExit) as ctx:
            kerberos.run(principal="test-principal", keytab="/tmp/keytab")

        assert ctx.value.code == 42
        assert mock_renew_from_kt.mock_calls == [mock.call("test-principal", "/tmp/keytab")] * 3

    def test_get_kerberos_principle(self):
        """An explicitly supplied principal must be returned unchanged."""
        expected_principal = "test-principal"
        assert get_kerberos_principle(expected_principal) == expected_principal

    @mock.patch("airflow.security.kerberos.get_hostname", return_value="REPLACEMENT_HOST")
    @mock.patch("airflow.security.kerberos.conf.get_mandatory_value", return_value="test-principal/_HOST")
    def test_get_kerberos_principle_resolve_null_principal(self, get_madantory_value_mock, get_hostname_mock):
        """With ``principal=None`` the configured value is used and ``_HOST`` becomes the hostname."""
        resolved = get_kerberos_principle(principal=None)
        assert resolved == "test-principal/REPLACEMENT_HOST"