Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/_shared/secrets_backend/base.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

50 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import inspect 

20from abc import ABC 

21from collections.abc import Callable 

22 

23 

24def _accepts_team_name(method: Callable) -> bool: 

25 """ 

26 Return whether a secrets-backend method accepts the ``team_name`` keyword. 

27 

28 Backends written before Airflow 3.2 override ``get_conn_value`` / ``get_variable`` / 

29 ``get_connection`` with the legacy ``(self, conn_id)`` / ``(self, key)`` signature. 

30 AIP-67 (multi-team) added a ``team_name`` keyword; forwarding it to those raises 

31 ``TypeError``. A method accepts it if it declares a ``team_name`` parameter or a 

32 ``**kwargs`` catch-all. 

33 """ 

34 try: 

35 parameters = inspect.signature(method).parameters 

36 except (TypeError, ValueError): 

37 # Un-introspectable callable (e.g. C-implemented): assume the 3.2+ signature. 

38 return True 

39 return "team_name" in parameters or any( 

40 p.kind is inspect.Parameter.VAR_KEYWORD for p in parameters.values() 

41 ) 

42 

43 

def call_secrets_backend_method(method: Callable, *, team_name: str | None, **kwargs):
    """
    Invoke a secrets-backend lookup ``method``, passing ``team_name`` only if it is supported.

    Backends whose override accepts ``team_name`` (3.2+ signature) receive it; backends
    with a pre-3.2 override are called without it. This keeps older bundled providers and
    custom backends working in single-team and multi-team deployments alike, without
    forcing them to add the parameter.

    The decision is made up front from the method's signature. A ``TypeError`` raised
    inside a backend that does accept ``team_name`` is deliberately left to propagate: it
    is not retried without ``team_name``, because that could hide the real error and
    resolve a team-scoped lookup against the global scope.

    :param method: the backend lookup method to call
    :param team_name: team associated with the caller, if any
    :param kwargs: remaining keyword arguments forwarded to ``method`` unchanged
    :return: whatever ``method`` returns
    """
    # Built as a separate mapping so ``team_name`` stays the first keyword passed.
    team_kwargs = {"team_name": team_name} if _accepts_team_name(method) else {}
    return method(**team_kwargs, **kwargs)

58 

59 

class BaseSecretsBackend(ABC):
    """Abstract base class to retrieve Connection object given a conn_id or Variable given a key."""

    @staticmethod
    def build_path(path_prefix: str, secret_id: str, sep: str = "/") -> str:
        """
        Given conn_id, build path for Secrets Backend.

        :param path_prefix: Prefix of the path to get secret
        :param secret_id: Secret id
        :param sep: separator used to concatenate connections_prefix and conn_id. Default: "/"
        :return: ``path_prefix``, ``sep`` and ``secret_id`` concatenated in that order
        """
        return f"{path_prefix}{sep}{secret_id}"

    def get_conn_value(self, conn_id: str, team_name: str | None = None) -> str | None:
        """
        Retrieve from Secrets Backend a string value representing the Connection object.

        If the client your secrets backend uses already returns a python dict, you should override
        ``get_connection`` instead.

        :param conn_id: connection id
        :param team_name: Team name associated to the task trying to access the connection (if any)
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError

    def get_variable(self, key: str, team_name: str | None = None) -> str | None:
        """
        Return value for Airflow Variable.

        :param key: Variable Key
        :param team_name: Team name associated to the task trying to access the variable (if any)
        :return: Variable Value
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError

    def get_config(self, key: str) -> str | None:
        """
        Return value for Airflow Config Key.

        The base implementation has no config storage and always returns ``None``.

        :param key: Config Key
        :return: Config Value
        """
        return None

    def _set_connection_class(self, conn_class: type) -> None:
        """
        Set the Connection class used by ``deserialize_connection``.

        :param conn_class: the class to instantiate when deserializing connections
        :raises TypeError: if ``conn_class`` is not a class
        """
        if not isinstance(conn_class, type):
            raise TypeError(f"Connection class must be a type/class, got {type(conn_class).__name__}")
        self._connection_class = conn_class

    def _get_connection_class(self) -> type:
        """
        Get the Connection class to use for deserialization.

        :raises RuntimeError: if no Connection class has been set on this instance
        """
        conn_class = getattr(self, "_connection_class", None)
        if conn_class is None:
            raise RuntimeError(
                "Connection class not set on backend instance. "
                "Backends must be instantiated via initialize_secrets_backends() "
                "or have _connection_class set manually."
            )
        return conn_class

    @staticmethod
    def _deserialize_connection_value(conn_class: type, conn_id: str, value: str):
        """
        Build a connection from its serialized ``value`` using ``conn_class``.

        Surrounding whitespace is stripped first. A value starting with ``{`` is treated as
        JSON and passed to ``conn_class.from_json``; anything else is treated as a URI and
        passed to ``conn_class.from_uri`` when available, otherwise to the constructor.

        :param conn_class: class providing ``from_json`` and, optionally, ``from_uri``
        :param conn_id: connection id
        :param value: the serialized representation of the Connection object
        :return: whatever ``conn_class`` produces for the value
        """
        value = value.strip()
        # ``startswith`` rather than ``value[0]``: an empty or whitespace-only value must
        # not fail here with an opaque IndexError; it is handed to the URI path instead.
        if value.startswith("{"):
            return conn_class.from_json(value=value, conn_id=conn_id)  # type: ignore[attr-defined]

        # TODO: Only sdk has from_uri defined on it. Is it worthwhile developing the core path or not?
        if hasattr(conn_class, "from_uri"):
            return conn_class.from_uri(conn_id=conn_id, uri=value)
        return conn_class(conn_id=conn_id, uri=value)

    def deserialize_connection(self, conn_id: str, value: str):
        """
        Given a serialized representation of the airflow Connection, return an instance.

        Uses the Connection class set on this class (which should be set to the appropriate
        Connection class for the execution context).
        Uses Connection.from_json() for JSON format, Connection(uri=...) for URI format.

        :param conn_id: connection id
        :param value: the serialized representation of the Connection object
        :return: the deserialized Connection
        :raises RuntimeError: if no Connection class has been set on this instance
        """
        conn_class = self._get_connection_class()
        return self._deserialize_connection_value(conn_class, conn_id, value)

    def get_connection(self, conn_id: str, team_name: str | None = None):
        """
        Return connection object with a given ``conn_id``.

        ``get_conn_value`` is invoked through ``call_secrets_backend_method`` rather than
        called directly.

        :param conn_id: connection id
        :param team_name: Team name associated to the task trying to access the connection (if any)
        :return: Connection object or None
        """
        value = call_secrets_backend_method(self.get_conn_value, team_name=team_name, conn_id=conn_id)
        if value:
            return self.deserialize_connection(conn_id=conn_id, value=value)
        return None