Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/io/fs.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

49 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import inspect 

20import logging 

21from collections.abc import Callable, Mapping 

22from functools import cache 

23from typing import TYPE_CHECKING 

24 

25from fsspec.implementations.local import LocalFileSystem 

26 

27from airflow.providers_manager import ProvidersManager 

28from airflow.sdk._shared.module_loading import import_string 

29from airflow.sdk.observability.stats import Stats 

30 

31if TYPE_CHECKING: 

32 from fsspec import AbstractFileSystem 

33 

34 from airflow.sdk.io.typedef import Properties 

35 

36 

# Module-level logger, named after this module so log records can be filtered per module.
log = logging.getLogger(__name__)

38 

39 

def _file(_: str | None, storage_options: Properties) -> LocalFileSystem:
    """
    Build a local filesystem from the given storage options.

    :param _: first positional argument of the factory signature; unused here
    :param storage_options: keyword arguments forwarded to ``LocalFileSystem``
    :return: the constructed ``LocalFileSystem``
    """
    local_fs = LocalFileSystem(**storage_options)
    return local_fs

42 

43 

# Filesystems supported out of the box: both the "file" and "local" schemes
# resolve to the same local-filesystem factory.
_BUILTIN_SCHEME_TO_FS: dict[str, Callable[[str | None, Properties], AbstractFileSystem]] = {"file": _file, "local": _file}

49 

50 

@cache
def _register_filesystems() -> Mapping[
    str,
    Callable[[str | None, Properties], AbstractFileSystem] | Callable[[str | None], AbstractFileSystem],
]:
    """
    Build the mapping from scheme to filesystem factory.

    Starts from a copy of the builtin schemes and then adds every scheme listed in the
    ``schemes`` attribute of each module named by ``ProvidersManager().filesystem_module_names``,
    mapping it to that module's ``get_fs`` attribute. A scheme that is already present is
    overridden and a warning is logged. The result is memoized by ``functools.cache``, so the
    discovery only runs on the first call.

    :return: mapping from scheme to the callable that creates the filesystem
    :raises ImportError: if a module declares at least one scheme but has no ``get_fs`` attribute
    """
    scheme_to_fs = _BUILTIN_SCHEME_TO_FS.copy()
    with Stats.timer("airflow.io.load_filesystems") as timer:
        manager = ProvidersManager()
        for fs_module_name in manager.filesystem_module_names:
            fs_module = import_string(fs_module_name)
            # The factory is the same for every scheme of a module: look it up once per module
            # instead of once per scheme.
            method = getattr(fs_module, "get_fs", None)
            for scheme in getattr(fs_module, "schemes", []):
                if scheme in scheme_to_fs:
                    log.warning("Overriding scheme %s for %s", scheme, fs_module_name)

                # Checked inside the loop on purpose: a module that declares no schemes is
                # skipped silently even when it has no ``get_fs``.
                if method is None:
                    raise ImportError(f"Filesystem {fs_module_name} does not have a get_fs method")
                scheme_to_fs[scheme] = method

    # Logged after the timer context has exited.
    log.debug("loading filesystems from providers took %.3f seconds", timer.duration)
    return scheme_to_fs

72 

73 

def get_fs(
    scheme: str, conn_id: str | None = None, storage_options: Properties | None = None
) -> AbstractFileSystem:
    """
    Get a filesystem by scheme.

    :param scheme: the scheme to get the filesystem for
    :param conn_id: the airflow connection id to use
    :param storage_options: the storage options to pass to the filesystem
    :return: the filesystem returned by the factory registered for the scheme
    :raises ValueError: if no filesystem is registered for the scheme
    :raises AttributeError: if storage options are passed but the registered factory only
        accepts a single parameter
    """
    filesystems = _register_filesystems()
    try:
        fs = filesystems[scheme]
    except KeyError:
        raise ValueError(f"No filesystem registered for scheme {scheme}") from None

    # Treat a missing (or empty) options mapping as "no options".
    options = storage_options or {}

    # MyPy does not recognize dynamic parameters inspection when we call the method, and we have to do
    # it for compatibility reasons with already released providers, that's why we need to ignore
    # mypy errors here
    parameters = inspect.signature(fs).parameters
    if len(parameters) == 1:
        # Single-parameter factory: it can only take the connection id, so any options the
        # caller supplied would be silently dropped. Fail loudly instead.
        if options:
            raise AttributeError(
                f"Filesystem {scheme} does not support storage options, but options were passed. "
                "This most likely means that you are using an old version of the provider that does not "
                "support storage options. Please upgrade the provider if possible."
            )
        return fs(conn_id)  # type: ignore[call-arg]
    return fs(conn_id, options)  # type: ignore[call-arg]

106 

107 

def has_fs(scheme: str) -> bool:
    """
    Tell whether a filesystem is registered under the given scheme.

    :param scheme: the scheme to look up
    :return: True when the scheme is among the registered filesystems, False otherwise
    """
    registered = _register_filesystems()
    return scheme in registered