Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/_flavour.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

275 statements  

1from __future__ import annotations 

2 

3import os.path 

4import posixpath 

5import sys 

6import warnings 

7from collections.abc import Mapping 

8from functools import lru_cache 

9from typing import TYPE_CHECKING 

10from typing import Any 

11from typing import TypedDict 

12from urllib.parse import SplitResult 

13from urllib.parse import urlsplit 

14 

15from fsspec.registry import known_implementations 

16from fsspec.registry import registry as _class_registry 

17from fsspec.spec import AbstractFileSystem 

18 

19import upath 

20from upath._flavour_sources import FileSystemFlavourBase 

21from upath._flavour_sources import flavour_registry 

22from upath._protocol import get_upath_protocol 

23from upath._protocol import normalize_empty_netloc 

24from upath.types import JoinablePathLike 

25from upath.types import UPathParser 

26 

27if TYPE_CHECKING: 

28 if sys.version_info >= (3, 12): 

29 from typing import TypeAlias 

30 else: 

31 TypeAlias = Any 

32 

33 from upath.core import UPath 

34 

# Names exported by ``from upath._flavour import *``.
__all__ = [
    "LazyFlavourDescriptor",
    "default_flavour",
    "upath_urijoin",
    "upath_get_kwargs_from_url",
    "upath_strip_protocol",
]

# fsspec's class registry, re-annotated as a ``Mapping`` from protocol name to
# filesystem class. ``WrappedFileSystemFlavour.from_protocol`` consults it first
# and falls through to other sources on ``KeyError``.
class_registry: Mapping[str, type[AbstractFileSystem]] = _class_registry

44 

45 

class AnyProtocolFileSystemFlavour(FileSystemFlavourBase):
    """Generic "/"-separated flavour that is not tied to a specific protocol.

    ``WrappedFileSystemFlavour.from_protocol`` wraps this class when neither
    the fsspec class registry nor the flavour registry has an entry for the
    requested protocol.
    """

    sep = "/"
    protocol = ()
    root_marker = "/"

    @classmethod
    def _strip_protocol(cls, path: str) -> str:
        """Return *path* without its protocol prefix and trailing slashes.

        Both the ``proto://`` and the ``proto::`` prefix forms are removed.
        If nothing remains, the class' root marker is returned instead.
        """
        scheme = get_upath_protocol(path)
        # The two prefixes differ in their third character, so at most one
        # of them can match; the "://" form is checked first.
        for prefix in (scheme + "://", scheme + "::"):
            if path.startswith(prefix):
                path = path[len(prefix) :]
                break
        return path.rstrip("/") or cls.root_marker

    @staticmethod
    def _get_kwargs_from_urls(path: str) -> dict[str, Any]:
        """Return an empty dict: no storage options are derived from *path*."""
        return {}

    @classmethod
    def _parent(cls, path):
        """Return the parent of *path*, always prefixed with the root marker."""
        stripped = cls._strip_protocol(path)
        head, slash, _name = stripped.rpartition("/")
        if not slash:
            # no separator at all: the parent is the root itself
            return cls.root_marker
        # drop leading root-marker characters before re-adding exactly one
        return cls.root_marker + head.lstrip(cls.root_marker)

73 

74 

class ProtocolConfig(TypedDict):
    """Shape of the per-protocol settings table of ``WrappedFileSystemFlavour``.

    The three ``set`` entries list protocol names; ``from_protocol`` turns
    membership of a protocol in each set into the boolean keyword argument
    of the same name.
    """

    # protocols whose flavour is created with ``netloc_is_anchor=True``
    netloc_is_anchor: set[str]
    # protocols whose flavour is created with ``supports_empty_parts=True``
    supports_empty_parts: set[str]
    # protocols whose flavour is created with ``meaningful_trailing_slash=True``
    meaningful_trailing_slash: set[str]
    # protocol name -> root marker that replaces the wrapped spec's own one
    root_marker_override: dict[str, str]

80 

81 

