Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/registry.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

86 statements  

1"""upath.registry -- registry for file system specific implementations 

2 

3Retrieve UPath implementations via `get_upath_class`. 

4Register custom UPath subclasses in one of two ways: 

5 

6### directly from Python 

7 

8>>> from upath import UPath 

9>>> from upath.registry import register_implementation 

10>>> my_protocol = "myproto" 

11>>> class MyPath(UPath): 

12... pass 

13>>> register_implementation(my_protocol, MyPath) 

14 

15### via entry points 

16 

17```toml 

18# pyproject.toml 

19[project.entry-points."universal_pathlib.implementations"] 

20myproto = "my_module.submodule:MyPath" 

21``` 

22 

23```ini 

24# setup.cfg 

25[options.entry_points] 

26universal_pathlib.implementations = 

27 myproto = my_module.submodule:MyPath 

28``` 

29""" 

30 

31from __future__ import annotations 

32 

33import os 

34import re 

35import sys 

36import warnings 

37from collections import ChainMap 

38from functools import lru_cache 

39from importlib import import_module 

40from importlib.metadata import entry_points 

41from typing import TYPE_CHECKING 

42from typing import Iterator 

43from typing import MutableMapping 

44 

45from fsspec.core import get_filesystem_class 

46from fsspec.registry import known_implementations as _fsspec_known_implementations 

47 

48import upath 

49 

# Names exported as the public API of this module.
__all__ = [
    "get_upath_class",
    "available_implementations",
    "register_implementation",
]


# Entry point group the registry queries to discover implementations
# provided by installed third-party packages.
_ENTRY_POINT_GROUP = "universal_pathlib.implementations"

58 

59 

class _Registry(MutableMapping[str, "type[upath.UPath]"]):
    """Internal registry mapping protocol names to UPath subclasses.

    A lookup consults, in order:

    1. implementations registered at runtime (via ``__setitem__``),
    2. the built-in ``known_implementations`` table,
    3. entry points of the ``_ENTRY_POINT_GROUP`` group.

    Values may be stored as dotted ``"package.module.Class"`` strings; these
    are imported lazily, on lookup. Removal of entries is not supported.
    """

    # Built-in protocol -> fully qualified class name table. The classes are
    # only imported when the protocol is actually looked up.
    known_implementations: dict[str, str] = {
        "abfs": "upath.implementations.cloud.AzurePath",
        "abfss": "upath.implementations.cloud.AzurePath",
        "adl": "upath.implementations.cloud.AzurePath",
        "az": "upath.implementations.cloud.AzurePath",
        "data": "upath.implementations.data.DataPath",
        "file": "upath.implementations.local.FilePath",
        "local": "upath.implementations.local.FilePath",
        "gcs": "upath.implementations.cloud.GCSPath",
        "gs": "upath.implementations.cloud.GCSPath",
        "hdfs": "upath.implementations.hdfs.HDFSPath",
        "http": "upath.implementations.http.HTTPPath",
        "https": "upath.implementations.http.HTTPPath",
        "memory": "upath.implementations.memory.MemoryPath",
        "s3": "upath.implementations.cloud.S3Path",
        "s3a": "upath.implementations.cloud.S3Path",
        "sftp": "upath.implementations.sftp.SFTPPath",
        "ssh": "upath.implementations.sftp.SFTPPath",
        "webdav": "upath.implementations.webdav.WebdavPath",
        "webdav+http": "upath.implementations.webdav.WebdavPath",
        "webdav+https": "upath.implementations.webdav.WebdavPath",
        "github": "upath.implementations.github.GitHubPath",
        "smb": "upath.implementations.smb.SMBPath",
    }

    if TYPE_CHECKING:
        _m: MutableMapping[str, str | type[upath.UPath]]

    def __init__(self) -> None:
        """Collect the entry points and set up the layered lookup table."""
        if sys.version_info >= (3, 10):
            eps = entry_points(group=_ENTRY_POINT_GROUP)
        else:
            eps = entry_points().get(_ENTRY_POINT_GROUP, [])
        # Entry points are only indexed by name here; they are loaded on
        # first lookup in __getitem__.
        self._entries = {ep.name: ep for ep in eps}
        # Writes go to the first (initially empty) dict, so runtime
        # registrations shadow the built-in table without mutating it.
        self._m = ChainMap({}, self.known_implementations)  # type: ignore

    def __contains__(self, item: object) -> bool:
        """Return True if `item` is a registered or entry point protocol."""
        return item in set().union(self._m, self._entries)

    def __getitem__(self, item: str) -> type[upath.UPath]:
        """Return the class registered for protocol `item`.

        Raises
        ------
        KeyError
            If the protocol is neither registered nor provided by an
            entry point.
        """
        fqn: str | type[upath.UPath] | None = self._m.get(item)
        if fqn is None and item in self._entries:
            # Load the entry point once and remember the result in the
            # runtime layer so later lookups skip the load.
            fqn = self._m[item] = self._entries[item].load()
        if fqn is None:
            raise KeyError(f"{item} not in registry")
        if isinstance(fqn, str):
            # Dotted path: import the module and fetch the class from it.
            module_name, name = fqn.rsplit(".", 1)
            mod = import_module(module_name)
            return getattr(mod, name)  # type: ignore
        return fqn

    def __setitem__(self, item: str, value: type[upath.UPath] | str) -> None:
        """Register `value` for protocol `item`.

        Raises
        ------
        ValueError
            If `value` is neither a UPath subclass nor a string.
        """
        if not (
            (isinstance(value, type) and issubclass(value, upath.UPath))
            or isinstance(value, str)
        ):
            raise ValueError(
                f"expected UPath subclass or FQN-string, got: {type(value).__name__!r}"
            )
        self._m[item] = value
        # get_upath_class is memoized and also caches its fallback results
        # (None or the generic UPath class) for protocols that were not
        # registered when first requested. Always drop the cache, otherwise
        # a protocol registered after such a lookup would keep resolving to
        # the stale result.
        get_upath_class.cache_clear()

    def __delitem__(self, __v: str) -> None:
        """Unsupported: always raises NotImplementedError."""
        raise NotImplementedError("removal is unsupported")

    def __len__(self) -> int:
        """Return the number of distinct protocols known to the registry."""
        return len(set().union(self._m, self._entries))

    def __iter__(self) -> Iterator[str]:
        """Iterate over the distinct protocol names (in set order)."""
        return iter(set().union(self._m, self._entries))

