Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/registry.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

165 statements  

1"""upath.registry -- registry for file system specific implementations 

2 

3Retrieve UPath implementations via `get_upath_class`. 

4Register custom UPath subclasses in one of two ways: 

5 

6### directly from Python 

7 

8>>> from upath import UPath 

9>>> from upath.registry import register_implementation 

10>>> my_protocol = "myproto" 

11>>> class MyPath(UPath): 

12... pass 

13>>> register_implementation(my_protocol, MyPath) 

14 

15### via entry points 

16 

17```toml 

18# pyproject.toml 

19[project.entry-points."universal_pathlib.implementations"] 

20myproto = "my_module.submodule:MyPath" 

21``` 

22 

23```ini 

24# setup.cfg 

25[options.entry_points] 

26universal_pathlib.implementations = 

27 myproto = my_module.submodule:MyPath 

28``` 

29""" 

30 

31from __future__ import annotations 

32 

33import os 

34import re 

35import sys 

36import warnings 

37from collections import ChainMap 

38from collections.abc import Iterator 

39from collections.abc import MutableMapping 

40from functools import lru_cache 

41from importlib import import_module 

42from importlib.metadata import entry_points 

43from typing import TYPE_CHECKING 

44 

45from fsspec.core import get_filesystem_class 

46from fsspec.registry import known_implementations as _fsspec_known_implementations 

47from fsspec.registry import registry as _fsspec_registry 

48 

49import upath 

50 

51if TYPE_CHECKING: 

52 from typing import Literal 

53 from typing import overload 

54 

55 from upath.implementations.cached import SimpleCachePath as _SimpleCachePath 

56 from upath.implementations.cloud import AzurePath as _AzurePath 

57 from upath.implementations.cloud import GCSPath as _GCSPath 

58 from upath.implementations.cloud import HfPath as _HfPath 

59 from upath.implementations.cloud import S3Path as _S3Path 

60 from upath.implementations.data import DataPath as _DataPath 

61 from upath.implementations.ftp import FTPPath as _FTPPath 

62 from upath.implementations.github import GitHubPath as _GitHubPath 

63 from upath.implementations.hdfs import HDFSPath as _HDFSPath 

64 from upath.implementations.http import HTTPPath as _HTTPPath 

65 from upath.implementations.local import FilePath as _FilePath 

66 from upath.implementations.local import PosixUPath as _PosixUPath 

67 from upath.implementations.local import WindowsUPath as _WindowsUPath 

68 from upath.implementations.memory import MemoryPath as _MemoryPath 

69 from upath.implementations.sftp import SFTPPath as _SFTPPath 

70 from upath.implementations.smb import SMBPath as _SMBPath 

71 from upath.implementations.tar import TarPath as _TarPath 

72 from upath.implementations.webdav import WebdavPath as _WebdavPath 

73 from upath.implementations.zip import ZipPath as _ZipPath 

74 

75 

# Names exported by `from upath.registry import *`.
__all__ = [
    "get_upath_class",
    "available_implementations",
    "register_implementation",
]


# Entry-point group that `_Registry.__init__` scans for additional
# implementations (see the module docstring for how packages declare them).
_ENTRY_POINT_GROUP = "universal_pathlib.implementations"

84 

85 

