Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/_flavour.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

280 statements  

1from __future__ import annotations 

2 

3import os.path 

4import posixpath 

5import sys 

6import warnings 

7from functools import lru_cache 

8from typing import TYPE_CHECKING 

9from typing import Any 

10from typing import Mapping 

11from typing import Sequence 

12from typing import TypedDict 

13from typing import Union 

14from urllib.parse import SplitResult 

15from urllib.parse import urlsplit 

16 

17if sys.version_info >= (3, 12): 

18 from typing import TypeAlias 

19else: 

20 TypeAlias = Any 

21 

22from fsspec.registry import known_implementations 

23from fsspec.registry import registry as _class_registry 

24from fsspec.spec import AbstractFileSystem 

25 

26from upath._compat import deprecated 

27from upath._compat import str_remove_prefix 

28from upath._compat import str_remove_suffix 

29from upath._flavour_sources import FileSystemFlavourBase 

30from upath._flavour_sources import flavour_registry 

31from upath._protocol import get_upath_protocol 

32from upath._protocol import normalize_empty_netloc 

33 

34if TYPE_CHECKING: 

35 from upath.core import UPath 

36 

# Names exported by a star-import of this module. Other module-level
# definitions (e.g. WrappedFileSystemFlavour, upath_strip_protocol) are not
# listed here.
__all__ = [
    "LazyFlavourDescriptor",
    "default_flavour",
    "upath_urijoin",
    "upath_get_kwargs_from_url",
]

43 

# fsspec's filesystem class registry, re-bound with a read-only Mapping
# annotation; looked up by protocol name in
# WrappedFileSystemFlavour.from_protocol.
class_registry: Mapping[str, type[AbstractFileSystem]] = _class_registry
# Accepted path inputs: a plain string or an os.PathLike producing a string.
PathOrStr: TypeAlias = Union[str, "os.PathLike[str]"]

46 

47 

class AnyProtocolFileSystemFlavour(FileSystemFlavourBase):
    """Generic fallback flavour that is not tied to a specific protocol.

    Uses "/" both as separator and as root marker and declares no
    protocols of its own.
    """

    sep = "/"
    protocol = ()
    root_marker = "/"

    @classmethod
    def _strip_protocol(cls, path: str) -> str:
        """Drop a leading ``<protocol>://`` or ``<protocol>::`` from ``path``.

        Trailing slashes are removed as well; if nothing is left the
        root marker is returned.
        """
        scheme = get_upath_protocol(path)
        # "://" is checked first; at most one prefix is removed.
        for marker in ("://", "::"):
            prefix = scheme + marker
            if path.startswith(prefix):
                path = path[len(prefix) :]
                break
        return path.rstrip("/") or cls.root_marker

    @staticmethod
    def _get_kwargs_from_urls(path: str) -> dict[str, Any]:
        """Return storage options derived from ``path``: always empty."""
        return {}

    @classmethod
    def _parent(cls, path):
        """Return the parent of ``path``, prefixed with the root marker.

        Paths without any "/" (after protocol stripping) have the root
        marker itself as parent.
        """
        stripped = cls._strip_protocol(path)
        head, slash, _ = stripped.rpartition("/")
        if not slash:
            return cls.root_marker
        return cls.root_marker + head.lstrip(cls.root_marker)

75 

76 

class ProtocolConfig(TypedDict):
    """Shape of the per-protocol settings table used to configure flavours."""

    # protocols for which the netloc is treated as the path anchor
    netloc_is_anchor: set[str]
    # protocols for which empty path parts are supported
    supports_empty_parts: set[str]
    # protocols for which a trailing slash is meaningful
    meaningful_trailing_slash: set[str]
    # protocol -> root marker to enforce for that protocol
    root_marker_override: dict[str, str]

82 

83 