137 

138 

# Module-level registry instance shared by the public functions of this module.
_registry = _Registry()

140 

141 

def available_implementations(*, fallback: bool = False) -> list[str]:
    """List the protocols that have an implementation available.

    Parameters
    ----------
    fallback:
        When True, the result additionally contains the protocols of
        fsspec filesystems for which universal_pathlib has no dedicated
        implementation.
    """
    protocols = list(_registry)
    if fallback:
        # Merge through a set so protocols known to both appear only once.
        return list({*protocols, *list(_fsspec_known_implementations)})
    return protocols

156 

157 

def register_implementation(
    protocol: str,
    cls: type[upath.UPath] | str,
    *,
    clobber: bool = False,
) -> None:
    """register a UPath implementation with a protocol

    Parameters
    ----------
    protocol:
        Protocol name to associate with the class
    cls:
        The UPath subclass for the protocol or a str representing the
        full path to an implementation class like package.module.class.
    clobber:
        Whether to overwrite a protocol with the same name; if False,
        will raise instead.

    Raises
    ------
    ValueError
        If `protocol` does not look like a URI scheme, or if it is already
        registered and `clobber` is False. The registry itself also raises
        ValueError when `cls` is neither a UPath subclass nor a string.
    """
    # fullmatch instead of match with "^...$": "$" also matches right before
    # a trailing newline, which would let e.g. "s3\n" pass validation.
    if not re.fullmatch(r"[a-z][a-z0-9+_.]+", protocol):
        raise ValueError(f"{protocol!r} is not a valid URI scheme")
    if not clobber and protocol in _registry:
        raise ValueError(f"{protocol!r} is already in registry and clobber is False!")
    _registry[protocol] = cls

182 

183 

@lru_cache
def get_upath_class(
    protocol: str,
    *,
    fallback: bool = True,
) -> type[upath.UPath] | None:
    """Look up the UPath class responsible for `protocol`.

    `None` is returned when no class matches the protocol.

    Parameters
    ----------
    protocol:
        The protocol string
    fallback:
        When False, protocols that only have an fsspec filesystem but no
        registered implementation yield `None` instead of the generic
        UPath class.
    """
    try:
        return _registry[protocol]
    except KeyError:
        if protocol:
            if not fallback:
                return None
            try:
                get_filesystem_class(protocol)
            except ValueError:
                # fsspec does not know this protocol either
                return None
            message = (
                f"UPath {protocol!r} filesystem not explicitly implemented."
                " Falling back to default implementation."
                " This filesystem may not be tested."
            )
            warnings.warn(message, UserWarning, stacklevel=2)
            return upath.UPath

        # An empty protocol denotes a plain local path; pick the flavour
        # matching the running operating system.
        if os.name != "nt":
            from upath.implementations.local import PosixUPath

            return PosixUPath

        from upath.implementations.local import WindowsUPath

        return WindowsUPath