class WrappedFileSystemFlavour(UPathParser):  # (pathlib_abc.FlavourBase)
    """flavour class for universal_pathlib

    **INTERNAL AND VERY MUCH EXPERIMENTAL**

    Wraps an fsspec filesystem class (or instance, or generated flavour
    class) and exposes the fsspec compatible low-level lexical operations
    on PurePathBase-like objects.

    Note:
        In case you find yourself in need of subclassing this class,
        please open an issue in the universal_pathlib issue tracker:
        https://github.com/fsspec/universal_pathlib/issues
        Ideally we can find a way to make your use-case work by adding
        more functionality to this class.

    """

    # Note:
    #   It would be ideal if there would be a way to avoid the need for
    #   indicating the following settings via the protocol. This is a
    #   workaround to be able to implement the flavour correctly.
    # TODO:
    #   These settings should be configured on the UPath class?!?
    protocol_config: ProtocolConfig = {
        "netloc_is_anchor": {
            "http",
            "https",
            "s3",
            "s3a",
            "smb",
            "gs",
            "gcs",
            "az",
            "adl",
            "abfs",
            "abfss",
        },
        "supports_empty_parts": {
            "http",
            "https",
            "s3",
            "s3a",
            "gs",
            "gcs",
            "az",
            "adl",
            "abfs",
        },
        "meaningful_trailing_slash": {
            "http",
            "https",
        },
        "root_marker_override": {
            "smb": "/",
            "ssh": "/",
            "sftp": "/",
        },
    }

    def __init__(
        self,
        spec: type[AbstractFileSystem | FileSystemFlavourBase] | AbstractFileSystem,
        *,
        netloc_is_anchor: bool = False,
        supports_empty_parts: bool = False,
        meaningful_trailing_slash: bool = False,
        root_marker_override: str | None = None,
    ) -> None:
        """Wrap *spec* and record the lexical settings for this flavour.

        Args:
            spec: filesystem class, filesystem instance or flavour class
                whose ``_strip_protocol``/``_parent``/... are delegated to.
            netloc_is_anchor: treat the netloc as the drive; affects
                ``splitdrive``, ``split``, ``join`` and ``splitroot``.
            supports_empty_parts: keep empty path segments in ``join``.
            meaningful_trailing_slash: stored as
                ``has_meaningful_trailing_slash``; affects ``split``.
            root_marker_override: root marker to report instead of the
                one declared by *spec*; ``None`` keeps the spec's marker.
        """
        self._spec = spec
        self.netloc_is_anchor = bool(netloc_is_anchor)
        self.supports_empty_parts = bool(supports_empty_parts)
        self.has_meaningful_trailing_slash = bool(meaningful_trailing_slash)
        # some filesystems require UPath to enforce a specific root marker
        self.root_marker_override = (
            None if root_marker_override is None else str(root_marker_override)
        )

    @classmethod
    @lru_cache(maxsize=None)
    def from_protocol(
        cls,
        protocol: str,
    ) -> WrappedFileSystemFlavour:
        """Return the (cached) flavour for *protocol*.

        Lookup order: the fsspec class registry, then the generated flavour
        registry, then ``AnyProtocolFileSystemFlavour`` as a fallback. A
        ``UserWarning`` is emitted when the fallback is used for a protocol
        that fsspec lists in ``known_implementations``.
        """
        table = cls.protocol_config
        options: dict[str, Any] = dict(
            netloc_is_anchor=protocol in table["netloc_is_anchor"],
            supports_empty_parts=protocol in table["supports_empty_parts"],
            meaningful_trailing_slash=protocol in table["meaningful_trailing_slash"],
            root_marker_override=table["root_marker_override"].get(protocol),
        )

        # 1. an already imported fsspec filesystem class
        # 2. the generated flavour registry (avoids importing the filesystem)
        for registry in (class_registry, flavour_registry):
            try:
                return cls(registry[protocol], **options)
            except KeyError:
                continue

        # 3. nothing registered: fall back to the generic flavour
        if protocol in known_implementations:
            warnings.warn(
                f"Could not find default for known protocol {protocol!r}."
                " Creating a default flavour for it. Please report this"
                " to the universal_pathlib issue tracker.",
                UserWarning,
                stacklevel=2,
            )
        return cls(AnyProtocolFileSystemFlavour, **options)

    def __repr__(self):
        spec = self._spec
        if isinstance(spec, type):
            return f"<wrapped class {spec.__name__}>"
        return f"<wrapped instance {spec.__class__.__name__}>"

    # === fsspec.AbstractFileSystem ===================================

    @property
    def protocol(self) -> tuple[str, ...]:
        """Protocol(s) of the wrapped spec; a single string is wrapped in a tuple."""
        declared = self._spec.protocol
        return (declared,) if isinstance(declared, str) else declared

    @property
    def root_marker(self) -> str:
        """The override root marker if one was given, else the spec's own."""
        override = self.root_marker_override
        return self._spec.root_marker if override is None else override

    @property
    def local_file(self) -> bool:
        """Truthiness of the spec's ``local_file`` attribute (``False`` if absent)."""
        return bool(getattr(self._spec, "local_file", False))

    @staticmethod
    def stringify_path(pth: JoinablePathLike) -> str:
        """Convert *pth* to ``str`` and pass it through ``normalize_empty_netloc``.

        The order of the checks below is significant and intentionally kept:
        plain strings, relative UPaths, ``__fspath__`` providers,
        ``os.PathLike`` objects, absolute UPaths, and finally ``str()``.
        """
        if isinstance(pth, str):
            text = pth
        elif isinstance(pth, upath.UPath) and not pth.is_absolute():
            text = str(pth)
        elif getattr(pth, "__fspath__", None) is not None:
            assert hasattr(pth, "__fspath__")  # narrows the type for checkers
            text = pth.__fspath__()
        elif isinstance(pth, os.PathLike):
            text = str(pth)
        elif isinstance(pth, upath.UPath) and pth.is_absolute():
            text = pth.path
        else:
            text = str(pth)
        return normalize_empty_netloc(text)

    def strip_protocol(self, pth: JoinablePathLike) -> str:
        """Strip the protocol via the spec; an empty result becomes the root marker."""
        text = self.stringify_path(pth)
        return self._spec._strip_protocol(text) or self.root_marker

    def get_kwargs_from_url(self, url: JoinablePathLike) -> dict[str, Any]:
        """Return storage options for *url*.

        Objects exposing ``storage_options`` provide them directly (copied
        into a new dict); everything else is stringified and handed to the
        spec's ``_get_kwargs_from_urls``.
        """
        # NOTE: the public variant is _from_url not _from_urls
        if hasattr(url, "storage_options"):
            return dict(url.storage_options)
        return self._spec._get_kwargs_from_urls(self.stringify_path(url))

    def parent(self, path: JoinablePathLike) -> str:
        """Return the spec's notion of the parent of *path*."""
        return self._spec._parent(self.stringify_path(path))

    # === pathlib_abc.FlavourBase =====================================

    @property
    def sep(self) -> str:  # type: ignore[override]
        return self._spec.sep

    @property
    def altsep(self) -> str | None:  # type: ignore[override]
        return getattr(self._spec, "altsep", None)

    def isabs(self, path: JoinablePathLike) -> bool:
        """Return whether the protocol-stripped *path* is absolute.

        Local flavours defer to ``os.path.isabs``; all others test whether
        the stripped path starts with the root marker.
        """
        stripped = self.strip_protocol(path)
        if self.local_file:
            return os.path.isabs(stripped)
        return stripped.startswith(self.root_marker)

    def join(self, path: JoinablePathLike, *paths: JoinablePathLike) -> str:
        """Join *path* with *paths* according to the flavour settings."""
        if not paths:
            return self.strip_protocol(path) or self.root_marker

        if self.local_file:
            first = self.strip_protocol(path)
            others = [self.stringify_path(p) for p in paths]
            joined = os.path.join(first, *others)
            # report forward slashes on Windows as well
            if os.name == "nt":
                return joined.replace("\\", "/")
            return joined

        if self.netloc_is_anchor:
            drive, head = self.splitdrive(path)
            rest = [self.stringify_path(p) for p in paths]
            if not (drive or head):
                # the leading path carries nothing: promote the next one
                first, *rest = rest
                drive, head = self.splitdrive(first)
            head = head or self.sep
        else:
            drive = ""
            head = str(self.strip_protocol(path)) or self.root_marker
            rest = [self.stringify_path(p) for p in paths]

        if self.supports_empty_parts:
            # plain separator join keeps empty segments intact
            pieces = [head.removesuffix(self.sep), *rest]
            return drive + self.sep.join(pieces)
        return drive + posixpath.join(head, *rest)

    def split(self, path: JoinablePathLike) -> tuple[str, str]:
        """Split *path* into ``(head, tail)``."""
        stripped = self.strip_protocol(path)
        if self.local_file:
            return os.path.split(stripped)

        head = self.parent(stripped) or self.root_marker
        if head == self.sep:
            tail = stripped[1:]
        elif head:
            tail = stripped[len(head) + 1 :]
        elif self.netloc_is_anchor:  # and not head
            head = stripped
            tail = ""
        else:
            tail = stripped

        if (
            tail
            or self.has_meaningful_trailing_slash
            or self.strip_protocol(head) == stripped
        ):
            return head, tail
        # empty tail whose head still differs from the input: split the head
        return self.split(head)

    def splitdrive(self, path: JoinablePathLike) -> tuple[str, str]:
        """Split *path* into ``(drive, rest)``.

        Only netloc-anchored and local flavours report a drive; for all
        others the drive is the empty string.
        """
        stripped = self.strip_protocol(path)

        if not self.netloc_is_anchor:
            if self.local_file:
                return os.path.splitdrive(stripped)
            # all other cases don't have a drive
            return "", stripped

        parsed = urlsplit(stripped)
        if not parsed.scheme:
            # cases like: "bucket/some/special/key
            bucket, sep, remainder = stripped.partition(self.sep)
            return bucket, sep + remainder

        # cases like: "http://example.com/foo/bar"
        drive = parsed._replace(path="", query="", fragment="").geturl()
        rest = parsed._replace(scheme="", netloc="").geturl()
        if (
            parsed.path.startswith("//")
            and SplitResult("", "", "//", "", "").geturl() == "////"
        ):
            # see: fsspec/universal_pathlib#233
            rest = rest[2:]
        return drive, rest or self.root_marker or self.sep

    def normcase(self, path: JoinablePathLike) -> str:
        """Stringify *path*; local flavours additionally apply ``os.path.normcase``."""
        if not self.local_file:
            return self.stringify_path(path)
        return os.path.normcase(self.stringify_path(path))

    def splitext(self, path: JoinablePathLike) -> tuple[str, str]:
        """Split *path* into ``(root, suffix)`` at the last dot of its final segment."""
        text = self.stringify_path(path)
        if self.local_file:
            return os.path.splitext(text)

        directory, sep, name = text.rpartition(self.sep)
        if "." not in name:
            # directory + sep + name is exactly the stringified input
            return text, ""
        stem, dot, ext = name.rpartition(".")
        return directory + sep + stem, dot + ext

    # === Python3.12 pathlib flavour ==================================

    def splitroot(self, path: JoinablePathLike) -> tuple[str, str, str]:
        """Split *path* into ``(drive, root, tail)``."""
        drive, tail = self.splitdrive(path)
        root = self.root_marker
        if self.netloc_is_anchor:
            # netloc-anchored flavours always report a non-empty root
            root = root or self.sep
        return drive, root, tail.removeprefix(self.sep)