class WrappedFileSystemFlavour:  # (pathlib_abc.FlavourBase)
    """flavour class for universal_pathlib

    **INTERNAL AND VERY MUCH EXPERIMENTAL**

    Implements the fsspec compatible low-level lexical operations on
    PurePathBase-like objects.

    Note:
        In case you find yourself in need of subclassing this class,
        please open an issue in the universal_pathlib issue tracker:
        https://github.com/fsspec/universal_pathlib/issues
        Ideally we can find a way to make your use-case work by adding
        more functionality to this class.

    """

    # Note:
    #   It would be ideal if there would be a way to avoid the need for
    #   indicating the following settings via the protocol. This is a
    #   workaround to be able to implement the flavour correctly.
    # TODO:
    #   These settings should be configured on the UPath class?!?
    protocol_config: ProtocolConfig = {
        "netloc_is_anchor": {
            "http",
            "https",
            "s3",
            "s3a",
            "smb",
            "gs",
            "gcs",
            "az",
            "adl",
            "abfs",
            "abfss",
            "webdav+http",
            "webdav+https",
        },
        "supports_empty_parts": {
            "http",
            "https",
            "s3",
            "s3a",
            "gs",
            "gcs",
            "az",
            "adl",
            "abfs",
        },
        "meaningful_trailing_slash": {
            "http",
            "https",
        },
        "root_marker_override": {
            "ssh": "/",
            "sftp": "/",
        },
    }

    def __init__(
        self,
        spec: type[AbstractFileSystem | FileSystemFlavourBase] | AbstractFileSystem,
        *,
        netloc_is_anchor: bool = False,
        supports_empty_parts: bool = False,
        meaningful_trailing_slash: bool = False,
        root_marker_override: str | None = None,
    ) -> None:
        """initialize the flavour with the given fsspec

        Args:
            spec: filesystem class, flavour class or filesystem instance
                whose ``_strip_protocol``, ``_parent``, ``sep`` etc. are
                delegated to.
            netloc_is_anchor: treat the netloc as the anchor of the path.
            supports_empty_parts: keep empty path parts when joining.
            meaningful_trailing_slash: a trailing slash carries meaning.
            root_marker_override: root marker to report instead of the
                one provided by ``spec``; ``None`` disables the override.
        """
        self._spec = spec

        # netloc is considered an anchor, influences:
        #   - splitdrive
        #   - join
        self.netloc_is_anchor = bool(netloc_is_anchor)

        # supports empty parts, influences:
        #   - join
        #   - UPath._parse_path
        self.supports_empty_parts = bool(supports_empty_parts)

        # meaningful trailing slash, influences:
        #   - join
        #   - UPath._parse_path
        self.has_meaningful_trailing_slash = bool(meaningful_trailing_slash)

        # some filesystems require UPath to enforce a specific root marker
        if root_marker_override is None:
            self.root_marker_override = None
        else:
            self.root_marker_override = str(root_marker_override)

    @classmethod
    @lru_cache(maxsize=None)
    def from_protocol(
        cls,
        protocol: str,
    ) -> WrappedFileSystemFlavour:
        """return the fsspec flavour for the given protocol

        Results are memoized per ``(cls, protocol)`` by ``lru_cache``.
        The flavour settings are looked up in ``cls.protocol_config``.
        """

        _c = cls.protocol_config
        config: dict[str, Any] = {
            "netloc_is_anchor": protocol in _c["netloc_is_anchor"],
            "supports_empty_parts": protocol in _c["supports_empty_parts"],
            "meaningful_trailing_slash": protocol in _c["meaningful_trailing_slash"],
            "root_marker_override": _c["root_marker_override"].get(protocol),
        }

        # first try to get an already imported fsspec filesystem class
        try:
            return cls(class_registry[protocol], **config)
        except KeyError:
            pass
        # next try to get the flavour from the generated flavour registry
        # to avoid imports
        try:
            return cls(flavour_registry[protocol], **config)
        except KeyError:
            pass
        # finally fallback to a default flavour for the protocol
        if protocol in known_implementations:
            warnings.warn(
                f"Could not find default for known protocol {protocol!r}."
                " Creating a default flavour for it. Please report this"
                " to the universal_pathlib issue tracker.",
                UserWarning,
                stacklevel=2,
            )
        return cls(AnyProtocolFileSystemFlavour, **config)

    def __repr__(self):
        """Show whether a class or an instance is wrapped, and its name."""
        if isinstance(self._spec, type):
            return f"<wrapped class {self._spec.__name__}>"
        else:
            return f"<wrapped instance {self._spec.__class__.__name__}>"

    # === fsspec.AbstractFileSystem ===================================

    @property
    def protocol(self) -> tuple[str, ...]:
        """Protocols of the wrapped spec; a single string becomes a 1-tuple."""
        if isinstance(self._spec.protocol, str):
            return (self._spec.protocol,)
        else:
            return self._spec.protocol

    @property
    def root_marker(self) -> str:
        """The override if one was configured, else the spec's root marker."""
        if self.root_marker_override is not None:
            return self.root_marker_override
        else:
            return self._spec.root_marker

    @property
    def local_file(self) -> bool:
        """Truthiness of the spec's ``local_file`` attribute (False if absent)."""
        return bool(getattr(self._spec, "local_file", False))

    @staticmethod
    def stringify_path(pth: PathOrStr) -> str:
        """Convert ``pth`` to a string and pass it to ``normalize_empty_netloc``.

        Strings are used as is, objects with ``__fspath__`` are converted
        through it, objects with a ``path`` attribute use that attribute,
        and anything else falls back to ``str(pth)``.
        """
        if isinstance(pth, str):
            out = pth
        elif getattr(pth, "__fspath__", None) is not None:
            out = pth.__fspath__()
        elif isinstance(pth, os.PathLike):
            out = str(pth)
        elif hasattr(pth, "path"):  # type: ignore[unreachable]
            out = pth.path
        else:
            out = str(pth)
        return normalize_empty_netloc(out)

    def strip_protocol(self, pth: PathOrStr) -> str:
        """Stringify ``pth`` and delegate to the spec's ``_strip_protocol``."""
        pth = self.stringify_path(pth)
        return self._spec._strip_protocol(pth)

    def get_kwargs_from_url(self, url: PathOrStr) -> dict[str, Any]:
        """Return storage options for ``url``.

        Objects exposing ``storage_options`` provide a copy of that
        mapping; otherwise the spec's ``_get_kwargs_from_urls`` is used.
        """
        # NOTE: the public variant is _from_url not _from_urls
        if hasattr(url, "storage_options"):
            return dict(url.storage_options)
        url = self.stringify_path(url)
        return self._spec._get_kwargs_from_urls(url)

    def parent(self, path: PathOrStr) -> str:
        """Stringify ``path`` and delegate to the spec's ``_parent``."""
        path = self.stringify_path(path)
        return self._spec._parent(path)

    # === pathlib_abc.FlavourBase =====================================

    @property
    def sep(self) -> str:
        """Path separator of the wrapped spec."""
        return self._spec.sep

    @property
    def altsep(self) -> str | None:
        """Alternative separator: always ``None``."""
        return None

    def isabs(self, path: PathOrStr) -> bool:
        """Return whether ``path`` is absolute.

        Local-file flavours defer to ``os.path.isabs``; all others check
        that the protocol-stripped path starts with the root marker.
        """
        path = self.strip_protocol(path)
        if self.local_file:
            return os.path.isabs(path)
        else:
            return path.startswith(self.root_marker)

    def join(self, path: PathOrStr, *paths: PathOrStr) -> str:
        """Join ``path`` with ``paths`` according to the flavour settings."""
        if self.netloc_is_anchor:
            drv, p0 = self.splitdrive(path)
            pN = list(map(self.stringify_path, paths))
            if not drv and not p0:
                # first segment is empty: promote the next one to be the base
                path, *pN = pN
                drv, p0 = self.splitdrive(path)
            p0 = p0 or self.sep
        else:
            p0 = str(self.strip_protocol(path)) or self.root_marker
            pN = list(map(self.stringify_path, paths))
            drv = ""
        if self.supports_empty_parts:
            # plain separator join so that empty parts are preserved
            return drv + self.sep.join([str_remove_suffix(p0, self.sep), *pN])
        else:
            return drv + posixpath.join(p0, *pN)

    def split(self, path: PathOrStr):
        """Split ``path`` into a ``(head, tail)`` pair.

        ``head`` is the parent of the protocol-stripped path (or the root
        marker if the parent is empty) and ``tail`` is what follows it,
        without the separator between the two. If ``head`` is empty the
        whole stripped path is returned as ``tail``.
        """
        stripped_path = self.strip_protocol(path)
        head = self.parent(stripped_path) or self.root_marker
        if not head:
            return "", stripped_path
        tail = stripped_path[len(head) :]
        # Only drop a separator that is actually present after the head.
        # Unconditionally skipping one extra character truncated the tail
        # when the head already ended at the separator, e.g. head "/" for
        # the path "/foo" produced "oo" instead of "foo".
        return head, str_remove_prefix(tail, self.sep)

    def splitdrive(self, path: PathOrStr) -> tuple[str, str]:
        """Split the protocol-stripped ``path`` into ``(drive, rest)``."""
        path = self.strip_protocol(path)
        if self.netloc_is_anchor:
            u = urlsplit(path)
            if u.scheme:
                # cases like: "http://example.com/foo/bar"
                drive = u._replace(path="", query="", fragment="").geturl()
                rest = u._replace(scheme="", netloc="").geturl()
                if (
                    u.path.startswith("//")
                    and SplitResult("", "", "//", "", "").geturl() == "////"
                ):
                    # see: fsspec/universal_pathlib#233
                    rest = rest[2:]
                return drive, rest or self.root_marker or self.sep
            else:
                # cases like: "bucket/some/special/key
                drive, root, tail = path.partition(self.sep)
                return drive, root + tail
        elif self.local_file:
            return os.path.splitdrive(path)
        else:
            # all other cases don't have a drive
            return "", path

    def normcase(self, path: PathOrStr) -> str:
        """Stringify ``path``; local-file flavours also apply ``os.path.normcase``."""
        if self.local_file:
            return os.path.normcase(self.stringify_path(path))
        else:
            return self.stringify_path(path)

    # === Python3.12 pathlib flavour ==================================

    def splitroot(self, path: PathOrStr) -> tuple[str, str, str]:
        """Split ``path`` into ``(drive, root, tail)``.

        ``root`` is the root marker (falling back to the separator when
        the netloc is the anchor); one leading separator is removed from
        the tail.
        """
        drive, tail = self.splitdrive(path)
        if self.netloc_is_anchor:
            root_marker = self.root_marker or self.sep
        else:
            root_marker = self.root_marker
        return drive, root_marker, str_remove_prefix(tail, self.sep)

    # === deprecated backwards compatibility ===========================

    @deprecated(python_version=(3, 12))
    def casefold(self, s: str) -> str:
        """Return ``s`` unchanged for local-file flavours, lowercased otherwise."""
        if self.local_file:
            return s
        else:
            return s.lower()

    @deprecated(python_version=(3, 12))
    def parse_parts(self, parts: Sequence[str]) -> tuple[str, str, list[str]]:
        """Parse ``parts`` into ``(drive, root, parsed_parts)``.

        Non-empty parts are processed last to first; ``drive`` and
        ``root`` therefore come from the first non-empty part.
        """
        parsed = []
        sep = self.sep
        drv = root = ""
        it = reversed(parts)
        for part in it:
            if part:
                drv, root, rel = self.splitroot(part)
                if not root or root and rel:
                    for x in reversed(rel.split(sep)):
                        parsed.append(sys.intern(x))
        if drv or root:
            parsed.append(drv + root)
        # components were collected back to front
        parsed.reverse()
        return drv, root, parsed

    @deprecated(python_version=(3, 12))
    def join_parsed_parts(
        self,
        drv: str,
        root: str,
        parts: list[str],
        drv2: str,
        root2: str,
        parts2: list[str],
    ) -> tuple[str, str, list[str]]:
        """Join two parsed ``(drive, root, parts)`` triples into one."""
        if root2:
            if not drv2 and drv:
                return drv, root2, [drv + root2] + parts2[1:]
        elif drv2:
            if drv2 == drv or self.casefold(drv2) == self.casefold(drv):
                # Same drive => second path is relative to the first
                return drv, root, parts + parts2[1:]
        else:
            # Second path is non-anchored (common case)
            return drv, root, parts + parts2
        return drv2, root2, parts2

