Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/io/store.py: 32%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19from functools import cached_property
20from typing import TYPE_CHECKING, ClassVar
22if TYPE_CHECKING:
23 from fsspec import AbstractFileSystem
25 from airflow.sdk.io.typedef import Properties
class ObjectStore:
    """
    Manages a filesystem or object storage.

    Instances are normally obtained through the module-level :func:`attach`
    function rather than by calling the constructor directly.
    """

    # Highest serialization format version that ``deserialize`` accepts.
    __version__: ClassVar[int] = 1

    method: str
    conn_id: str | None
    protocol: str
    storage_options: Properties | None

    def __init__(
        self,
        protocol: str,
        conn_id: str | None,
        fs: AbstractFileSystem | None = None,
        storage_options: Properties | None = None,
    ):
        """
        Create a store for ``protocol``.

        :param protocol: the scheme that is used without ://
        :param conn_id: the connection to use, or None
        :param fs: an already constructed filesystem; when omitted the filesystem
            is resolved on first access of :attr:`fs`
        :param storage_options: options kept on the store and included in
            :meth:`serialize`
        """
        self.conn_id = conn_id
        self.protocol = protocol
        if fs is not None:
            # Writing to the instance under the cached property's name means
            # the ``fs`` getter below never runs for this store.
            self.fs = fs
        self.storage_options = storage_options

    def __str__(self):
        """Return ``<protocol>-<conn_id>``, or just the protocol without a connection."""
        return f"{self.protocol}-{self.conn_id}" if self.conn_id else self.protocol

    @cached_property
    def fs(self) -> AbstractFileSystem:
        """Resolve the filesystem for this store's protocol and connection on first access."""
        from airflow.sdk.io import get_fs

        # if the fs is provided in init, the next statement will be ignored
        return get_fs(self.protocol, self.conn_id)

    @property
    def fsid(self) -> str:
        """
        Get the filesystem id for this store in order to be able to compare across instances.

        The underlying `fsid` is returned from the filesystem if available, otherwise it is generated
        from the filesystem's protocol and the connection ID (``env`` when there is no connection).

        :return: the deterministic filesystem ID
        """
        try:
            return self.fs.fsid
        except NotImplementedError:
            return f"{self.fs.protocol}-{self.conn_id or 'env'}"

    def serialize(self):
        """
        Return a dictionary describing this store.

        :return: a dict with the keys ``protocol``, ``conn_id``, ``filesystem``
            (qualified name of the filesystem, or None) and ``storage_options``
        """
        from airflow.sdk._shared.module_loading import qualname

        return {
            "protocol": self.protocol,
            "conn_id": self.conn_id,
            "filesystem": qualname(self.fs) if self.fs else None,
            "storage_options": self.storage_options,
        }

    @classmethod
    def deserialize(cls, data: dict[str, str], version: int):
        """
        Return the store described by ``data``, reusing a cached store when one exists.

        :param data: a dictionary as produced by :meth:`serialize`; the
            ``filesystem`` and ``storage_options`` keys are optional
        :param version: the format version ``data`` was written with
        :return: the cached or newly attached store
        :raises ValueError: if ``version`` is newer than this class supports, or
            if ``data`` names a filesystem but ``has_fs`` reports none for the protocol
        """
        from airflow.sdk.io import has_fs

        if version > cls.__version__:
            raise ValueError(f"Cannot deserialize version {version} for {cls.__name__}")

        protocol = data["protocol"]
        conn_id = data["conn_id"]

        # Same alias scheme that attach() generates when no alias is passed.
        alias = f"{protocol}-{conn_id}" if conn_id else protocol

        if store := _STORE_CACHE.get(alias):
            return store

        if not has_fs(protocol) and data.get("filesystem"):
            raise ValueError(
                f"No attached filesystem found for {data['filesystem']} with "
                f"protocol {data['protocol']}. Please use attach() for this protocol and filesystem."
            )

        # ``storage_options`` is optional on the constructor, so a payload
        # without the key is treated the same as an explicit None instead of
        # failing with a KeyError.
        return attach(protocol=protocol, conn_id=conn_id, storage_options=data.get("storage_options"))

    def __eq__(self, other: object) -> bool:
        """Stores are equal when their connection IDs match and their filesystems compare equal."""
        if not isinstance(other, ObjectStore):
            return NotImplemented
        if self.conn_id != other.conn_id:
            return False
        try:
            return self.fs == other.fs
        except ValueError:
            # A ValueError while resolving or comparing the filesystems is
            # reported as "not equal" rather than propagated.
            return False

    def __hash__(self):
        """Hash on the connection ID and the filesystem ID."""
        return hash((self.conn_id, self.fsid))
# Registry of attached stores, keyed by alias.
_STORE_CACHE: dict[str, ObjectStore] = {}


def attach(
    protocol: str | None = None,
    conn_id: str | None = None,
    alias: str | None = None,
    encryption_type: str | None = "",
    fs: AbstractFileSystem | None = None,
    **kwargs,
) -> ObjectStore:
    """
    Attach a filesystem or object storage, reusing an already attached store when possible.

    :param protocol: the scheme that is used without ://
    :param conn_id: the connection to use to connect to the filesystem
    :param alias: the alias to be used to refer to the store; when omitted it is
        generated as ``<protocol>-<conn_id>``, or just ``<protocol>`` without a connection
    :param encryption_type: accepted as part of the signature; not read by this function
    :param fs: the filesystem instance to hand to a newly created store
    :param kwargs: forwarded unchanged to the ``ObjectStore`` constructor
    :return: the store registered under the alias, created first if necessary
    :raises ValueError: if the alias is unknown and no protocol is given, or if
        neither an alias nor a protocol is given
    """
    if alias:
        # Explicit alias: a registered store wins over every other argument.
        known = _STORE_CACHE.get(alias)
        if known:
            return known
        if not protocol:
            raise ValueError(f"No registered store with alias: {alias}")
    elif not protocol:
        raise ValueError("No protocol specified and no alias provided")
    else:
        # No alias supplied: derive one and check the registry under that name.
        alias = f"{protocol}-{conn_id}" if conn_id else protocol
        known = _STORE_CACHE.get(alias)
        if known:
            return known

    created = ObjectStore(protocol=protocol, conn_id=conn_id, fs=fs, **kwargs)
    _STORE_CACHE[alias] = created
    return created