Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/secrets/local_filesystem.py: 23%
154 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Objects relating to retrieving connections and variables from local file."""
19from __future__ import annotations
21import json
22import logging
23import os
24import warnings
25from collections import defaultdict
26from inspect import signature
27from json import JSONDecodeError
28from typing import TYPE_CHECKING, Any
30from airflow.exceptions import (
31 AirflowException,
32 AirflowFileParseException,
33 ConnectionNotUnique,
34 FileSyntaxError,
35 RemovedInAirflow3Warning,
36)
37from airflow.secrets.base_secrets import BaseSecretsBackend
38from airflow.utils import yaml
39from airflow.utils.file import COMMENT_PATTERN
40from airflow.utils.log.logging_mixin import LoggingMixin
42log = logging.getLogger(__name__)
44if TYPE_CHECKING:
45 from airflow.models.connection import Connection
def get_connection_parameter_names() -> set[str]:
    """
    Return the parameter names accepted by ``Connection.__init__``.

    The names are read from the constructor signature at call time; the implicit
    ``self`` parameter is left out.

    :return: Set of constructor parameter names, without ``self``.
    """
    # Imported inside the function, so the model is only loaded when this is called.
    from airflow.models.connection import Connection

    init_parameters = signature(Connection.__init__).parameters
    return set(init_parameters) - {"self"}
def _parse_env_file(file_path: str) -> tuple[dict[str, list[str]], list[FileSyntaxError]]:
    """
    Parse a file in the ``.env`` format.

    .. code-block:: text

        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2

    Empty lines and lines matching ``COMMENT_PATTERN`` are skipped. Every other line is split
    on its first ``=``; a line without ``=``, with an empty key, or with an empty value is
    reported as a syntax error and is not added to the result.

    :param file_path: The location of the file that will be processed.
    :return: Tuple with mapping of key and list of values and list of syntax errors
    """
    with open(file_path) as f:
        content = f.read()

    secrets: dict[str, list[str]] = defaultdict(list)
    errors: list[FileSyntaxError] = []
    for line_no, line in enumerate(content.splitlines(), 1):
        if not line:
            # Ignore empty line
            continue

        if COMMENT_PATTERN.match(line):
            # Ignore comments
            continue

        # Only the first "=" separates key and value, so values may contain "=" themselves.
        key, sep, value = line.partition("=")
        if not sep:
            errors.append(
                FileSyntaxError(
                    line_no=line_no,
                    message='Invalid line format. The line should contain at least one equal sign ("=").',
                )
            )
            continue

        if not key:
            errors.append(
                FileSyntaxError(
                    line_no=line_no,
                    message="Invalid line format. Key is empty.",
                )
            )
            continue

        if not value:
            errors.append(
                FileSyntaxError(
                    line_no=line_no,
                    message="Invalid line format. Value is empty.",
                )
            )
            continue

        secrets[key].append(value)
    return secrets, errors
def _parse_yaml_file(file_path: str) -> tuple[dict[str, list[str]], list[FileSyntaxError]]:
    """
    Parse a file in the YAML format.

    An empty file, a YAML syntax error, or a top-level value that is not a mapping each
    produce an empty mapping plus a single syntax error.

    :param file_path: The location of the file that will be processed.
    :return: Tuple with mapping of key and list of values and list of syntax errors
    """
    with open(file_path) as f:
        content = f.read()

    if not content:
        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
    try:
        secrets = yaml.safe_load(content)
    except yaml.MarkedYAMLError as e:
        # NOTE(review): assumes ``yaml`` exposes PyYAML's MarkedYAMLError, whose mark line
        # is 0-based. Shift it by one so it matches the 1-based line numbers used by the
        # other errors in this module; -1 still means "position unknown".
        err_line_no = e.problem_mark.line + 1 if e.problem_mark else -1
        return {}, [FileSyntaxError(line_no=err_line_no, message=str(e))]
    if not isinstance(secrets, dict):
        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]

    return secrets, []
def _parse_json_file(file_path: str) -> tuple[dict[str, Any], list[FileSyntaxError]]:
    """
    Parse a file in the JSON format.

    An empty file, invalid JSON, or a top-level value that is not a JSON object each
    produce an empty mapping plus a single syntax error.

    :param file_path: The location of the file that will be processed.
    :return: Tuple with the parsed mapping and the list of syntax errors
    """
    with open(file_path) as json_file:
        raw_text = json_file.read()

    if len(raw_text) == 0:
        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]

    try:
        parsed = json.loads(raw_text)
    except JSONDecodeError as decode_error:
        failure = FileSyntaxError(line_no=int(decode_error.lineno), message=decode_error.msg)
        return {}, [failure]

    # Only a top-level JSON object can be used as a key/value mapping.
    if isinstance(parsed, dict):
        return parsed, []
    return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
# Maps a file extension (lower case, without the leading dot) to the function that parses it.
FILE_PARSERS = dict(
    env=_parse_env_file,
    json=_parse_json_file,
    yaml=_parse_yaml_file,
    yml=_parse_yaml_file,
)
def _parse_secret_file(file_path: str) -> dict[str, Any]:
    """
    Pick a parser from the file extension and parse the file with it.

    :param file_path: The location of the file that will be processed.
    :return: Map of secret key (e.g. connection ID) and value.
    :raises AirflowException: If the file does not exist or its extension has no parser.
    :raises AirflowFileParseException: If the selected parser reported syntax errors.
    """
    if not os.path.exists(file_path):
        raise AirflowException(
            f"File {file_path} was not found. Check the configuration of your Secrets backend."
        )

    log.debug("Parsing file: %s", file_path)

    # The text after the last dot is the extension; a name such as ".env" yields "env".
    *_, suffix = file_path.rsplit(".", 2)
    parser = FILE_PARSERS.get(suffix.lower())
    if parser is None:
        raise AirflowException(
            "Unsupported file format. The file must have one of the following extensions: "
            ".env .json .yaml .yml"
        )

    secrets, parse_errors = parser(file_path)
    log.debug("Parsed file: len(parse_errors)=%d, len(secrets)=%d", len(parse_errors), len(secrets))

    if not parse_errors:
        return secrets
    raise AirflowFileParseException(
        "Failed to load the secret file.", file_path=file_path, parse_errors=parse_errors
    )
def _create_connection(conn_id: str, value: Any):
    """
    Create a connection based on a URL or JSON object.

    A string is passed to ``Connection`` as its ``uri``. A dictionary is passed as keyword
    arguments; it may use ``extra_dejson`` instead of ``extra``, in which case the value is
    serialized to JSON. The caller's dictionary is never modified.

    :param conn_id: Connection ID under which the value was stored.
    :param value: Connection definition, either a URI string or a dictionary of parameters.
    :return: The created ``Connection``.
    :raises AirflowException: If the dictionary has unknown keys, sets both ``extra`` and
        ``extra_dejson``, carries a ``conn_id`` different from ``conn_id``, or if ``value``
        is neither a string nor a dictionary.
    """
    from airflow.models.connection import Connection

    if isinstance(value, str):
        return Connection(conn_id=conn_id, uri=value)
    if isinstance(value, dict):
        connection_parameter_names = get_connection_parameter_names() | {"extra_dejson"}
        illegal_keys = set(value) - connection_parameter_names
        if illegal_keys:
            # Sorted (as text) so the message is deterministic and non-string keys don't crash it.
            illegal_keys_list = ", ".join(sorted(map(str, illegal_keys)))
            raise AirflowException(
                f"The object have illegal keys: {illegal_keys_list}. "
                f"The dictionary can only contain the following keys: {connection_parameter_names}"
            )
        if "extra" in value and "extra_dejson" in value:
            raise AirflowException(
                "The extra and extra_dejson parameters are mutually exclusive. "
                "Please provide only one parameter."
            )

        # Work on a shallow copy so the parsed secrets handed in by the caller stay untouched.
        params = dict(value)
        if "extra_dejson" in params:
            params["extra"] = json.dumps(params.pop("extra_dejson"))

        if "conn_id" in params and conn_id != params["conn_id"]:
            # "Dictionary key" is the ID the entry is stored under; "item" is its own conn_id field.
            raise AirflowException(
                f"Mismatch conn_id. "
                f"The dictionary key has the value: {conn_id}. "
                f"The item has the value: {params['conn_id']}."
            )
        params["conn_id"] = conn_id
        return Connection(**params)
    raise AirflowException(
        f"Unexpected value type: {type(value)}. The connection can only be defined using a string or object."
    )
def load_variables(file_path: str) -> dict[str, str]:
    """
    Load variables from a text file.

    ``JSON``, ``YAML`` and ``.env`` files are supported.

    A value given as a list must hold exactly one element, which becomes the variable's
    value; any other value is used as it is.

    :param file_path: The location of the file that will be processed.
    :return: Mapping of variable key to its value.
    :raises AirflowException: If any key maps to a list that does not hold exactly one value.
    """
    log.debug("Loading variables from a text file")

    secrets = _parse_secret_file(file_path)
    invalid_keys = []
    variables = {}
    for key, values in secrets.items():
        if not isinstance(values, list):
            variables[key] = values
        elif len(values) == 1:
            variables[key] = values[0]
        else:
            invalid_keys.append(key)

    if invalid_keys:
        raise AirflowException(f'The "{file_path}" file contains multiple values for keys: {invalid_keys}')
    log.debug("Loaded %d variables: ", len(variables))
    return variables
def load_connections(file_path) -> dict[str, list[Any]]:
    """
    Deprecated: Please use `airflow.secrets.local_filesystem.load_connections_dict`.

    :param file_path: The location of the file that will be processed.
    :return: Mapping of connection ID to a one-element list holding its connection.
    """
    warnings.warn(
        "This function is deprecated. Please use `airflow.secrets.local_filesystem.load_connections_dict`.",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    # Iterate over items(), not values(): each entry must be unpacked as (conn_id, connection).
    return {k: [v] for k, v in load_connections_dict(file_path).items()}
def load_connections_dict(file_path: str) -> dict[str, Any]:
    """
    Load connections from a text file.

    ``JSON``, ``YAML`` and ``.env`` files are supported.

    :param file_path: The location of the file that will be processed.
    :return: A dictionary where the key contains a connection ID and the value contains the connection.
    :raises ConnectionNotUnique: If a connection ID maps to a list with more than one value.
    """
    log.debug("Loading connection")

    secrets: dict[str, Any] = _parse_secret_file(file_path)
    connection_by_conn_id = {}
    for conn_id, raw_value in list(secrets.items()):
        # A secret is either a single definition or a list of them; treat both as a list.
        candidates = raw_value if isinstance(raw_value, list) else [raw_value]
        if len(candidates) > 1:
            raise ConnectionNotUnique(f"Found multiple values for {conn_id} in {file_path}.")

        # An empty list contributes no connection for this ID.
        for candidate in candidates:
            connection_by_conn_id[conn_id] = _create_connection(conn_id, candidate)

    log.debug("Loaded %d connections", len(connection_by_conn_id))

    return connection_by_conn_id
class LocalFilesystemBackend(BaseSecretsBackend, LoggingMixin):
    """
    Retrieves Connection objects and Variables from local files.

    ``JSON``, ``YAML`` and ``.env`` files are supported.

    The files are loaded again on every lookup; nothing is cached on the instance.

    :param variables_file_path: File location with variables data.
    :param connections_file_path: File location with connection data.
    """

    def __init__(self, variables_file_path: str | None = None, connections_file_path: str | None = None):
        super().__init__()
        self.variables_file = variables_file_path
        self.connections_file = connections_file_path

    @property
    def _local_variables(self) -> dict[str, str]:
        """Variables loaded from ``variables_file``, or an empty dict if no file is set."""
        if not self.variables_file:
            self.log.debug("The file for variables is not specified. Skipping")
            # The user may not specify any file.
            return {}
        secrets = load_variables(self.variables_file)
        return secrets

    @property
    def _local_connections(self) -> dict[str, Connection]:
        """Connections loaded from ``connections_file``, or an empty dict if no file is set."""
        if not self.connections_file:
            self.log.debug("The file for connection is not specified. Skipping")
            # The user may not specify any file.
            return {}
        return load_connections_dict(self.connections_file)

    def get_connection(self, conn_id: str) -> Connection | None:
        """
        Return the connection stored under ``conn_id``, or ``None`` if there is none.

        :param conn_id: Connection ID to look up.
        """
        # Read the property once: every access loads the file again, so a separate
        # membership test followed by indexing would parse the file twice.
        return self._local_connections.get(conn_id)

    def get_connections(self, conn_id: str) -> list[Any]:
        """
        Deprecated: use :meth:`get_connection` instead.

        :param conn_id: Connection ID to look up.
        :return: A one-element list with the connection, or an empty list if it is not found.
        """
        warnings.warn(
            "This method is deprecated. Please use "
            "`airflow.secrets.local_filesystem.LocalFilesystemBackend.get_connection`.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        conn = self.get_connection(conn_id=conn_id)
        if conn:
            return [conn]
        return []

    def get_variable(self, key: str) -> str | None:
        """
        Return the value of the variable ``key``, or ``None`` if it is not defined.

        :param key: Variable key to look up.
        """
        return self._local_variables.get(key)