class _Registry(MutableMapping[str, "type[upath.UPath]"]):
    """Internal registry mapping protocol names to UPath subclasses.

    A lookup consults, in this order:

    1. implementations registered at runtime (first map of ``self._m``),
    2. the built-in ``known_implementations`` table, whose values are dotted
       paths that are imported lazily on access,
    3. entry points of the ``_ENTRY_POINT_GROUP`` group, which are loaded on
       first access and then stored with the runtime registrations.

    Removing entries is not supported.
    """

    # protocol -> fully qualified name of the implementing class; the class
    # is only imported when the protocol is looked up via ``__getitem__``.
    known_implementations: dict[str, str] = {
        "abfs": "upath.implementations.cloud.AzurePath",
        "abfss": "upath.implementations.cloud.AzurePath",
        "adl": "upath.implementations.cloud.AzurePath",
        "az": "upath.implementations.cloud.AzurePath",
        "data": "upath.implementations.data.DataPath",
        "file": "upath.implementations.local.FilePath",
        "ftp": "upath.implementations.ftp.FTPPath",
        "local": "upath.implementations.local.FilePath",
        "gcs": "upath.implementations.cloud.GCSPath",
        "gs": "upath.implementations.cloud.GCSPath",
        "hdfs": "upath.implementations.hdfs.HDFSPath",
        "hf": "upath.implementations.cloud.HfPath",
        "http": "upath.implementations.http.HTTPPath",
        "https": "upath.implementations.http.HTTPPath",
        "memory": "upath.implementations.memory.MemoryPath",
        "s3": "upath.implementations.cloud.S3Path",
        "s3a": "upath.implementations.cloud.S3Path",
        "simplecache": "upath.implementations.cached.SimpleCachePath",
        "sftp": "upath.implementations.sftp.SFTPPath",
        "ssh": "upath.implementations.sftp.SFTPPath",
        "tar": "upath.implementations.tar.TarPath",
        "webdav": "upath.implementations.webdav.WebdavPath",
        "webdav+http": "upath.implementations.webdav.WebdavPath",
        "webdav+https": "upath.implementations.webdav.WebdavPath",
        "github": "upath.implementations.github.GitHubPath",
        "smb": "upath.implementations.smb.SMBPath",
        "zip": "upath.implementations.zip.ZipPath",
    }

    if TYPE_CHECKING:
        _m: MutableMapping[str, str | type[upath.UPath]]

    def __init__(self) -> None:
        """Collect the entry points and set up the layered lookup mapping."""
        # `entry_points` accepts the `group` keyword on Python 3.10+; older
        # versions return a mapping keyed by group name instead.
        if sys.version_info >= (3, 10):
            eps = entry_points(group=_ENTRY_POINT_GROUP)
        else:
            eps = entry_points().get(_ENTRY_POINT_GROUP, [])
        # Entry points are kept unloaded until their protocol is requested.
        self._entries = {ep.name: ep for ep in eps}
        # Writes go to the first (initially empty) map, so the class-level
        # `known_implementations` table itself is never mutated.
        self._m = ChainMap({}, self.known_implementations)  # type: ignore

    def __contains__(self, item: object) -> bool:
        """Return True if `item` is registered, built in, or an entry point."""
        return item in set().union(self._m, self._entries)

    def __getitem__(self, item: str) -> type[upath.UPath]:
        """Return the implementation for protocol `item`.

        Dotted-path strings are imported and resolved on every access; an
        entry point is loaded once and its result is stored in ``self._m``.

        Raises
        ------
        KeyError
            If the protocol is unknown to the registry.
        """
        fqn: str | type[upath.UPath] | None = self._m.get(item)
        if fqn is None:
            if item in self._entries:
                fqn = self._m[item] = self._entries[item].load()
        if fqn is None:
            raise KeyError(f"{item} not in registry")
        if isinstance(fqn, str):
            module_name, name = fqn.rsplit(".", 1)
            mod = import_module(module_name)
            cls = getattr(mod, name)  # type: ignore
        else:
            cls = fqn
        return cls

    def __setitem__(self, item: str, value: type[upath.UPath] | str) -> None:
        """Register `value` for protocol `item`.

        Parameters
        ----------
        item:
            The protocol name.
        value:
            A UPath subclass, or a dotted path to one as a string.

        Raises
        ------
        ValueError
            If `value` is neither a UPath subclass nor a string.
        """
        if not (
            (isinstance(value, type) and issubclass(value, upath.UPath))
            or isinstance(value, str)
        ):
            raise ValueError(
                f"expected UPath subclass or FQN-string, got: {type(value).__name__!r}"
            )
        self._m[item] = value
        # Always invalidate the memoized lookups, not only when an existing
        # entry is overridden: `get_upath_class` also caches `None` and
        # fallback classes for protocols that were unregistered when it was
        # called, and `_get_implementation_protocols` caches the protocol
        # list of a class, which grows when a new protocol is added for it.
        get_upath_class.cache_clear()  # type: ignore[attr-defined]
        _get_implementation_protocols.cache_clear()  # type: ignore[attr-defined]

    def __delitem__(self, __v: str) -> None:
        """Always raise NotImplementedError; entries cannot be removed."""
        raise NotImplementedError("removal is unsupported")

    def __len__(self) -> int:
        """Return the number of distinct protocols across all sources."""
        return len(set().union(self._m, self._entries))

    def __iter__(self) -> Iterator[str]:
        """Iterate over the distinct protocol names (in set order)."""
        return iter(set().union(self._m, self._entries))