400 

401 

# Fallback flavour wrapping AnyProtocolFileSystemFlavour with all optional
# settings left at their defaults; LazyFlavourDescriptor returns it when no
# protocol is available.
default_flavour = WrappedFileSystemFlavour(AnyProtocolFileSystemFlavour)

403 

404 

class LazyFlavourDescriptor:
    """descriptor to lazily get the flavour for a given protocol"""

    def __init__(self) -> None:
        self._owner: type[UPath] | None = None
        # Initialised here as well as in __set_name__ so that __get__ does
        # not raise AttributeError when the descriptor is attached to a
        # class without __set_name__ being invoked (e.g. assigned after
        # the class body was executed).
        self._default_protocol: str | None = None

    def __set_name__(self, owner: type[UPath], name: str) -> None:
        """Remember the owning class and its first declared protocol."""
        # helper to provide a more informative repr
        self._owner = owner
        try:
            self._default_protocol = self._owner.protocols[0]  # type: ignore
        except (AttributeError, IndexError):
            # owner declares no protocols (or an empty sequence)
            self._default_protocol = None

    def __get__(self, instance: UPath, owner: type[UPath]) -> WrappedFileSystemFlavour:
        """Return the flavour for the instance, the owner class, or the default.

        Instance access uses ``instance.protocol``; class access uses the
        owner's first protocol if there is one and ``default_flavour``
        otherwise.
        """
        if instance is not None:
            return WrappedFileSystemFlavour.from_protocol(instance.protocol)
        elif self._default_protocol:  # type: ignore
            return WrappedFileSystemFlavour.from_protocol(self._default_protocol)
        else:
            return default_flavour

    def __repr__(self):
        cls_name = f"{type(self).__name__}"
        if self._owner is None:
            return f"<unbound {cls_name}>"
        else:
            return f"<{cls_name} of {self._owner.__name__}>"

