Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/registry.py: 25%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""upath.registry -- registry for file system specific implementations
3Retrieve UPath implementations via `get_upath_class`.
4Register custom UPath subclasses in one of two ways:
6### directly from Python
8>>> from upath import UPath
9>>> from upath.registry import register_implementation
10>>> my_protocol = "myproto"
11>>> class MyPath(UPath):
12... pass
13>>> register_implementation(my_protocol, MyPath)
15### via entry points
17```toml
18# pyproject.toml
19[project.entry-points."universal_pathlib.implementations"]
20myproto = "my_module.submodule:MyPath"
21```
23```ini
24# setup.cfg
25[options.entry_points]
26universal_pathlib.implementations =
27 myproto = my_module.submodule:MyPath
28```
29"""
31from __future__ import annotations
33import os
34import re
35import sys
36import warnings
37from collections import ChainMap
38from collections.abc import Iterator
39from collections.abc import MutableMapping
40from functools import lru_cache
41from importlib import import_module
42from importlib.metadata import entry_points
43from typing import TYPE_CHECKING
45from fsspec.core import get_filesystem_class
46from fsspec.registry import known_implementations as _fsspec_known_implementations
47from fsspec.registry import registry as _fsspec_registry
49import upath
51if TYPE_CHECKING:
52 from typing import Literal
53 from typing import overload
55 from upath.implementations.cached import SimpleCachePath as _SimpleCachePath
56 from upath.implementations.cloud import AzurePath as _AzurePath
57 from upath.implementations.cloud import GCSPath as _GCSPath
58 from upath.implementations.cloud import HfPath as _HfPath
59 from upath.implementations.cloud import S3Path as _S3Path
60 from upath.implementations.data import DataPath as _DataPath
61 from upath.implementations.ftp import FTPPath as _FTPPath
62 from upath.implementations.github import GitHubPath as _GitHubPath
63 from upath.implementations.hdfs import HDFSPath as _HDFSPath
64 from upath.implementations.http import HTTPPath as _HTTPPath
65 from upath.implementations.local import FilePath as _FilePath
66 from upath.implementations.local import PosixUPath as _PosixUPath
67 from upath.implementations.local import WindowsUPath as _WindowsUPath
68 from upath.implementations.memory import MemoryPath as _MemoryPath
69 from upath.implementations.sftp import SFTPPath as _SFTPPath
70 from upath.implementations.smb import SMBPath as _SMBPath
71 from upath.implementations.tar import TarPath as _TarPath
72 from upath.implementations.webdav import WebdavPath as _WebdavPath
73 from upath.implementations.zip import ZipPath as _ZipPath
# Names exported by ``from upath.registry import *``.
__all__ = [
    "get_upath_class",
    "available_implementations",
    "register_implementation",
]

# Entry point group that the registry scans for additional implementations.
_ENTRY_POINT_GROUP = "universal_pathlib.implementations"
class _Registry(MutableMapping[str, "type[upath.UPath]"]):
    """Internal registry mapping protocol names to UPath subclasses.

    A lookup consults three sources, in this order:

    1. values stored at runtime through ``__setitem__``,
    2. the built-in ``known_implementations`` table of FQN strings,
    3. entry points of the ``_ENTRY_POINT_GROUP`` group, which are loaded
       on first access and then stored like a runtime registration.

    FQN strings are only resolved (by importing their module) when the
    protocol is looked up, so creating the registry does not import any
    implementation module.
    """

    # protocol -> fully qualified name of the implementing class
    known_implementations: dict[str, str] = {
        "abfs": "upath.implementations.cloud.AzurePath",
        "abfss": "upath.implementations.cloud.AzurePath",
        "adl": "upath.implementations.cloud.AzurePath",
        "az": "upath.implementations.cloud.AzurePath",
        "data": "upath.implementations.data.DataPath",
        "file": "upath.implementations.local.FilePath",
        "ftp": "upath.implementations.ftp.FTPPath",
        "local": "upath.implementations.local.FilePath",
        "gcs": "upath.implementations.cloud.GCSPath",
        "gs": "upath.implementations.cloud.GCSPath",
        "hdfs": "upath.implementations.hdfs.HDFSPath",
        "hf": "upath.implementations.cloud.HfPath",
        "http": "upath.implementations.http.HTTPPath",
        "https": "upath.implementations.http.HTTPPath",
        "memory": "upath.implementations.memory.MemoryPath",
        "s3": "upath.implementations.cloud.S3Path",
        "s3a": "upath.implementations.cloud.S3Path",
        "simplecache": "upath.implementations.cached.SimpleCachePath",
        "sftp": "upath.implementations.sftp.SFTPPath",
        "ssh": "upath.implementations.sftp.SFTPPath",
        "tar": "upath.implementations.tar.TarPath",
        "webdav": "upath.implementations.webdav.WebdavPath",
        "webdav+http": "upath.implementations.webdav.WebdavPath",
        "webdav+https": "upath.implementations.webdav.WebdavPath",
        "github": "upath.implementations.github.GitHubPath",
        "smb": "upath.implementations.smb.SMBPath",
        "zip": "upath.implementations.zip.ZipPath",
    }

    if TYPE_CHECKING:
        _m: MutableMapping[str, str | type[upath.UPath]]

    def __init__(self) -> None:
        """Collect entry points and layer runtime entries over the known ones."""
        # the selection API of entry_points() differs between Python versions
        if sys.version_info >= (3, 10):
            eps = entry_points(group=_ENTRY_POINT_GROUP)
        else:
            eps = entry_points().get(_ENTRY_POINT_GROUP, [])
        self._entries = {ep.name: ep for ep in eps}
        # writes land in the first (initially empty) map, so the class-level
        # known_implementations table itself is never mutated
        self._m = ChainMap({}, self.known_implementations)  # type: ignore

    def __contains__(self, item: object) -> bool:
        """Return True if *item* is registered, known, or an entry point."""
        return item in self._m or item in self._entries

    def __getitem__(self, item: str) -> type[upath.UPath]:
        """Return the class registered for protocol *item*.

        Raises
        ------
        KeyError
            If the protocol is not registered, not known, and not provided
            by an entry point.
        """
        fqn: str | type[upath.UPath] | None = self._m.get(item)
        if fqn is None:
            if item in self._entries:
                # load the entry point once and remember the result
                fqn = self._m[item] = self._entries[item].load()
        if fqn is None:
            raise KeyError(f"{item} not in registry")
        if isinstance(fqn, str):
            # resolve "package.module.Class" by importing the module
            module_name, name = fqn.rsplit(".", 1)
            mod = import_module(module_name)
            cls = getattr(mod, name)  # type: ignore
        else:
            cls = fqn
        return cls

    def __setitem__(self, item: str, value: type[upath.UPath] | str) -> None:
        """Register *value* (a UPath subclass or an FQN string) for *item*.

        Raises
        ------
        ValueError
            If *value* is neither a UPath subclass nor a string.
        """
        if not (
            (isinstance(value, type) and issubclass(value, upath.UPath))
            or isinstance(value, str)
        ):
            raise ValueError(
                f"expected UPath subclass or FQN-string, got: {type(value).__name__!r}"
            )
        # Always invalidate the memoized lookups: get_upath_class may have
        # cached None or a generated fallback class for a protocol that was
        # not in the registry yet, and _get_implementation_protocols may have
        # cached a protocol list that this registration changes.
        get_upath_class.cache_clear()  # type: ignore[attr-defined]
        _get_implementation_protocols.cache_clear()  # type: ignore[attr-defined]
        self._m[item] = value

    def __delitem__(self, __v: str) -> None:
        """Removal is not supported; always raises NotImplementedError."""
        raise NotImplementedError("removal is unsupported")

    def __len__(self) -> int:
        """Return the number of distinct protocols across all sources."""
        return len(set().union(self._m, self._entries))

    def __iter__(self) -> Iterator[str]:
        """Iterate over the distinct protocols across all sources."""
        return iter(set().union(self._m, self._entries))
# Module-level registry instance used by the functions in this module.
_registry = _Registry()
def available_implementations(*, fallback: bool = False) -> list[str]:
    """List the protocols that have an implementation available.

    Parameters
    ----------
    fallback:
        When True, the result additionally contains the protocols of
        fsspec filesystems (registered or known to fsspec) that have no
        implementation in universal_pathlib.

    Returns
    -------
    list[str]
        The protocol names, without duplicates.
    """
    if fallback:
        # merge the upath protocols with everything fsspec has registered
        # or knows about; the set removes duplicates
        protocols: set[str] = set()
        protocols.update(
            _registry,
            _fsspec_registry,
            _fsspec_known_implementations,
        )
        return list(protocols)
    return list(_registry)
def register_implementation(
    protocol: str,
    cls: type[upath.UPath] | str,
    *,
    clobber: bool = False,
) -> None:
    """register a UPath implementation with a protocol

    Parameters
    ----------
    protocol:
        Protocol name to associate with the class. Must start with a
        lowercase letter followed by one or more of: lowercase letters,
        digits, ``+``, ``_`` or ``.``.
    cls:
        The UPath subclass for the protocol or a str representing the
        full path to an implementation class like package.module.class.
    clobber:
        Whether to overwrite a protocol with the same name; if False,
        will raise instead.

    Raises
    ------
    ValueError
        If the protocol name is not valid, if the protocol is already in
        the registry and clobber is False, or if the registry rejects
        `cls` as neither a UPath subclass nor a string.
    """
    # fullmatch instead of match with a "$" anchor: "$" also matches just
    # before a trailing newline, which would let e.g. "s3\n" through
    if not re.fullmatch(r"[a-z][a-z0-9+_.]+", protocol):
        raise ValueError(f"{protocol!r} is not a valid URI scheme")
    if not clobber and protocol in _registry:
        raise ValueError(f"{protocol!r} is already in registry and clobber is False!")
    _registry[protocol] = cls
@lru_cache  # type: ignore[misc]
def _get_implementation_protocols(cls: type[upath.UPath]) -> list[str]:
    """List the protocols a UPath class is registered for, without importing.

    Matches are collected from the runtime registrations, then from the
    known implementations (compared by fully qualified name), then from
    the entry points (compared by module and attribute name). Duplicates
    are dropped, keeping the first occurrence.

    The result is memoized by ``lru_cache``, so repeated calls for the same
    class return the same list object.

    Raises
    ------
    ValueError
        If *cls* is not a subclass of ``upath.UPath``.
    """
    if not issubclass(cls, upath.UPath):
        raise ValueError(f"{cls!r} is not a UPath subclass")

    module_name = cls.__module__
    class_name = cls.__name__

    if module_name == "upath.implementations._experimental":
        # experimental fallback implementations have no registry entry;
        # derive the protocol from the class name by dropping its first
        # character and its last four characters
        return [class_name[1:-4].lower()]

    qualified_name = f"{module_name}.{class_name}"
    # a dict is used as an insertion-ordered set of protocol names
    found: dict[str, None] = {}

    # 1. classes registered at runtime (first map of the ChainMap)
    for proto, registered in _registry._m.maps[0].items():  # type: ignore[attr-defined]
        if registered is cls:
            found[proto] = None

    # 2. built-in implementations, matched by fully qualified name
    for proto, fqn in _registry.known_implementations.items():
        if fqn == qualified_name:
            found[proto] = None

    # 3. entry points, matched by module and attribute name
    for proto, entry in _registry._entries.items():
        if entry.module == module_name and entry.attr == class_name:
            found[proto] = None

    return list(found)
# --- get_upath_class type overloads ------------------------------------------

# These overloads only exist for static type checkers: they map literal
# protocol names to the concrete UPath subclass that get_upath_class
# returns for them. Nothing in this block runs at runtime.
if TYPE_CHECKING:  # noqa: C901

    @overload
    def get_upath_class(protocol: Literal["simplecache"]) -> type[_SimpleCachePath]: ...
    @overload
    def get_upath_class(protocol: Literal["s3", "s3a"]) -> type[_S3Path]: ...
    @overload
    def get_upath_class(protocol: Literal["gcs", "gs"]) -> type[_GCSPath]: ...
    @overload  # noqa: E301
    def get_upath_class(
        protocol: Literal["abfs", "abfss", "adl", "az"],
    ) -> type[_AzurePath]: ...
    @overload
    def get_upath_class(protocol: Literal["data"]) -> type[_DataPath]: ...
    @overload
    def get_upath_class(protocol: Literal["ftp"]) -> type[_FTPPath]: ...
    @overload
    def get_upath_class(protocol: Literal["github"]) -> type[_GitHubPath]: ...
    @overload
    def get_upath_class(protocol: Literal["hdfs"]) -> type[_HDFSPath]: ...
    @overload
    def get_upath_class(protocol: Literal["hf"]) -> type[_HfPath]: ...
    @overload
    def get_upath_class(protocol: Literal["http", "https"]) -> type[_HTTPPath]: ...
    @overload
    def get_upath_class(protocol: Literal["file", "local"]) -> type[_FilePath]: ...
    @overload
    def get_upath_class(protocol: Literal["memory"]) -> type[_MemoryPath]: ...
    @overload
    def get_upath_class(protocol: Literal["sftp", "ssh"]) -> type[_SFTPPath]: ...
    @overload
    def get_upath_class(protocol: Literal["smb"]) -> type[_SMBPath]: ...
    @overload
    def get_upath_class(protocol: Literal["tar"]) -> type[_TarPath]: ...
    # "webdav+http" and "webdav+https" are mapped to WebdavPath in
    # _Registry.known_implementations as well, so they are typed like "webdav"
    @overload
    def get_upath_class(
        protocol: Literal["webdav", "webdav+http", "webdav+https"],
    ) -> type[_WebdavPath]: ...
    @overload
    def get_upath_class(protocol: Literal["zip"]) -> type[_ZipPath]: ...

    # the empty protocol resolves to the platform's local path class
    if sys.platform == "win32":

        @overload
        def get_upath_class(protocol: Literal[""]) -> type[_WindowsUPath]: ...

    else:

        @overload
        def get_upath_class(protocol: Literal[""]) -> type[_PosixUPath]: ...  # type: ignore[overload-overlap]  # noqa: E501

    # catch-all: any other protocol string, optionally with `fallback`
    @overload
    def get_upath_class(
        protocol: str, *, fallback: bool = ...
    ) -> type[upath.UPath] | None: ...
@lru_cache  # type: ignore[misc]  # see: https://github.com/python/typeshed/issues/11280
def get_upath_class(
    protocol: str,
    *,
    fallback: bool = True,
) -> type[upath.UPath] | None:
    """Look up the UPath class that handles *protocol*.

    Results are memoized per ``(protocol, fallback)`` by ``lru_cache``.

    Parameters
    ----------
    protocol:
        The protocol string. The empty string selects the local path class
        for the current operating system.
    fallback:
        If True (the default), a protocol without a registered
        implementation that fsspec can resolve gets a generated generic
        UPath subclass, and a UserWarning is emitted. If False, such a
        protocol yields `None`.

    Returns
    -------
    type[upath.UPath] | None
        The class for the protocol, or `None` if no matching protocol can
        be found.
    """
    try:
        return _registry[protocol]
    except KeyError:
        if not protocol:
            # no protocol at all: plain local paths, chosen by OS
            if os.name == "nt":
                from upath.implementations.local import WindowsUPath as local_cls
            else:
                from upath.implementations.local import PosixUPath as local_cls

            return local_cls  # type: ignore[return-value]

        if not fallback:
            return None

        try:
            get_filesystem_class(protocol)
        except ValueError:
            return None  # this is an unknown protocol

        # fsspec resolved the protocol, but no dedicated UPath class is
        # registered for it
        warnings.warn(
            f"UPath {protocol!r} filesystem not explicitly implemented."
            " Falling back to default implementation."
            " This filesystem may not be tested.",
            UserWarning,
            stacklevel=2,
        )
        import upath.implementations._experimental as experimental_module

        # create a generic subclass and expose it as an attribute of the
        # experimental module under its generated name
        generated_name = f"_{protocol.title()}Path"
        generated_cls = type(
            generated_name,
            (upath.UPath,),
            {"__module__": "upath.implementations._experimental"},
        )
        setattr(experimental_module, generated_name, generated_cls)
        return generated_cls