Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/secrets/local_filesystem.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

154 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Objects relating to retrieving connections and variables from local file.""" 

19 

20from __future__ import annotations 

21 

22import json 

23import logging 

24import os 

25import warnings 

26from collections import defaultdict 

27from inspect import signature 

28from json import JSONDecodeError 

29from typing import TYPE_CHECKING, Any 

30 

31from airflow.exceptions import ( 

32 AirflowException, 

33 AirflowFileParseException, 

34 ConnectionNotUnique, 

35 FileSyntaxError, 

36 RemovedInAirflow3Warning, 

37) 

38from airflow.secrets.base_secrets import BaseSecretsBackend 

39from airflow.utils import yaml 

40from airflow.utils.file import COMMENT_PATTERN 

41from airflow.utils.log.logging_mixin import LoggingMixin 

42 

# Module-level logger for the free functions in this module; the backend class
# logs through its own ``self.log`` instead.
log: logging.Logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    # Needed for type annotations only; code that builds connections imports
    # ``Connection`` inside the function body at call time.
    from airflow.models.connection import Connection

47 

48 

def get_connection_parameter_names() -> set[str]:
    """
    Collect the parameter names accepted by the ``Connection`` constructor.

    ``Connection`` is imported at call time rather than at module import
    (presumably to avoid an import cycle — not verifiable from this module).

    :return: Names of the parameters of ``Connection.__init__``, excluding ``self``.
    """
    from airflow.models.connection import Connection

    init_parameters = signature(Connection.__init__).parameters
    parameter_names = set(init_parameters)
    parameter_names.discard("self")
    return parameter_names

54 

55 

def _parse_env_file(file_path: str) -> tuple[dict[str, list[str]], list[FileSyntaxError]]:
    """
    Parse a file in the ``.env`` format.

    .. code-block:: text

        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2

    Every line that is not blank and not a comment must have the form ``KEY=VALUE``.
    Only the first ``=`` separates key from value, so a value may itself contain ``=``.
    A key may appear on several lines; all of its values are collected in file order.

    :param file_path: The location of the file that will be processed.
    :return: Tuple with mapping of key and list of values and list of syntax errors.
        Line numbers in the errors are 1-based.
    """
    with open(file_path) as f:
        content = f.read()

    secrets: dict[str, list[str]] = defaultdict(list)
    errors: list[FileSyntaxError] = []
    for line_no, line in enumerate(content.splitlines(), 1):
        if not line.strip():
            # Ignore empty and whitespace-only lines.
            continue

        if COMMENT_PATTERN.match(line):
            # Ignore comments
            continue

        key, sep, value = line.partition("=")
        if not sep:
            errors.append(
                FileSyntaxError(
                    line_no=line_no,
                    message='Invalid line format. The line should contain at least one equal sign ("=").',
                )
            )
            continue

        if not key:
            # e.g. "=value" - previously accepted silently under the key "".
            errors.append(
                FileSyntaxError(
                    line_no=line_no,
                    message="Invalid line format. Key is empty.",
                )
            )
            continue

        if not value:
            # e.g. "KEY=" - still rejected, but now with an accurate message.
            errors.append(
                FileSyntaxError(
                    line_no=line_no,
                    message="Invalid line format. Value is empty.",
                )
            )
            continue

        secrets[key].append(value)
    return secrets, errors

100 

101 

def _parse_yaml_file(file_path: str) -> tuple[dict[str, Any], list[FileSyntaxError]]:
    """
    Parse a file in the YAML format.

    The document must contain a single top-level mapping. Its values are returned
    as loaded, so they may be strings, lists or nested mappings.

    :param file_path: The location of the file that will be processed.
    :return: Tuple with mapping of key and value and list of syntax errors.
        Line numbers in the errors are 1-based, matching the ``.env`` and JSON parsers.
    """
    with open(file_path) as f:
        content = f.read()

    if not content:
        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
    try:
        secrets = yaml.safe_load(content)
    except yaml.MarkedYAMLError as e:
        # PyYAML marks count lines from 0; shift to 1-based so the line number matches
        # what the other parsers report (and line 1 is not reported as 0).
        err_line_no = e.problem_mark.line + 1 if e.problem_mark else -1
        return {}, [FileSyntaxError(line_no=err_line_no, message=str(e))]
    if not isinstance(secrets, dict):
        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]

    return secrets, []

123 

124 

def _parse_json_file(file_path: str) -> tuple[dict[str, Any], list[FileSyntaxError]]:
    """
    Parse a file in the JSON format.

    The top-level JSON value must be an object. Any problem is reported as a single
    syntax error, and the mapping returned alongside it is empty.

    :param file_path: The location of the file that will be processed.
    :return: Tuple with mapping of key and value and list of syntax errors
    """
    with open(file_path) as json_file:
        raw_text = json_file.read()

    if not raw_text:
        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]

    try:
        parsed = json.loads(raw_text)
    except JSONDecodeError as decode_error:
        # JSONDecodeError.lineno is already 1-based.
        return {}, [FileSyntaxError(line_no=int(decode_error.lineno), message=decode_error.msg)]

    if isinstance(parsed, dict):
        return parsed, []
    return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]

145 

146 

# Maps a lower-cased file extension (the text after the last ".") to the function
# that parses a file of that format. "yaml" and "yml" share one parser.
FILE_PARSERS = dict(
    env=_parse_env_file,
    json=_parse_json_file,
    yaml=_parse_yaml_file,
    yml=_parse_yaml_file,
)

153 

154 

def _parse_secret_file(file_path: str) -> dict[str, Any]:
    """
    Parse a secrets file with the parser registered for its extension.

    The extension is the text after the last ``.`` in the path, compared
    case-insensitively against the keys of ``FILE_PARSERS``.

    :param file_path: The location of the file that will be processed.
    :return: Map of secret key (e.g. connection ID) and value.
    :raises AirflowException: If the file does not exist or its extension is not supported.
    :raises AirflowFileParseException: If the parser reported any syntax errors.
    """
    if not os.path.exists(file_path):
        raise AirflowException(
            f"File {file_path} was not found. Check the configuration of your Secrets backend."
        )

    log.debug("Parsing file: %s", file_path)

    extension = file_path.rpartition(".")[2].lower()
    parser = FILE_PARSERS.get(extension)
    if parser is None:
        raise AirflowException(
            "Unsupported file format. The file must have one of the following extensions: "
            ".env .json .yaml .yml"
        )

    secrets, parse_errors = parser(file_path)
    log.debug("Parsed file: len(parse_errors)=%d, len(secrets)=%d", len(parse_errors), len(secrets))

    if not parse_errors:
        return secrets
    raise AirflowFileParseException(
        "Failed to load the secret file.", file_path=file_path, parse_errors=parse_errors
    )

187 

188 

def _create_connection(conn_id: str, value: Any):
    """
    Create a connection based on a URL or JSON object.

    :param conn_id: The connection ID (the key the value was stored under in the file).
    :param value: Either a connection URI string, or a mapping of ``Connection``
        constructor arguments. In the mapping form ``extra_dejson`` may be given
        instead of ``extra``; it is serialized with ``json.dumps``. The mapping is
        not modified.
    :return: The constructed ``Connection``.
    :raises AirflowException: If the value has an unsupported type or unknown keys.
        Also raised if it sets both ``extra`` and ``extra_dejson``, or if its
        ``conn_id`` disagrees with ``conn_id``.
    """
    from airflow.models.connection import Connection

    if isinstance(value, str):
        return Connection(conn_id=conn_id, uri=value)
    if not isinstance(value, dict):
        raise AirflowException(
            f"Unexpected value type: {type(value)}. The connection can only be defined using a string or object."
        )

    connection_parameter_names = get_connection_parameter_names() | {"extra_dejson"}
    illegal_keys = set(value) - connection_parameter_names
    if illegal_keys:
        # Sorted so the message is stable across runs; str() so non-string YAML keys do not break the join.
        illegal_keys_list = ", ".join(sorted(map(str, illegal_keys)))
        raise AirflowException(
            f"The object have illegal keys: {illegal_keys_list}. "
            f"The dictionary can only contain the following keys: {connection_parameter_names}"
        )
    if "extra" in value and "extra_dejson" in value:
        raise AirflowException(
            "The extra and extra_dejson parameters are mutually exclusive. "
            "Please provide only one parameter."
        )
    if "conn_id" in value and conn_id != value["conn_id"]:
        raise AirflowException(
            f"Mismatch conn_id. "
            f"The dictionary key has the value: {value['conn_id']}. "
            f"The item has the value: {conn_id}."
        )

    # Build the kwargs on a copy so the caller's parsed secrets are left untouched.
    connection_kwargs = dict(value)
    if "extra_dejson" in connection_kwargs:
        connection_kwargs["extra"] = json.dumps(connection_kwargs.pop("extra_dejson"))
    connection_kwargs["conn_id"] = conn_id
    return Connection(**connection_kwargs)

225 

226 

def load_variables(file_path: str) -> dict[str, str]:
    """
    Load variables from a text file.

    ``JSON``, ``YAML`` and ``.env`` files are supported. A value that is a list
    (always the case for ``.env`` files) must hold exactly one element, which is
    unwrapped. Any other value is used as-is.

    :param file_path: The location of the file that will be processed.
    :return: Mapping of variable key to value.
    :raises AirflowException: If any key has a list value whose length is not one.
    """
    log.debug("Loading variables from a text file")

    secrets = _parse_secret_file(file_path)

    variables: dict[str, str] = {}
    invalid_keys = []
    for key, values in secrets.items():
        if not isinstance(values, list):
            variables[key] = values
        elif len(values) == 1:
            variables[key] = values[0]
        else:
            invalid_keys.append(key)

    if invalid_keys:
        raise AirflowException(f'The "{file_path}" file contains multiple values for keys: {invalid_keys}')
    log.debug("Loaded %d variables: ", len(variables))
    return variables

244 

245 

def load_connections(file_path: str) -> dict[str, list[Any]]:
    """
    Use `airflow.secrets.local_filesystem.load_connections_dict`, this is deprecated.

    Loads connections like ``load_connections_dict`` but wraps each connection
    in a single-element list, keyed by connection ID.

    :param file_path: The location of the file that will be processed.
    :return: Mapping of connection ID to a one-element list holding its connection.
    """
    warnings.warn(
        "This function is deprecated. Please use `airflow.secrets.local_filesystem.load_connections_dict`.",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    # Iterate (conn_id, connection) pairs; iterating .values() tried to unpack each
    # Connection object itself and raised TypeError.
    return {conn_id: [conn] for conn_id, conn in load_connections_dict(file_path).items()}

254 

255 

def load_connections_dict(file_path: str) -> dict[str, Any]:
    """
    Load connections from a text file.

    ``JSON``, ``YAML`` and ``.env`` files are supported. A list value (always the
    case for ``.env`` files) may hold at most one connection definition, and an
    empty list produces no entry. Any other value is passed to the connection
    factory directly.

    :param file_path: The location of the file that will be processed.
    :return: A dictionary where the key contains a connection ID and the value contains the connection.
    :raises ConnectionNotUnique: If a connection ID is given more than one value.
    """
    log.debug("Loading connection")

    secrets: dict[str, Any] = _parse_secret_file(file_path)
    connections: dict[str, Any] = {}
    for conn_id, raw_definition in secrets.items():
        if not isinstance(raw_definition, list):
            connections[conn_id] = _create_connection(conn_id, raw_definition)
            continue

        if len(raw_definition) > 1:
            raise ConnectionNotUnique(f"Found multiple values for {conn_id} in {file_path}.")
        for single_definition in raw_definition:
            connections[conn_id] = _create_connection(conn_id, single_definition)

    log.debug("Loaded %d connections", len(connections))
    return connections

282 

283 

class LocalFilesystemBackend(BaseSecretsBackend, LoggingMixin):
    """
    Retrieves Connection objects and Variables from local files.

    ``JSON``, ``YAML`` and ``.env`` files are supported. The files are parsed on
    every lookup; nothing is cached between calls.

    :param variables_file_path: File location with variables data.
    :param connections_file_path: File location with connection data.
    """

    def __init__(self, variables_file_path: str | None = None, connections_file_path: str | None = None):
        super().__init__()
        self.variables_file = variables_file_path
        self.connections_file = connections_file_path

    @property
    def _local_variables(self) -> dict[str, str]:
        """Variables parsed from ``variables_file``, or ``{}`` when no file is configured."""
        if not self.variables_file:
            self.log.debug("The file for variables is not specified. Skipping")
            # The user may not specify any file.
            return {}
        secrets = load_variables(self.variables_file)
        return secrets

    @property
    def _local_connections(self) -> dict[str, Connection]:
        """Connections parsed from ``connections_file``, or ``{}`` when no file is configured."""
        if not self.connections_file:
            self.log.debug("The file for connection is not specified. Skipping")
            # The user may not specify any file.
            return {}
        return load_connections_dict(self.connections_file)

    def get_connection(self, conn_id: str) -> Connection | None:
        """Return the connection with ``conn_id`` from the connections file, or ``None``."""
        # Access the property once: every access re-parses the file, and two reads
        # could see different file contents.
        return self._local_connections.get(conn_id)

    def get_connections(self, conn_id: str) -> list[Any]:
        """Deprecated; return ``[connection]`` if ``conn_id`` is found, else ``[]``."""
        warnings.warn(
            "This method is deprecated. Please use "
            "`airflow.secrets.local_filesystem.LocalFilesystemBackend.get_connection`.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        conn = self.get_connection(conn_id=conn_id)
        if conn:
            return [conn]
        return []

    def get_variable(self, key: str) -> str | None:
        """Return the variable ``key`` from the variables file, or ``None`` if absent."""
        return self._local_variables.get(key)