434 

435 

def upath_strip_protocol(pth: PathOrStr) -> str:
    """Strip the protocol from ``pth`` using the flavour of its protocol.

    When no protocol is detected, the path is only stringified.
    """
    protocol = get_upath_protocol(pth)
    if not protocol:
        return WrappedFileSystemFlavour.stringify_path(pth)
    flavour = WrappedFileSystemFlavour.from_protocol(protocol)
    return flavour.strip_protocol(pth)

440 

441 

def upath_get_kwargs_from_url(url: PathOrStr) -> dict[str, Any]:
    """Return the storage options encoded in ``url``.

    The lookup is delegated to the flavour of the url's protocol; an
    empty dict is returned when no protocol is detected.
    """
    protocol = get_upath_protocol(url)
    if not protocol:
        return {}
    flavour = WrappedFileSystemFlavour.from_protocol(protocol)
    return flavour.get_kwargs_from_url(url)

446 

447 

def upath_urijoin(base: str, uri: str) -> str:
    """Resolve ``uri`` relative to ``base`` and return the resulting URI.

    If either argument is empty the other one is returned unchanged. A
    ``uri`` with a different scheme than ``base`` is returned as is.
    """
    # Adapted from urllib.parse.urljoin, see:
    #   https://github.com/python/cpython/blob/ae6c01d9d2/Lib/urllib/parse.py#L539-L605
    # Differences to the stdlib version:
    #   - no allow_fragments parameter
    #   - every scheme is treated as allowing relative paths
    #   - every scheme is treated as allowing a netloc (revisit this)
    #   - str only, no bytes support (no encoding and decoding)
    if not base or not uri:
        return base or uri

    base_split = urlsplit(base, scheme="")
    ref = urlsplit(uri, scheme=base_split.scheme)

    if ref.scheme != base_split.scheme:
        return uri
    if ref.netloc:
        # the reference carries its own authority: nothing to inherit
        return ref.geturl()
    ref = ref._replace(netloc=base_split.netloc)

    if not (ref.path or ref.fragment):
        # neither path nor fragment given: take them (and the query, if
        # the reference has none) from the base
        inherited = {"path": base_split.path, "fragment": base_split.fragment}
        if not ref.query:
            inherited["query"] = base_split.query
        return ref._replace(**inherited).geturl()

    if ref.path.startswith("/"):
        # rooted reference path: the base path is ignored entirely
        segments = ref.path.split("/")
    else:
        base_dirs = base_split.path.split("/")
        if base_dirs[-1] != "":
            # last base component is not a directory: it does not take
            # part in resolving the relative path
            base_dirs.pop()
        segments = base_dirs + ref.path.split("/")
        # drop inner empty segments that would turn into doubled slashes
        segments[1:-1] = [seg for seg in segments[1:-1] if seg]

    stack: list[str] = []
    for segment in segments:
        if segment == ".":
            continue
        if segment != "..":
            stack.append(segment)
        elif stack:
            # ".." above the top level is silently ignored
            stack.pop()

    if segments[-1] in {".", ".."}:
        # a trailing dot segment denotes a directory: keep the final slash
        stack.append("")

    return ref._replace(path="/".join(stack) or "/").geturl()