Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/security/kerberos.py: 0%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

66 statements  

1#!/usr/bin/env python 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from enum import Enum 

21 

22# Licensed to Cloudera, Inc. under one 

23# or more contributor license agreements. See the NOTICE file 

24# distributed with this work for additional information 

25# regarding copyright ownership. Cloudera, Inc. licenses this file 

26# to you under the Apache License, Version 2.0 (the 

27# "License"); you may not use this file except in compliance 

28# with the License. You may obtain a copy of the License at 

29# 

30# http://www.apache.org/licenses/LICENSE-2.0 

31# 

32# Unless required by applicable law or agreed to in writing, software 

33# distributed under the License is distributed on an "AS IS" BASIS, 

34# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

35# See the License for the specific language governing permissions and 

36# limitations under the License. 

37"""Kerberos security provider.""" 

38import logging 

39import shlex 

40import subprocess 

41import sys 

42import time 

43 

44from airflow.configuration import conf 

45from airflow.utils.net import get_hostname 

46 

47NEED_KRB181_WORKAROUND: bool | None = None 

48 

49log = logging.getLogger(__name__) 

50 

51 

52class KerberosMode(Enum): 

53 """ 

54 Defines modes for running airflow kerberos. 

55 

56 :return: None. 

57 """ 

58 

59 STANDARD = "standard" 

60 ONE_TIME = "one-time" 

61 

62 

63def get_kerberos_principle(principal: str | None) -> str: 

64 """Retrieve Kerberos principal. Fallback to principal from Airflow configuration if not provided.""" 

65 return principal or conf.get_mandatory_value("kerberos", "principal").replace("_HOST", get_hostname()) 

66 

67 

68def renew_from_kt(principal: str | None, keytab: str, exit_on_fail: bool = True): 

69 """ 

70 Renew kerberos token from keytab. 

71 

72 :param principal: principal 

73 :param keytab: keytab file 

74 :return: None 

75 """ 

76 # The config is specified in seconds. But we ask for that same amount in 

77 # minutes to give ourselves a large renewal buffer. 

78 renewal_lifetime = f"{conf.getint('kerberos', 'reinit_frequency')}m" 

79 

80 cmd_principal = get_kerberos_principle(principal) 

81 if conf.getboolean("kerberos", "forwardable"): 

82 forwardable = "-f" 

83 else: 

84 forwardable = "-F" 

85 

86 if conf.getboolean("kerberos", "include_ip"): 

87 include_ip = "-a" 

88 else: 

89 include_ip = "-A" 

90 

91 cmdv: list[str] = [ 

92 conf.get_mandatory_value("kerberos", "kinit_path"), 

93 forwardable, 

94 include_ip, 

95 "-r", 

96 renewal_lifetime, 

97 "-k", # host ticket 

98 "-t", 

99 keytab, # specify keytab 

100 "-c", 

101 conf.get_mandatory_value("kerberos", "ccache"), # specify credentials cache 

102 cmd_principal, 

103 ] 

104 log.info("Re-initialising kerberos from keytab: %s", " ".join(shlex.quote(f) for f in cmdv)) 

105 

106 with subprocess.Popen( 

107 cmdv, 

108 stdout=subprocess.PIPE, 

109 stderr=subprocess.PIPE, 

110 close_fds=True, 

111 bufsize=-1, 

112 universal_newlines=True, 

113 ) as subp: 

114 subp.wait() 

115 if subp.returncode != 0: 

116 log.error( 

117 "Couldn't reinit from keytab! `kinit` exited with %s.\n%s\n%s", 

118 subp.returncode, 

119 "\n".join(subp.stdout.readlines() if subp.stdout else []), 

120 "\n".join(subp.stderr.readlines() if subp.stderr else []), 

121 ) 

122 if exit_on_fail: 

123 sys.exit(subp.returncode) 

124 else: 

125 return subp.returncode 

126 

127 global NEED_KRB181_WORKAROUND 

128 if NEED_KRB181_WORKAROUND is None: 

129 NEED_KRB181_WORKAROUND = detect_conf_var() 

130 if NEED_KRB181_WORKAROUND: 

131 # (From: HUE-640). Kerberos clock have seconds level granularity. Make sure we 

132 # renew the ticket after the initial valid time. 

133 time.sleep(1.5) 

134 ret = perform_krb181_workaround(cmd_principal) 

135 if exit_on_fail and ret != 0: 

136 sys.exit(ret) 

137 else: 

138 return ret 

139 return 0 

140 

141 

142def perform_krb181_workaround(principal: str): 

143 """ 

144 Workaround for Kerberos 1.8.1. 

145 

146 :param principal: principal name 

147 :return: None 

148 """ 

149 cmdv: list[str] = [ 

150 conf.get_mandatory_value("kerberos", "kinit_path"), 

151 "-c", 

152 conf.get_mandatory_value("kerberos", "ccache"), 

153 "-R", 

154 ] # Renew ticket_cache 

155 

156 log.info("Renewing kerberos ticket to work around kerberos 1.8.1: %s", " ".join(cmdv)) 

157 

158 ret = subprocess.call(cmdv, close_fds=True) 

159 

160 if ret != 0: 

161 principal = f"{principal or conf.get('kerberos', 'principal')}/{get_hostname()}" 

162 ccache = conf.get("kerberos", "ccache") 

163 log.error( 

164 "Couldn't renew kerberos ticket in order to work around Kerberos 1.8.1 issue. Please check that " 

165 "the ticket for '%s' is still renewable:\n $ kinit -f -c %s\nIf the 'renew until' date is the " 

166 "same as the 'valid starting' date, the ticket cannot be renewed. Please check your KDC " 

167 "configuration, and the ticket renewal policy (maxrenewlife) for the '%s' and `krbtgt' " 

168 "principals.", 

169 principal, 

170 ccache, 

171 principal, 

172 ) 

173 return ret 

174 

175 

176def detect_conf_var() -> bool: 

177 """ 

178 Autodetect the Kerberos ticket configuration. 

179 

180 Return true if the ticket cache contains "conf" information as is found 

181 in ticket caches of Kerberos 1.8.1 or later. This is incompatible with the 

182 Sun Java Krb5LoginModule in Java6, so we need to take an action to work 

183 around it. 

184 """ 

185 ticket_cache = conf.get_mandatory_value("kerberos", "ccache") 

186 

187 with open(ticket_cache, "rb") as file: 

188 # Note: this file is binary, so we check against a bytearray. 

189 return b"X-CACHECONF:" in file.read() 

190 

191 

192def run(principal: str | None, keytab: str, mode: KerberosMode = KerberosMode.STANDARD): 

193 """ 

194 Run the kerberos renewer. 

195 

196 :param principal: principal name 

197 :param keytab: keytab file 

198 :param mode: mode to run the airflow kerberos in 

199 :return: None 

200 """ 

201 if not keytab: 

202 log.warning("Keytab renewer not starting, no keytab configured") 

203 sys.exit(0) 

204 

205 log.info("Using airflow kerberos with mode: %s", mode.value) 

206 

207 if mode == KerberosMode.STANDARD: 

208 while True: 

209 renew_from_kt(principal, keytab) 

210 time.sleep(conf.getint("kerberos", "reinit_frequency")) 

211 elif mode == KerberosMode.ONE_TIME: 

212 renew_from_kt(principal, keytab)