Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/secrets/local_filesystem.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Objects relating to retrieving connections and variables from local file."""
20from __future__ import annotations
22import json
23import logging
24import os
25import warnings
26from collections import defaultdict
27from inspect import signature
28from json import JSONDecodeError
29from typing import TYPE_CHECKING, Any
31from airflow.exceptions import (
32 AirflowException,
33 AirflowFileParseException,
34 ConnectionNotUnique,
35 FileSyntaxError,
36 RemovedInAirflow3Warning,
37)
38from airflow.secrets.base_secrets import BaseSecretsBackend
39from airflow.utils import yaml
40from airflow.utils.file import COMMENT_PATTERN
41from airflow.utils.log.logging_mixin import LoggingMixin
# Module-level logger shared by the parsing and loading helpers in this module.
log: logging.Logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    # Only needed for type annotations; runtime code imports Connection inside the functions that use it.
    from airflow.models.connection import Connection
def get_connection_parameter_names() -> set[str]:
    """
    Collect the parameter names accepted by the ``Connection`` constructor.

    :return: Names of every parameter of
        :meth:`airflow.models.connection.Connection.__init__`, excluding ``self``.
    """
    # Deferred import: at module level Connection is only imported under TYPE_CHECKING.
    from airflow.models.connection import Connection

    init_params = signature(Connection.__init__).parameters
    names = set(init_params)
    names.discard("self")
    return names
def _parse_env_file(file_path: str) -> tuple[dict[str, list[str]], list[FileSyntaxError]]:
    """
    Parse a file in the ``.env`` format.

    .. code-block:: text

        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2

    Empty lines and lines matching ``COMMENT_PATTERN`` are skipped. Every other line is
    split on its first ``=``; everything after it (including further ``=`` characters)
    is the value. A key may appear more than once, so each key maps to the list of its
    values in file order.

    :param file_path: The location of the file that will be processed.
    :return: Tuple with mapping of key and list of values and list of syntax errors
    """
    with open(file_path) as f:
        content = f.read()

    secrets: dict[str, list[str]] = defaultdict(list)
    errors: list[FileSyntaxError] = []
    for line_no, line in enumerate(content.splitlines(), 1):
        if not line:
            # Ignore empty line
            continue

        if COMMENT_PATTERN.match(line):
            # Ignore comments
            continue

        # partition() splits on the first "=" only, so values may themselves contain "="
        # (e.g. URI query strings such as "param1=val1&param2=val2").
        key, sep, value = line.partition("=")
        if not sep:
            errors.append(
                FileSyntaxError(
                    line_no=line_no,
                    message='Invalid line format. The line should contain at least one equal sign ("=").',
                )
            )
            continue

        # Reject lines such as "=value". An empty value ("MY_VAR=") is not rejected by this parser.
        if not key:
            errors.append(
                FileSyntaxError(
                    line_no=line_no,
                    message="Invalid line format. Key is empty.",
                )
            )
            continue

        secrets[key].append(value)
    return secrets, errors
def _parse_yaml_file(file_path: str) -> tuple[dict[str, Any], list[FileSyntaxError]]:
    """
    Parse a file in the YAML format.

    The top-level YAML document must be a mapping. Its values are returned exactly as
    loaded by ``yaml.safe_load``, so they may be strings, mappings, lists, etc.

    :param file_path: The location of the file that will be processed.
    :return: Tuple with mapping of key and value and list of syntax errors
    """
    with open(file_path) as f:
        content = f.read()

    if not content:
        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
    try:
        secrets = yaml.safe_load(content)
    except yaml.MarkedYAMLError as e:
        # YAML marks hold a 0-based line index. Report it 1-based, like the other parsers
        # in this module and like the "line N" text already contained in str(e).
        err_line_no = e.problem_mark.line + 1 if e.problem_mark else -1
        return {}, [FileSyntaxError(line_no=err_line_no, message=str(e))]
    if not isinstance(secrets, dict):
        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]

    return secrets, []
125def _parse_json_file(file_path: str) -> tuple[dict[str, Any], list[FileSyntaxError]]:
126 """
127 Parse a file in the JSON format.
129 :param file_path: The location of the file that will be processed.
130 :return: Tuple with mapping of key and list of values and list of syntax errors
131 """
132 with open(file_path) as f:
133 content = f.read()
135 if not content:
136 return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
137 try:
138 secrets = json.loads(content)
139 except JSONDecodeError as e:
140 return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
141 if not isinstance(secrets, dict):
142 return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
144 return secrets, []
# Maps a lower-cased file extension (without the leading dot) to the parser for that format.
FILE_PARSERS = dict(
    env=_parse_env_file,
    json=_parse_json_file,
    yaml=_parse_yaml_file,
    yml=_parse_yaml_file,
)
def _parse_secret_file(file_path: str) -> dict[str, Any]:
    """
    Parse a secrets file with the parser registered for its extension.

    The extension is the text after the last ``.`` in ``file_path``, lower-cased and
    looked up in ``FILE_PARSERS``.

    :param file_path: The location of the file that will be processed.
    :return: Map of secret key (e.g. connection ID) and value.
    :raises AirflowException: If the file does not exist or its extension is unsupported.
    :raises AirflowFileParseException: If the parser reported any syntax errors.
    """
    if not os.path.exists(file_path):
        raise AirflowException(
            f"File {file_path} was not found. Check the configuration of your Secrets backend."
        )

    log.debug("Parsing file: %s", file_path)

    extension = file_path.rpartition(".")[2].lower()
    parser = FILE_PARSERS.get(extension)
    if parser is None:
        raise AirflowException(
            "Unsupported file format. The file must have one of the following extensions: "
            ".env .json .yaml .yml"
        )

    secrets, parse_errors = parser(file_path)
    log.debug("Parsed file: len(parse_errors)=%d, len(secrets)=%d", len(parse_errors), len(secrets))

    if not parse_errors:
        return secrets
    raise AirflowFileParseException(
        "Failed to load the secret file.", file_path=file_path, parse_errors=parse_errors
    )
def _create_connection(conn_id: str, value: Any):
    """
    Create a connection based on a URL or JSON object.

    :param conn_id: Connection ID, i.e. the key under which ``value`` was stored.
    :param value: Either a connection URI string, or a mapping of ``Connection``
        constructor arguments. The mapping may use ``extra_dejson`` (serialized with
        ``json.dumps``) instead of ``extra``. The mapping is not modified.
    :return: The created ``Connection``.
    :raises AirflowException: If ``value`` has an unsupported type, contains unknown keys,
        sets both ``extra`` and ``extra_dejson``, or holds a ``conn_id`` different from
        ``conn_id``.
    """
    from airflow.models.connection import Connection

    if isinstance(value, str):
        return Connection(conn_id=conn_id, uri=value)
    if isinstance(value, dict):
        connection_parameter_names = get_connection_parameter_names() | {"extra_dejson"}
        current_keys = set(value.keys())
        if not current_keys.issubset(connection_parameter_names):
            illegal_keys = current_keys - connection_parameter_names
            # str() handles non-string YAML keys; sorting makes the message deterministic.
            illegal_keys_list = ", ".join(sorted(map(str, illegal_keys)))
            raise AirflowException(
                f"The object have illegal keys: {illegal_keys_list}. "
                f"The dictionary can only contain the following keys: {connection_parameter_names}"
            )
        if "extra" in value and "extra_dejson" in value:
            raise AirflowException(
                "The extra and extra_dejson parameters are mutually exclusive. "
                "Please provide only one parameter."
            )

        # Work on a copy so the caller's parsed secrets are left untouched.
        params = dict(value)
        if "extra_dejson" in params:
            params["extra"] = json.dumps(params.pop("extra_dejson"))

        if "conn_id" in current_keys and conn_id != params["conn_id"]:
            raise AirflowException(
                f"Mismatch conn_id. "
                f"The dictionary key has the value: {conn_id}. "
                f"The item has the value: {params['conn_id']}."
            )
        params["conn_id"] = conn_id
        return Connection(**params)
    raise AirflowException(
        f"Unexpected value type: {type(value)}. The connection can only be defined using a string or object."
    )
def load_variables(file_path: str) -> dict[str, str]:
    """
    Load variables from a text file.

    ``JSON``, ``YAML`` and ``.env`` files are supported. A value that is a list (the
    ``.env`` parser always produces lists) must contain exactly one element, which
    becomes the variable value. Any other value is used as-is.

    NOTE(review): this list unwrapping is applied regardless of file format, so a JSON/YAML
    variable whose value is itself a list is treated the same way.

    :param file_path: The location of the file that will be processed.
    :return: Mapping of variable key to value.
    :raises AirflowException: If any key has a list value whose length is not exactly one.
    """
    log.debug("Loading variables from a text file")

    secrets = _parse_secret_file(file_path)
    variables: dict[str, str] = {}
    invalid_keys = []
    for key, values in secrets.items():
        if not isinstance(values, list):
            variables[key] = values
        elif len(values) == 1:
            variables[key] = values[0]
        else:
            invalid_keys.append(key)

    if invalid_keys:
        raise AirflowException(f'The "{file_path}" file contains multiple values for keys: {invalid_keys}')
    log.debug("Loaded %d variables: ", len(variables))
    return variables
def load_connections(file_path) -> dict[str, list[Any]]:
    """
    Load connections from a text file, wrapping each connection in a one-element list.

    Deprecated: use `airflow.secrets.local_filesystem.load_connections_dict` instead.

    :param file_path: The location of the file that will be processed.
    :return: Mapping of connection ID to a single-element list holding its connection.
    """
    warnings.warn(
        "This function is deprecated. Please use `airflow.secrets.local_filesystem.load_connections_dict`.",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    # Iterate items(), not values(): values() yields Connection objects, which cannot be
    # unpacked into (conn_id, connection) pairs.
    return {conn_id: [conn] for conn_id, conn in load_connections_dict(file_path).items()}
def load_connections_dict(file_path: str) -> dict[str, Any]:
    """
    Load connections from a text file.

    ``JSON``, ``YAML`` and ``.env`` files are supported. A list value may hold at most one
    connection definition. An empty list produces no connection for that key.

    :param file_path: The location of the file that will be processed.
    :return: A dictionary where the key contains a connection ID and the value contains the connection.
    :raises ConnectionNotUnique: If a key maps to a list with more than one value.
    """
    log.debug("Loading connection")

    secrets: dict[str, Any] = _parse_secret_file(file_path)
    connection_by_conn_id = {}
    for key, raw in secrets.items():
        if not isinstance(raw, list):
            connection_by_conn_id[key] = _create_connection(key, raw)
            continue
        if len(raw) > 1:
            raise ConnectionNotUnique(f"Found multiple values for {key} in {file_path}.")
        if raw:
            connection_by_conn_id[key] = _create_connection(key, raw[0])

    log.debug("Loaded %d connections", len(connection_by_conn_id))
    return connection_by_conn_id
class LocalFilesystemBackend(BaseSecretsBackend, LoggingMixin):
    """
    Retrieves Connection objects and Variables from local files.

    ``JSON``, ``YAML`` and ``.env`` files are supported. Nothing is cached: every lookup
    re-reads and re-parses the configured file.

    :param variables_file_path: File location with variables data.
    :param connections_file_path: File location with connection data.
    """

    def __init__(self, variables_file_path: str | None = None, connections_file_path: str | None = None):
        super().__init__()
        self.variables_file = variables_file_path
        self.connections_file = connections_file_path

    @property
    def _local_variables(self) -> dict[str, str]:
        """Variables loaded from ``variables_file``; empty when no file is configured."""
        if not self.variables_file:
            self.log.debug("The file for variables is not specified. Skipping")
            # The user may not specify any file.
            return {}
        secrets = load_variables(self.variables_file)
        return secrets

    @property
    def _local_connections(self) -> dict[str, Connection]:
        """Connections loaded from ``connections_file``; empty when no file is configured."""
        if not self.connections_file:
            self.log.debug("The file for connection is not specified. Skipping")
            # The user may not specify any file.
            return {}
        return load_connections_dict(self.connections_file)

    def get_connection(self, conn_id: str) -> Connection | None:
        """
        Return the connection with ``conn_id`` from the connections file, or ``None``.

        :param conn_id: Connection ID to look up.
        """
        # Access the property once: each access re-parses the file, and two reads could
        # also see two different versions of a file that is being edited.
        return self._local_connections.get(conn_id)

    def get_connections(self, conn_id: str) -> list[Any]:
        """
        Return the connection with ``conn_id`` wrapped in a list, or an empty list.

        Deprecated: use ``get_connection`` instead.

        :param conn_id: Connection ID to look up.
        """
        warnings.warn(
            "This method is deprecated. Please use "
            "`airflow.secrets.local_filesystem.LocalFilesystemBackend.get_connection`.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        conn = self.get_connection(conn_id=conn_id)
        if conn:
            return [conn]
        return []

    def get_variable(self, key: str) -> str | None:
        """
        Return the value of variable ``key`` from the variables file, or ``None`` if absent.

        :param key: Variable key to look up.
        """
        return self._local_variables.get(key)