169 

170 

# Module-level registry instance used by the functions in this module.
_registry = _Registry()

172 

173 

def available_implementations(*, fallback: bool = False) -> list[str]:
    """List the protocols for which an implementation is available.

    Parameters
    ----------
    fallback:
        When True, the result additionally contains the protocols of
        fsspec filesystems that universal_pathlib has no dedicated
        implementation for.
    """
    if fallback:
        # merge the upath registry with everything fsspec knows about
        protocols = {*_registry, *_fsspec_registry, *_fsspec_known_implementations}
        return list(protocols)
    return list(_registry)

187 

188 

def register_implementation(
    protocol: str,
    cls: type[upath.UPath] | str,
    *,
    clobber: bool = False,
) -> None:
    """register a UPath implementation with a protocol

    Parameters
    ----------
    protocol:
        Protocol name to associate with the class
    cls:
        The UPath subclass for the protocol or a str representing the
        full path to an implementation class like package.module.class.
    clobber:
        Whether to overwrite a protocol with the same name; if False,
        will raise instead.

    Raises
    ------
    ValueError
        If `protocol` is not a valid scheme name, or if it is already
        registered and `clobber` is False.
    """
    # `fullmatch` is required here: with `match` and a trailing `$` a name
    # ending in a newline (e.g. "abc\n") would be accepted as valid.
    if not re.fullmatch(r"[a-z][a-z0-9+_.]+", protocol):
        raise ValueError(f"{protocol!r} is not a valid URI scheme")
    if not clobber and protocol in _registry:
        raise ValueError(f"{protocol!r} is already in registry and clobber is False!")
    _registry[protocol] = cls

213 

214 

@lru_cache  # type: ignore[misc]
def _get_implementation_protocols(cls: type[upath.UPath]) -> list[str]:
    """Collect the protocols registered for `cls` without importing anything.

    The result lists, without duplicates, protocols from the runtime
    registrations first, then the built-in table, then the entry points.
    Because of `lru_cache` the same list object is handed out on repeated
    calls.

    Raises
    ------
    ValueError
        If `cls` is not a UPath subclass.
    """
    if not issubclass(cls, upath.UPath):
        raise ValueError(f"{cls!r} is not a UPath subclass")
    if cls.__module__ == "upath.implementations._experimental":
        # fallback classes are absent from the registry; their name has the
        # form "_<Protocol>Path", so strip the "_" prefix and "Path" suffix
        return [cls.__name__[1:-4].lower()]

    qualified_name = f"{cls.__module__}.{cls.__name__}"
    # a dict keeps first-seen order while dropping duplicates
    ordered: dict[str, None] = {}

    # classes registered (or loaded from entry points) at runtime
    for proto, impl in _registry._m.maps[0].items():  # type: ignore[attr-defined]
        if impl is cls:
            ordered[proto] = None

    # built-in implementations, matched by their dotted path
    for proto, dotted in _registry.known_implementations.items():
        if dotted == qualified_name:
            ordered[proto] = None

    # entry points, matched by module and attribute name without loading them
    for proto, entry in _registry._entries.items():
        if entry.module == cls.__module__ and entry.attr == cls.__name__:
            ordered[proto] = None

    return list(ordered)

239 

240 

241# --- get_upath_class type overloads ------------------------------------------ 

242 

