Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/io/fs.py: 37%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19import inspect
20import logging
21from collections.abc import Callable, Mapping
22from functools import cache
23from typing import TYPE_CHECKING
25from fsspec.implementations.local import LocalFileSystem
27from airflow.providers_manager import ProvidersManager
28from airflow.sdk._shared.module_loading import import_string
29from airflow.sdk.observability.stats import Stats
31if TYPE_CHECKING:
32 from fsspec import AbstractFileSystem
34 from airflow.sdk.io.typedef import Properties
# Logger shared by every function in this module, named after the module itself.
log = logging.getLogger(name=__name__)
def _file(_: str | None, storage_options: Properties) -> LocalFileSystem:
    """
    Create a local filesystem.

    The first (connection id) argument is accepted only to match the factory signature used by
    the scheme registry and is ignored.

    :param storage_options: keyword arguments forwarded to ``LocalFileSystem``
    :return: a new ``LocalFileSystem`` instance
    """
    local_fs = LocalFileSystem(**storage_options)
    return local_fs
# Filesystems supported out of the box: both the "file" and "local" schemes map to the local
# filesystem factory. Provider-supplied filesystems are layered on top of this table and may
# override these entries.
_BUILTIN_SCHEME_TO_FS: dict[str, Callable[[str | None, Properties], AbstractFileSystem]] = {
    builtin_scheme: _file for builtin_scheme in ("file", "local")
}
@cache
def _register_filesystems() -> Mapping[
    str,
    Callable[[str | None, Properties], AbstractFileSystem] | Callable[[str | None], AbstractFileSystem],
]:
    """
    Build the registry mapping URL schemes to filesystem factories.

    The registry starts from a copy of the builtin local-filesystem schemes. Every filesystem module
    reported by the providers manager is then imported, and its ``get_fs`` callable is registered
    under each scheme listed in the module's ``schemes`` attribute. A scheme that is already
    registered is overridden, and a warning is logged. The result is cached by ``functools.cache``,
    so discovery runs only until the first successful call.

    :return: mapping from scheme to the factory that creates the filesystem
    :raises ImportError: if a provider module declares schemes but has no ``get_fs`` attribute
    """
    registry = dict(_BUILTIN_SCHEME_TO_FS)
    with Stats.timer("airflow.io.load_filesystems") as timer:
        for module_path in ProvidersManager().filesystem_module_names:
            module = import_string(module_path)
            # The factory is the same for every scheme of this module, so look it up once.
            factory = getattr(module, "get_fs", None)
            for scheme in getattr(module, "schemes", []):
                if scheme in registry:
                    log.warning("Overriding scheme %s for %s", scheme, module_path)
                if factory is None:
                    raise ImportError(f"Filesystem {module_path} does not have a get_fs method")
                registry[scheme] = factory
    log.debug("loading filesystems from providers took %.3f seconds", timer.duration)
    return registry
def get_fs(
    scheme: str, conn_id: str | None = None, storage_options: Properties | None = None
) -> AbstractFileSystem:
    """
    Get a filesystem by scheme.

    :param scheme: the scheme to get the filesystem for
    :param conn_id: the airflow connection id to use
    :param storage_options: the storage options to pass to the filesystem
    :return: the filesystem instance created by the factory registered for ``scheme``
    :raises ValueError: if no filesystem is registered for ``scheme``
    :raises AttributeError: if storage options are passed but the registered factory accepts
        only a connection id
    """
    filesystems = _register_filesystems()
    try:
        fs = filesystems[scheme]
    except KeyError:
        raise ValueError(f"No filesystem registered for scheme {scheme}") from None

    options = storage_options or {}

    # Already-released providers may expose a ``get_fs(conn_id)`` factory without a storage-options
    # parameter, so the factory's arity is inspected at runtime. MyPy does not recognize this
    # dynamic parameter inspection, which is why the calls below carry ``type: ignore``.
    parameters = inspect.signature(fs).parameters
    if len(parameters) == 1:
        if options:
            # Fail loudly rather than silently dropping options the caller asked for.
            raise AttributeError(
                f"Filesystem {scheme} does not support storage options, but options were passed. "
                "This most likely means that you are using an old version of the provider that does not "
                "support storage options. Please upgrade the provider if possible."
            )
        return fs(conn_id)  # type: ignore[call-arg]
    return fs(conn_id, options)  # type: ignore[call-arg]
def has_fs(scheme: str) -> bool:
    """
    Tell whether a filesystem factory is registered for a scheme.

    :param scheme: the scheme to look up
    :return: True if a filesystem is available for the scheme, otherwise False
    """
    registry = _register_filesystems()
    return scheme in registry