388 

389 

# Module-level flavour wrapping the generic any-protocol flavour with all
# optional settings left at their defaults. ``LazyFlavourDescriptor.__get__``
# returns it for class-level access when no default protocol is available.
default_flavour = WrappedFileSystemFlavour(AnyProtocolFileSystemFlavour)

391 

392 

class LazyFlavourDescriptor:
    """descriptor to lazily get the flavour for a given protocol

    Instance access resolves the flavour from the instance's active path
    protocol. Class access uses the first entry of the owner's
    ``protocols`` (captured in ``__set_name__``) and falls back to
    ``default_flavour`` when there is none.
    """

    def __init__(self) -> None:
        self._owner: type[UPath] | None = None
        # Initialised here so that a descriptor whose ``__set_name__`` has
        # not run (e.g. one assigned to a class after its creation) falls
        # back to ``default_flavour`` in ``__get__`` instead of raising
        # ``AttributeError``.
        self._default_protocol: str | None = None

    def __set_name__(self, owner: type[UPath], name: str) -> None:
        """Remember the owning class and its first declared protocol, if any."""
        # helper to provide a more informative repr
        self._owner = owner
        try:
            self._default_protocol = self._owner.protocols[0]  # type: ignore
        except (AttributeError, IndexError):
            # owner declares no ``protocols`` or an empty sequence
            self._default_protocol = None

    def __get__(
        self, obj: UPath | None, objtype: type[UPath] | None = None
    ) -> WrappedFileSystemFlavour:
        """Return the flavour for *obj*, or for the owner class when *obj* is None."""
        if obj is not None:
            return WrappedFileSystemFlavour.from_protocol(
                obj._chain.active_path_protocol
            )
        elif self._default_protocol:
            return WrappedFileSystemFlavour.from_protocol(self._default_protocol)
        else:
            return default_flavour

    def __repr__(self):
        cls_name = f"{type(self).__name__}"
        if self._owner is None:
            return f"<unbound {cls_name}>"
        else:
            return f"<{cls_name} of {self._owner.__name__}>"

