Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/security/kerberos.py: 0%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from enum import Enum
22# Licensed to Cloudera, Inc. under one
23# or more contributor license agreements. See the NOTICE file
24# distributed with this work for additional information
25# regarding copyright ownership. Cloudera, Inc. licenses this file
26# to you under the Apache License, Version 2.0 (the
27# "License"); you may not use this file except in compliance
28# with the License. You may obtain a copy of the License at
29#
30# http://www.apache.org/licenses/LICENSE-2.0
31#
32# Unless required by applicable law or agreed to in writing, software
33# distributed under the License is distributed on an "AS IS" BASIS,
34# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
35# See the License for the specific language governing permissions and
36# limitations under the License.
37"""Kerberos security provider."""
38import logging
39import shlex
40import subprocess
41import sys
42import time
44from airflow.configuration import conf
45from airflow.utils.net import get_hostname
47NEED_KRB181_WORKAROUND: bool | None = None
49log = logging.getLogger(__name__)
52class KerberosMode(Enum):
53 """
54 Defines modes for running airflow kerberos.
56 :return: None.
57 """
59 STANDARD = "standard"
60 ONE_TIME = "one-time"
63def get_kerberos_principle(principal: str | None) -> str:
64 """Retrieve Kerberos principal. Fallback to principal from Airflow configuration if not provided."""
65 return principal or conf.get_mandatory_value("kerberos", "principal").replace("_HOST", get_hostname())
68def renew_from_kt(principal: str | None, keytab: str, exit_on_fail: bool = True):
69 """
70 Renew kerberos token from keytab.
72 :param principal: principal
73 :param keytab: keytab file
74 :return: None
75 """
76 # The config is specified in seconds. But we ask for that same amount in
77 # minutes to give ourselves a large renewal buffer.
78 renewal_lifetime = f"{conf.getint('kerberos', 'reinit_frequency')}m"
80 cmd_principal = get_kerberos_principle(principal)
81 if conf.getboolean("kerberos", "forwardable"):
82 forwardable = "-f"
83 else:
84 forwardable = "-F"
86 if conf.getboolean("kerberos", "include_ip"):
87 include_ip = "-a"
88 else:
89 include_ip = "-A"
91 cmdv: list[str] = [
92 conf.get_mandatory_value("kerberos", "kinit_path"),
93 forwardable,
94 include_ip,
95 "-r",
96 renewal_lifetime,
97 "-k", # host ticket
98 "-t",
99 keytab, # specify keytab
100 "-c",
101 conf.get_mandatory_value("kerberos", "ccache"), # specify credentials cache
102 cmd_principal,
103 ]
104 log.info("Re-initialising kerberos from keytab: %s", " ".join(shlex.quote(f) for f in cmdv))
106 with subprocess.Popen(
107 cmdv,
108 stdout=subprocess.PIPE,
109 stderr=subprocess.PIPE,
110 close_fds=True,
111 bufsize=-1,
112 universal_newlines=True,
113 ) as subp:
114 subp.wait()
115 if subp.returncode != 0:
116 log.error(
117 "Couldn't reinit from keytab! `kinit` exited with %s.\n%s\n%s",
118 subp.returncode,
119 "\n".join(subp.stdout.readlines() if subp.stdout else []),
120 "\n".join(subp.stderr.readlines() if subp.stderr else []),
121 )
122 if exit_on_fail:
123 sys.exit(subp.returncode)
124 else:
125 return subp.returncode
127 global NEED_KRB181_WORKAROUND
128 if NEED_KRB181_WORKAROUND is None:
129 NEED_KRB181_WORKAROUND = detect_conf_var()
130 if NEED_KRB181_WORKAROUND:
131 # (From: HUE-640). Kerberos clock have seconds level granularity. Make sure we
132 # renew the ticket after the initial valid time.
133 time.sleep(1.5)
134 ret = perform_krb181_workaround(cmd_principal)
135 if exit_on_fail and ret != 0:
136 sys.exit(ret)
137 else:
138 return ret
139 return 0
142def perform_krb181_workaround(principal: str):
143 """
144 Workaround for Kerberos 1.8.1.
146 :param principal: principal name
147 :return: None
148 """
149 cmdv: list[str] = [
150 conf.get_mandatory_value("kerberos", "kinit_path"),
151 "-c",
152 conf.get_mandatory_value("kerberos", "ccache"),
153 "-R",
154 ] # Renew ticket_cache
156 log.info("Renewing kerberos ticket to work around kerberos 1.8.1: %s", " ".join(cmdv))
158 ret = subprocess.call(cmdv, close_fds=True)
160 if ret != 0:
161 principal = f"{principal or conf.get('kerberos', 'principal')}/{get_hostname()}"
162 ccache = conf.get("kerberos", "ccache")
163 log.error(
164 "Couldn't renew kerberos ticket in order to work around Kerberos 1.8.1 issue. Please check that "
165 "the ticket for '%s' is still renewable:\n $ kinit -f -c %s\nIf the 'renew until' date is the "
166 "same as the 'valid starting' date, the ticket cannot be renewed. Please check your KDC "
167 "configuration, and the ticket renewal policy (maxrenewlife) for the '%s' and `krbtgt' "
168 "principals.",
169 principal,
170 ccache,
171 principal,
172 )
173 return ret
176def detect_conf_var() -> bool:
177 """
178 Autodetect the Kerberos ticket configuration.
180 Return true if the ticket cache contains "conf" information as is found
181 in ticket caches of Kerberos 1.8.1 or later. This is incompatible with the
182 Sun Java Krb5LoginModule in Java6, so we need to take an action to work
183 around it.
184 """
185 ticket_cache = conf.get_mandatory_value("kerberos", "ccache")
187 with open(ticket_cache, "rb") as file:
188 # Note: this file is binary, so we check against a bytearray.
189 return b"X-CACHECONF:" in file.read()
192def run(principal: str | None, keytab: str, mode: KerberosMode = KerberosMode.STANDARD):
193 """
194 Run the kerberos renewer.
196 :param principal: principal name
197 :param keytab: keytab file
198 :param mode: mode to run the airflow kerberos in
199 :return: None
200 """
201 if not keytab:
202 log.warning("Keytab renewer not starting, no keytab configured")
203 sys.exit(0)
205 log.info("Using airflow kerberos with mode: %s", mode.value)
207 if mode == KerberosMode.STANDARD:
208 while True:
209 renew_from_kt(principal, keytab)
210 time.sleep(conf.getint("kerberos", "reinit_frequency"))
211 elif mode == KerberosMode.ONE_TIME:
212 renew_from_kt(principal, keytab)