Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/io/store.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

73 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19from functools import cached_property 

20from typing import TYPE_CHECKING, ClassVar 

21 

22if TYPE_CHECKING: 

23 from fsspec import AbstractFileSystem 

24 

25 from airflow.sdk.io.typedef import Properties 

26 

27 

class ObjectStore:
    """
    Manages a filesystem or object storage.

    Instances are normally obtained through the module-level ``attach``
    function, which also registers them in the module's store cache, rather
    than by calling this constructor directly.
    """

    # Serialization format version; ``deserialize`` rejects anything newer.
    __version__: ClassVar[int] = 1

    # NOTE(review): ``method`` is declared here but never assigned in this class.
    method: str
    conn_id: str | None
    protocol: str
    storage_options: Properties | None

    def __init__(
        self,
        protocol: str,
        conn_id: str | None,
        fs: AbstractFileSystem | None = None,
        storage_options: Properties | None = None,
    ):
        """
        Create a store for ``protocol`` using the optional connection ``conn_id``.

        :param protocol: the scheme that is used without ://
        :param conn_id: the connection to use to connect to the filesystem
        :param fs: an already constructed filesystem; when omitted the
            filesystem is resolved on first access of :attr:`fs`
        :param storage_options: options kept on the instance and included in
            :meth:`serialize`
        """
        self.conn_id = conn_id
        self.protocol = protocol
        if fs is not None:
            # Storing the value in the instance dict shadows the
            # ``cached_property`` below, so its getter never runs.
            self.fs = fs
        self.storage_options = storage_options

    def __str__(self):
        """Return ``"<protocol>-<conn_id>"``, or just the protocol without a connection."""
        return f"{self.protocol}-{self.conn_id}" if self.conn_id else self.protocol

    @cached_property
    def fs(self) -> AbstractFileSystem:
        """Return the filesystem, resolving it from protocol and connection on first use."""
        from airflow.sdk.io import get_fs

        # Only reached when no ``fs`` was passed to ``__init__``; an explicit
        # ``fs`` is already stored on the instance and bypasses this getter.
        return get_fs(self.protocol, self.conn_id)

    @property
    def fsid(self) -> str:
        """
        Get the filesystem id for this store in order to be able to compare across instances.

        The underlying ``fsid`` is returned from the filesystem if available. When the
        filesystem raises ``NotImplementedError`` the id is built from the filesystem's
        protocol and the connection ID (``"env"`` when there is no connection ID).

        :return: the deterministic filesystem ID
        """
        try:
            return self.fs.fsid
        except NotImplementedError:
            return f"{self.fs.protocol}-{self.conn_id or 'env'}"

    def serialize(self):
        """
        Return a dict describing this store.

        :return: mapping with the keys ``protocol``, ``conn_id``, ``filesystem``
            (qualified name of the filesystem, or ``None``) and ``storage_options``
        """
        from airflow.sdk._shared.module_loading import qualname

        return {
            "protocol": self.protocol,
            "conn_id": self.conn_id,
            "filesystem": qualname(self.fs) if self.fs else None,
            "storage_options": self.storage_options,
        }

    @classmethod
    def deserialize(cls, data: dict[str, str], version: int):
        """
        Rebuild (or look up) a store from the output of :meth:`serialize`.

        :param data: mapping with at least ``protocol`` and ``conn_id``;
            ``filesystem`` and ``storage_options`` are optional
        :param version: serialization version the data was written with
        :return: the cached store for the derived alias if one exists, otherwise
            the result of ``attach``
        :raises ValueError: if ``version`` is newer than this class supports, or
            if the data names a filesystem while ``has_fs(protocol)`` is false
        """
        # Validate the cheap precondition before doing the lazy import.
        if version > cls.__version__:
            raise ValueError(f"Cannot deserialize version {version} for {cls.__name__}")

        from airflow.sdk.io import has_fs

        protocol = data["protocol"]
        conn_id = data["conn_id"]

        # Same alias scheme that ``attach`` uses when no alias is supplied.
        alias = f"{protocol}-{conn_id}" if conn_id else protocol

        if store := _STORE_CACHE.get(alias):
            return store

        filesystem = data.get("filesystem")
        if not has_fs(protocol) and filesystem:
            raise ValueError(
                f"No attached filesystem found for {filesystem} with "
                f"protocol {protocol}. Please use attach() for this protocol and filesystem."
            )

        # ``storage_options`` is optional everywhere else, so tolerate its
        # absence instead of raising ``KeyError``.
        return attach(protocol=protocol, conn_id=conn_id, storage_options=data.get("storage_options"))

    def __eq__(self, other: object) -> bool:
        """Stores are equal when their connection IDs and filesystems are equal."""
        if not isinstance(other, ObjectStore):
            return NotImplemented
        if self.conn_id != other.conn_id:
            return False
        try:
            return self.fs == other.fs
        except ValueError:
            # A ValueError while resolving or comparing filesystems counts as "not equal".
            return False

    def __hash__(self):
        """Hash on the connection ID and the filesystem id."""
        return hash((self.conn_id, self.fsid))

125 

126 

# Module-level registry of attached stores, keyed by alias. ``attach`` reads
# and populates it; ``ObjectStore.deserialize`` consults it before attaching.
_STORE_CACHE: dict[str, ObjectStore] = {}

128 

129 

def attach(
    protocol: str | None = None,
    conn_id: str | None = None,
    alias: str | None = None,
    encryption_type: str | None = "",
    fs: AbstractFileSystem | None = None,
    **kwargs,
) -> ObjectStore:
    """
    Attach a filesystem or object storage.

    A store already registered under the (given or derived) alias is returned
    as-is; in that case the other arguments are not applied to it. Otherwise a
    new ``ObjectStore`` is created, registered under the alias and returned.

    :param alias: the alias to be used to refer to the store, autogenerated
        from ``protocol`` and ``conn_id`` if omitted
    :param protocol: the scheme that is used without ://
    :param conn_id: the connection to use to connect to the filesystem
    :param encryption_type: the encryption type to use to connect to the filesystem
        (NOTE(review): accepted but not referenced in this function's body)
    :param fs: the filesystem type to use to connect to the filesystem
    :param kwargs: forwarded unchanged to the ``ObjectStore`` constructor
    :raises ValueError: if neither ``alias`` nor ``protocol`` is given, or if
        only an ``alias`` is given and no store is registered under it
    """
    if not alias:
        # Without an alias the protocol is mandatory, since the alias is derived from it.
        if not protocol:
            raise ValueError("No protocol specified and no alias provided")
        alias = protocol if not conn_id else f"{protocol}-{conn_id}"

    registered = _STORE_CACHE.get(alias)
    if registered:
        return registered

    # Reaching here without a protocol means a caller-supplied alias that is
    # unknown, with nothing to build a new store from.
    if not protocol:
        raise ValueError(f"No registered store with alias: {alias}")

    created = ObjectStore(protocol=protocol, conn_id=conn_id, fs=fs, **kwargs)
    _STORE_CACHE[alias] = created
    return created