426 

427 

def upath_strip_protocol(pth: JoinablePathLike) -> str:
    """Strip the protocol from *pth* using the flavour of its own protocol.

    When no protocol is detected, the path is only stringified.
    """
    protocol = get_upath_protocol(pth)
    if not protocol:
        return WrappedFileSystemFlavour.stringify_path(pth)
    flavour = WrappedFileSystemFlavour.from_protocol(protocol)
    return flavour.strip_protocol(pth)

432 

433 

def upath_get_kwargs_from_url(url: JoinablePathLike) -> dict[str, Any]:
    """Return the storage options derived from *url* by its protocol's flavour.

    An empty dict is returned when no protocol is detected.
    """
    protocol = get_upath_protocol(url)
    if not protocol:
        return {}
    flavour = WrappedFileSystemFlavour.from_protocol(protocol)
    return flavour.get_kwargs_from_url(url)

438 

439 

def upath_urijoin(base: str, uri: str) -> str:
    """Resolve *uri* against *base* and return the resulting URI string.

    Adapted from ``urllib.parse.urljoin``, see:
    https://github.com/python/cpython/blob/ae6c01d9d2/Lib/urllib/parse.py#L539-L605

    Differences from the stdlib function:
    - there is no ``allow_fragments`` parameter
    - every scheme is treated as supporting relative paths
    - every scheme is treated as supporting a netloc (revisit this)
    - only ``str`` is supported (no bytes encoding/decoding)

    If either argument is empty the other one is returned unchanged, and a
    *uri* with a different scheme than *base* is returned as is.
    """
    if not base:
        return uri
    if not uri:
        return base

    base_split = urlsplit(base, scheme="")
    ref = urlsplit(uri, scheme=base_split.scheme)

    if ref.scheme != base_split.scheme:
        return uri
    if ref.netloc:
        # the reference names its own authority: nothing to merge
        return ref.geturl()
    ref = ref._replace(netloc=base_split.netloc)

    if not (ref.path or ref.fragment):
        # Reference has neither path nor fragment: take both from the base,
        # and the base query too unless the reference brings its own.
        return ref._replace(
            path=base_split.path,
            fragment=base_split.fragment,
            query=ref.query or base_split.query,
        ).geturl()

    if ref.path.startswith("/"):
        # a rooted reference path ignores the base path entirely
        segments = ref.path.split("/")
    else:
        base_dirs = base_split.path.split("/")
        if base_dirs[-1]:
            # the last base segment is not a directory: drop it
            base_dirs.pop()
        segments = base_dirs + ref.path.split("/")
        # remove inner empty segments that would yield doubled slashes
        segments[1:-1] = [segment for segment in segments[1:-1] if segment]

    stack: list[str] = []
    for segment in segments:
        if segment == ".":
            continue
        if segment != "..":
            stack.append(segment)
        elif stack:
            # ".." beyond the top of the path is silently ignored
            stack.pop()

    if segments[-1] in {".", ".."}:
        # a trailing relative-dir segment resolves to a directory
        stack.append("")

    return ref._replace(path="/".join(stack) or "/").geturl()