Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/secrets/local_filesystem.py: 23%

154 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Objects relating to retrieving connections and variables from local file.""" 

19from __future__ import annotations 

20 

21import json 

22import logging 

23import os 

24import warnings 

25from collections import defaultdict 

26from inspect import signature 

27from json import JSONDecodeError 

28from typing import TYPE_CHECKING, Any 

29 

30from airflow.exceptions import ( 

31 AirflowException, 

32 AirflowFileParseException, 

33 ConnectionNotUnique, 

34 FileSyntaxError, 

35 RemovedInAirflow3Warning, 

36) 

37from airflow.secrets.base_secrets import BaseSecretsBackend 

38from airflow.utils import yaml 

39from airflow.utils.file import COMMENT_PATTERN 

40from airflow.utils.log.logging_mixin import LoggingMixin 

41 

# Module-level logger used by the parsing/loading helpers below.
log = logging.getLogger(name=__name__)

43 

44if TYPE_CHECKING: 

45 from airflow.models.connection import Connection 

46 

47 

def get_connection_parameter_names() -> set[str]:
    """
    Return the parameter names accepted by the ``Connection`` constructor.

    :return: Names of the parameters of
        :class:`airflow.models.connection.Connection` ``__init__``, excluding ``self``.
    """
    from airflow.models.connection import Connection

    init_params = signature(Connection.__init__).parameters
    # Iterating the parameters mapping yields the parameter names.
    names = set(init_params)
    names.discard("self")
    return names

53 

54 

def _parse_env_file(file_path: str) -> tuple[dict[str, list[str]], list[FileSyntaxError]]:
    """
    Parse a file in the ``.env`` format.

    .. code-block:: text

        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2

    Empty lines and lines matching ``COMMENT_PATTERN`` are skipped. Every other line is
    split on its first ``=`` into a key and a value, so values may themselves contain ``=``
    (e.g. URI query strings). Repeated keys are collected into a list so that callers can
    detect duplicates.

    :param file_path: The location of the file that will be processed.
    :return: Tuple with mapping of key and list of values and list of syntax errors
    """
    with open(file_path) as f:
        content = f.read()

    secrets: dict[str, list[str]] = defaultdict(list)
    errors: list[FileSyntaxError] = []
    for line_no, line in enumerate(content.splitlines(), 1):
        if not line:
            # Ignore empty line
            continue

        if COMMENT_PATTERN.match(line):
            # Ignore comments
            continue

        # partition() keeps everything after the first "=" in the value.
        key, sep, value = line.partition("=")
        if not sep:
            errors.append(
                FileSyntaxError(
                    line_no=line_no,
                    message='Invalid line format. The line should contain at least one equal sign ("=").',
                )
            )
            continue

        if not key:
            # A line such as "=value" has no key to store the value under.
            errors.append(
                FileSyntaxError(
                    line_no=line_no,
                    message="Invalid line format. Key is empty.",
                )
            )
            continue

        secrets[key].append(value)
    return secrets, errors

99 

100 

def _parse_yaml_file(file_path: str) -> tuple[dict[str, Any], list[FileSyntaxError]]:
    """
    Parse a file in the YAML format.

    The document is loaded with ``safe_load``. Its top level must be a mapping; the values
    are returned exactly as loaded (strings, lists, nested objects, ...).

    :param file_path: The location of the file that will be processed.
    :return: Tuple with mapping of key and value and list of syntax errors. On any error the
        mapping is empty and the list holds a single error.
    """
    with open(file_path) as f:
        content = f.read()

    if not content:
        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
    try:
        secrets = yaml.safe_load(content)
    except yaml.MarkedYAMLError as e:
        # PyYAML marks are 0-based; report 1-based line numbers like the env and JSON parsers.
        err_line_no = e.problem_mark.line + 1 if e.problem_mark else -1
        return {}, [FileSyntaxError(line_no=err_line_no, message=str(e))]
    if not isinstance(secrets, dict):
        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]

    return secrets, []

122 

123 

def _parse_json_file(file_path: str) -> tuple[dict[str, Any], list[FileSyntaxError]]:
    """
    Parse a file in the JSON format.

    The top-level JSON value must be an object. An empty file, malformed JSON, or a
    non-object top level each yields an empty mapping plus a single syntax error.

    :param file_path: The location of the file that will be processed.
    :return: Tuple with mapping of key and value and list of syntax errors
    """
    with open(file_path) as json_file:
        raw = json_file.read()

    if not raw:
        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]

    try:
        parsed = json.loads(raw)
    except JSONDecodeError as err:
        return {}, [FileSyntaxError(line_no=int(err.lineno), message=err.msg)]

    if isinstance(parsed, dict):
        return parsed, []
    return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]

144 

145 

# Lower-cased file extension -> parser for that format. YAML accepts both spellings.
FILE_PARSERS = dict(
    env=_parse_env_file,
    json=_parse_json_file,
    yaml=_parse_yaml_file,
    yml=_parse_yaml_file,
)

152 

153 

def _parse_secret_file(file_path: str) -> dict[str, Any]:
    """
    Parse a secrets file with the parser registered for its extension.

    The extension is the text after the last ``.`` in the path, lower-cased. A file named
    just ``.env`` is therefore handled by the env parser.

    :param file_path: The location of the file that will be processed.
    :return: Map of secret key (e.g. connection ID) and value.
    :raises AirflowException: If the path does not exist or the extension is not supported.
    :raises AirflowFileParseException: If the parser reported any syntax errors.
    """
    if not os.path.exists(file_path):
        raise AirflowException(
            f"File {file_path} was not found. Check the configuration of your Secrets backend."
        )

    log.debug("Parsing file: %s", file_path)

    # With no "." in the path, rpartition returns the whole path as the last element.
    _, _, extension = file_path.rpartition(".")
    parser = FILE_PARSERS.get(extension.lower())
    if parser is None:
        raise AirflowException(
            "Unsupported file format. The file must have one of the following extensions: "
            ".env .json .yaml .yml"
        )

    secrets, parse_errors = parser(file_path)
    log.debug("Parsed file: len(parse_errors)=%d, len(secrets)=%d", len(parse_errors), len(secrets))

    if not parse_errors:
        return secrets
    raise AirflowFileParseException(
        "Failed to load the secret file.", file_path=file_path, parse_errors=parse_errors
    )

186 

187 

def _create_connection(conn_id: str, value: Any):
    """
    Create a connection from a URI string or a mapping of constructor arguments.

    :param conn_id: Connection ID; the key under which ``value`` was stored in the file.
    :param value: Either a connection URI string, or a dict of ``Connection`` constructor
        arguments. The dict may carry ``extra_dejson`` (serialized with ``json.dumps`` and
        passed as ``extra``) instead of ``extra``, but not both. If it contains ``conn_id``,
        that must equal ``conn_id``. The dict itself is not modified.
    :return: The created :class:`airflow.models.connection.Connection`.
    :raises AirflowException: If the dict has unknown keys, has both ``extra`` and
        ``extra_dejson``, has a mismatched ``conn_id``, or ``value`` is neither str nor dict.
    """
    from airflow.models.connection import Connection

    if isinstance(value, str):
        return Connection(conn_id=conn_id, uri=value)
    if isinstance(value, dict):
        connection_parameter_names = get_connection_parameter_names() | {"extra_dejson"}
        current_keys = set(value.keys())
        if not current_keys.issubset(connection_parameter_names):
            illegal_keys = current_keys - connection_parameter_names
            # Sort for a deterministic message; str() guards against non-string YAML keys.
            illegal_keys_list = ", ".join(sorted(map(str, illegal_keys)))
            allowed_keys_list = ", ".join(sorted(connection_parameter_names))
            raise AirflowException(
                f"The object have illegal keys: {illegal_keys_list}. "
                f"The dictionary can only contain the following keys: {allowed_keys_list}"
            )
        if "extra" in value and "extra_dejson" in value:
            raise AirflowException(
                "The extra and extra_dejson parameters are mutually exclusive. "
                "Please provide only one parameter."
            )

        # Work on a copy so the caller's parsed data is left untouched.
        conn_kwargs = dict(value)
        if "extra_dejson" in conn_kwargs:
            conn_kwargs["extra"] = json.dumps(conn_kwargs.pop("extra_dejson"))

        if "conn_id" in conn_kwargs and conn_id != conn_kwargs["conn_id"]:
            raise AirflowException(
                f"Mismatch conn_id. "
                f"The dictionary key has the value: {conn_id}. "
                f"The item has the value: {conn_kwargs['conn_id']}."
            )
        conn_kwargs["conn_id"] = conn_id
        return Connection(**conn_kwargs)
    raise AirflowException(
        f"Unexpected value type: {type(value)}. The connection can only be defined using a string or object."
    )

224 

225 

def load_variables(file_path: str) -> dict[str, str]:
    """
    Load variables from a text file.

    ``JSON``, `YAML` and ``.env`` files are supported.

    A value parsed as a list (always the case for ``.env`` files) must hold exactly one
    element, which becomes the variable's value. Any other value is used unchanged.

    :param file_path: The location of the file that will be processed.
    :return: Mapping of variable key to value.
    :raises AirflowException: If any key maps to a list that does not hold exactly one element.
    """
    log.debug("Loading variables from a text file")

    secrets = _parse_secret_file(file_path)

    variables = {}
    invalid_keys = []
    for key, values in secrets.items():
        if not isinstance(values, list):
            variables[key] = values
        elif len(values) == 1:
            variables[key] = values[0]
        else:
            invalid_keys.append(key)

    if invalid_keys:
        raise AirflowException(f'The "{file_path}" file contains multiple values for keys: {invalid_keys}')

    log.debug("Loaded %d variables: ", len(variables))
    return variables

243 

244 

def load_connections(file_path: str) -> dict[str, list[Any]]:
    """
    Load connections from a text file, wrapping each one in a single-element list.

    Deprecated: Please use `airflow.secrets.local_filesystem.load_connections_dict`.

    :param file_path: The location of the file that will be processed.
    :return: A dictionary mapping each connection ID to a one-element list holding its connection.
    """
    warnings.warn(
        "This function is deprecated. Please use `airflow.secrets.local_filesystem.load_connections_dict`.",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    # Iterate (conn_id, connection) pairs; iterating .values() would try to unpack a Connection.
    return {conn_id: [conn] for conn_id, conn in load_connections_dict(file_path).items()}

253 

254 

def load_connections_dict(file_path: str) -> dict[str, Any]:
    """
    Load connection from text file.

    ``JSON``, `YAML` and ``.env`` files are supported.

    A key whose parsed value is a list (always the case for ``.env`` files) may hold at most
    one entry. An empty list produces no connection for that key.

    :param file_path: The location of the file that will be processed.
    :return: A dictionary where the key contains a connection ID and the value contains the connection.
    :raises ConnectionNotUnique: If a key maps to a list with more than one value.
    """
    log.debug("Loading connection")

    secrets: dict[str, Any] = _parse_secret_file(file_path)
    connection_by_conn_id = {}
    for conn_id, raw in secrets.items():
        if not isinstance(raw, list):
            connection_by_conn_id[conn_id] = _create_connection(conn_id, raw)
            continue
        if len(raw) > 1:
            raise ConnectionNotUnique(f"Found multiple values for {conn_id} in {file_path}.")
        if raw:
            connection_by_conn_id[conn_id] = _create_connection(conn_id, raw[0])

    log.debug("Loaded %d connections", len(connection_by_conn_id))
    return connection_by_conn_id

281 

282 

class LocalFilesystemBackend(BaseSecretsBackend, LoggingMixin):
    """
    Retrieves Connection objects and Variables from local files.

    ``JSON``, `YAML` and ``.env`` files are supported.

    Nothing is cached: the configured file is parsed again on every lookup.

    :param variables_file_path: File location with variables data.
    :param connections_file_path: File location with connection data.
    """

    def __init__(self, variables_file_path: str | None = None, connections_file_path: str | None = None):
        super().__init__()
        self.variables_file = variables_file_path
        self.connections_file = connections_file_path

    @property
    def _local_variables(self) -> dict[str, str]:
        """Variables parsed from ``variables_file``, or ``{}`` when no file is configured."""
        if not self.variables_file:
            self.log.debug("The file for variables is not specified. Skipping")
            # The user may not specify any file.
            return {}
        secrets = load_variables(self.variables_file)
        return secrets

    @property
    def _local_connections(self) -> dict[str, Connection]:
        """Connections parsed from ``connections_file``, or ``{}`` when no file is configured."""
        if not self.connections_file:
            self.log.debug("The file for connection is not specified. Skipping")
            # The user may not specify any file.
            return {}
        return load_connections_dict(self.connections_file)

    def get_connection(self, conn_id: str) -> Connection | None:
        """
        Return the connection with the given ID, or ``None`` if the file does not define it.

        :param conn_id: Connection ID to look up.
        """
        # Access the property once: every access re-parses the connections file.
        return self._local_connections.get(conn_id)

    def get_connections(self, conn_id: str) -> list[Any]:
        """
        Deprecated: return the connection with the given ID wrapped in a list.

        :param conn_id: Connection ID to look up.
        :return: ``[connection]`` if found, otherwise ``[]``.
        """
        warnings.warn(
            "This method is deprecated. Please use "
            "`airflow.secrets.local_filesystem.LocalFilesystemBackend.get_connection`.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        conn = self.get_connection(conn_id=conn_id)
        if conn:
            return [conn]
        return []

    def get_variable(self, key: str) -> str | None:
        """
        Return the value of the variable ``key``, or ``None`` if the file does not define it.

        :param key: Variable key to look up.
        """
        return self._local_variables.get(key)