# Typing-only overloads: they map each built-in protocol literal to the
# class listed for it in `_Registry.known_implementations`, so that type
# checkers can infer the concrete return type of `get_upath_class`.
if TYPE_CHECKING:  # noqa: C901

    @overload
    def get_upath_class(protocol: Literal["simplecache"]) -> type[_SimpleCachePath]: ...
    @overload
    def get_upath_class(protocol: Literal["s3", "s3a"]) -> type[_S3Path]: ...
    @overload
    def get_upath_class(protocol: Literal["gcs", "gs"]) -> type[_GCSPath]: ...
    @overload  # noqa: E301
    def get_upath_class(
        protocol: Literal["abfs", "abfss", "adl", "az"],
    ) -> type[_AzurePath]: ...
    @overload
    def get_upath_class(protocol: Literal["data"]) -> type[_DataPath]: ...
    @overload
    def get_upath_class(protocol: Literal["ftp"]) -> type[_FTPPath]: ...
    @overload
    def get_upath_class(protocol: Literal["github"]) -> type[_GitHubPath]: ...
    @overload
    def get_upath_class(protocol: Literal["hdfs"]) -> type[_HDFSPath]: ...
    @overload
    def get_upath_class(protocol: Literal["hf"]) -> type[_HfPath]: ...
    @overload
    def get_upath_class(protocol: Literal["http", "https"]) -> type[_HTTPPath]: ...
    @overload
    def get_upath_class(protocol: Literal["file", "local"]) -> type[_FilePath]: ...
    @overload
    def get_upath_class(protocol: Literal["memory"]) -> type[_MemoryPath]: ...
    @overload
    def get_upath_class(protocol: Literal["sftp", "ssh"]) -> type[_SFTPPath]: ...
    @overload
    def get_upath_class(protocol: Literal["smb"]) -> type[_SMBPath]: ...
    @overload
    def get_upath_class(protocol: Literal["tar"]) -> type[_TarPath]: ...
    # "webdav+http" and "webdav+https" are registered for WebdavPath as well,
    # so they are listed here alongside "webdav".
    @overload  # noqa: E301
    def get_upath_class(
        protocol: Literal["webdav", "webdav+http", "webdav+https"],
    ) -> type[_WebdavPath]: ...
    @overload
    def get_upath_class(protocol: Literal["zip"]) -> type[_ZipPath]: ...

    # The empty protocol resolves to the platform-specific local path class.
    if sys.platform == "win32":

        @overload
        def get_upath_class(protocol: Literal[""]) -> type[_WindowsUPath]: ...

    else:

        @overload
        def get_upath_class(protocol: Literal[""]) -> type[_PosixUPath]: ...  # type: ignore[overload-overlap]  # noqa: E501

    # Catch-all for every other protocol string.
    @overload
    def get_upath_class(
        protocol: str, *, fallback: bool = ...
    ) -> type[upath.UPath] | None: ...

296 

297 

@lru_cache  # type: ignore[misc]  # see: https://github.com/python/typeshed/issues/11280
def get_upath_class(
    protocol: str,
    *,
    fallback: bool = True,
) -> type[upath.UPath] | None:
    """Look up the UPath class responsible for `protocol`.

    `None` is returned when no matching protocol can be found.

    Parameters
    ----------
    protocol:
        The protocol string
    fallback:
        With fallback set to False, protocols that only have an fsspec
        filesystem but no registered implementation yield `None`
        instead of a generated default class.
    """
    try:
        return _registry[protocol]
    except KeyError:
        if not protocol:
            # the empty protocol means a local path of the current platform
            if os.name == "nt":
                from upath.implementations.local import WindowsUPath as local_cls
            else:
                from upath.implementations.local import PosixUPath as local_cls
            return local_cls  # type: ignore[return-value]

        if not fallback:
            return None

        try:
            get_filesystem_class(protocol)
        except ValueError:
            return None  # this is an unknown protocol

        # fsspec knows the protocol, but there is no registered implementation
        message = (
            f"UPath {protocol!r} filesystem not explicitly implemented."
            " Falling back to default implementation."
            " This filesystem may not be tested."
        )
        warnings.warn(message, UserWarning, stacklevel=2)

        import upath.implementations._experimental as upath_experimental

        # build a plain UPath subclass and expose it on the experimental module
        generated_name = f"_{protocol.title()}Path"
        namespace = {"__module__": "upath.implementations._experimental"}
        generated = type(generated_name, (upath.UPath,), namespace)
        setattr(upath_experimental, generated_name, generated)
        return generated