Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/_flavour.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import os.path
4import posixpath
5import sys
6import warnings
7from collections.abc import Mapping
8from functools import lru_cache
9from typing import TYPE_CHECKING
10from typing import Any
11from typing import TypedDict
12from urllib.parse import SplitResult
13from urllib.parse import urlsplit
15from fsspec.registry import known_implementations
16from fsspec.registry import registry as _class_registry
17from fsspec.spec import AbstractFileSystem
19import upath
20from upath._flavour_sources import FileSystemFlavourBase
21from upath._flavour_sources import flavour_registry
22from upath._protocol import get_upath_protocol
23from upath._protocol import normalize_empty_netloc
24from upath.types import JoinablePathLike
25from upath.types import UPathParser
27if TYPE_CHECKING:
28 if sys.version_info >= (3, 12):
29 from typing import TypeAlias
30 else:
31 TypeAlias = Any
33 from upath.core import UPath
# Public names exported by this module.
__all__ = [
    "LazyFlavourDescriptor",
    "default_flavour",
    "upath_urijoin",
    "upath_get_kwargs_from_url",
    "upath_strip_protocol",
]

# fsspec's filesystem-class registry, re-exposed under a read-only ``Mapping``
# annotation; ``WrappedFileSystemFlavour.from_protocol`` looks protocols up in
# it before falling back to the generated flavour registry.
class_registry: Mapping[str, type[AbstractFileSystem]] = _class_registry
class AnyProtocolFileSystemFlavour(FileSystemFlavourBase):
    """Generic flavour that is not tied to a specific protocol.

    Paths use ``"/"`` both as separator and as root marker. This class is
    what ``WrappedFileSystemFlavour.from_protocol`` wraps when neither the
    fsspec class registry nor the generated flavour registry has an entry
    for a protocol.
    """

    sep = "/"
    protocol = ()
    root_marker = "/"

    @classmethod
    def _strip_protocol(cls, path: str) -> str:
        """Return *path* without its ``proto://`` or ``proto::`` prefix.

        Trailing slashes are removed as well; if nothing is left the
        root marker is returned.
        """
        scheme = get_upath_protocol(path)
        # "://" is checked first; only one of the two prefixes is removed
        for delimiter in ("://", "::"):
            prefix = scheme + delimiter
            if path.startswith(prefix):
                path = path[len(prefix) :]
                break
        trimmed = path.rstrip("/")
        return trimmed if trimmed else cls.root_marker

    @staticmethod
    def _get_kwargs_from_urls(path: str) -> dict[str, Any]:
        """Return the storage options encoded in *path*: always empty here."""
        return {}

    @classmethod
    def _parent(cls, path):
        """Return the parent of *path*, always prefixed with the root marker."""
        stripped = cls._strip_protocol(path)
        if "/" not in stripped:
            return cls.root_marker
        head, _, _ = stripped.rpartition("/")
        # normalise so that exactly one root marker prefixes the result
        return cls.root_marker + head.lstrip(cls.root_marker)
class ProtocolConfig(TypedDict):
    """Typed layout of ``WrappedFileSystemFlavour.protocol_config``.

    ``WrappedFileSystemFlavour.from_protocol`` tests membership of a protocol
    in each of the sets (and looks it up in the dict) to derive the keyword
    arguments for the flavour it creates.
    """

    # protocols whose flavour is created with ``netloc_is_anchor=True``
    netloc_is_anchor: set[str]
    # protocols whose flavour is created with ``supports_empty_parts=True``
    supports_empty_parts: set[str]
    # protocols whose flavour is created with ``meaningful_trailing_slash=True``
    meaningful_trailing_slash: set[str]
    # protocol -> root marker passed as ``root_marker_override``
    root_marker_override: dict[str, str]
class WrappedFileSystemFlavour(UPathParser):  # (pathlib_abc.FlavourBase)
    """flavour class for universal_pathlib

    **INTERNAL AND VERY MUCH EXPERIMENTAL**

    Implements the fsspec compatible low-level lexical operations on
    PurePathBase-like objects.

    Note:
        In case you find yourself in need of subclassing this class,
        please open an issue in the universal_pathlib issue tracker:
        https://github.com/fsspec/universal_pathlib/issues
        Ideally we can find a way to make your use-case work by adding
        more functionality to this class.

    """

    # Note:
    #   It would be ideal if there would be a way to avoid the need for
    #   indicating the following settings via the protocol. This is a
    #   workaround to be able to implement the flavour correctly.
    # TODO:
    #   These settings should be configured on the UPath class?!?
    protocol_config: ProtocolConfig = {
        "netloc_is_anchor": {
            "http",
            "https",
            "s3",
            "s3a",
            "smb",
            "gs",
            "gcs",
            "az",
            "adl",
            "abfs",
            "abfss",
        },
        "supports_empty_parts": {
            "http",
            "https",
            "s3",
            "s3a",
            "gs",
            "gcs",
            "az",
            "adl",
            "abfs",
        },
        "meaningful_trailing_slash": {
            "http",
            "https",
        },
        "root_marker_override": {
            "smb": "/",
            "ssh": "/",
            "sftp": "/",
        },
    }

    def __init__(
        self,
        spec: type[AbstractFileSystem | FileSystemFlavourBase] | AbstractFileSystem,
        *,
        netloc_is_anchor: bool = False,
        supports_empty_parts: bool = False,
        meaningful_trailing_slash: bool = False,
        root_marker_override: str | None = None,
    ) -> None:
        """initialize the flavour with the given fsspec

        Args:
            spec: filesystem class, flavour class or filesystem instance the
                lexical operations are delegated to.
            netloc_is_anchor: treat the netloc as the drive/anchor of a path.
            supports_empty_parts: keep empty path parts when joining.
            meaningful_trailing_slash: a trailing slash is significant.
            root_marker_override: root marker to use instead of the one
                provided by ``spec``; ``None`` keeps the spec's marker.
        """
        self._spec = spec

        # netloc is considered an anchor, influences:
        #   - splitdrive
        #   - join
        self.netloc_is_anchor = bool(netloc_is_anchor)

        # supports empty parts, influences:
        #   - join
        #   - UPath._parse_path
        self.supports_empty_parts = bool(supports_empty_parts)

        # meaningful trailing slash, influences:
        #   - join
        #   - UPath._parse_path
        self.has_meaningful_trailing_slash = bool(meaningful_trailing_slash)

        # some filesystems require UPath to enforce a specific root marker
        if root_marker_override is None:
            self.root_marker_override = None
        else:
            self.root_marker_override = str(root_marker_override)

    @classmethod
    @lru_cache(maxsize=None)
    def from_protocol(
        cls,
        protocol: str,
    ) -> WrappedFileSystemFlavour:
        """return the fsspec flavour for the given protocol

        Results are cached per ``(cls, protocol)``. A ``UserWarning`` is
        emitted when the protocol is listed in fsspec's known
        implementations but no flavour could be found for it.
        """

        _c = cls.protocol_config
        config: dict[str, Any] = {
            "netloc_is_anchor": protocol in _c["netloc_is_anchor"],
            "supports_empty_parts": protocol in _c["supports_empty_parts"],
            "meaningful_trailing_slash": protocol in _c["meaningful_trailing_slash"],
            "root_marker_override": _c["root_marker_override"].get(protocol),
        }

        # first try to get an already imported fsspec filesystem class
        try:
            return cls(class_registry[protocol], **config)
        except KeyError:
            pass
        # next try to get the flavour from the generated flavour registry
        # to avoid imports
        try:
            return cls(flavour_registry[protocol], **config)
        except KeyError:
            pass
        # finally fallback to a default flavour for the protocol
        if protocol in known_implementations:
            warnings.warn(
                f"Could not find default for known protocol {protocol!r}."
                " Creating a default flavour for it. Please report this"
                " to the universal_pathlib issue tracker.",
                UserWarning,
                stacklevel=2,
            )
        return cls(AnyProtocolFileSystemFlavour, **config)

    def __repr__(self):
        if isinstance(self._spec, type):
            return f"<wrapped class {self._spec.__name__}>"
        else:
            return f"<wrapped instance {self._spec.__class__.__name__}>"

    # === fsspec.AbstractFileSystem ===================================

    @property
    def protocol(self) -> tuple[str, ...]:
        """protocol(s) of the wrapped spec; a single string becomes a 1-tuple"""
        if isinstance(self._spec.protocol, str):
            return (self._spec.protocol,)
        else:
            return self._spec.protocol

    @property
    def root_marker(self) -> str:
        """the override root marker if one is set, else the spec's root marker"""
        if self.root_marker_override is not None:
            return self.root_marker_override
        else:
            return self._spec.root_marker

    @property
    def local_file(self) -> bool:
        """truthiness of the spec's ``local_file`` attribute (False if absent)"""
        return bool(getattr(self._spec, "local_file", False))

    @staticmethod
    def stringify_path(pth: JoinablePathLike) -> str:
        """convert a path-like object to a string

        The result is passed through ``normalize_empty_netloc``.
        """
        if isinstance(pth, str):
            out = pth
        elif isinstance(pth, upath.UPath) and not pth.is_absolute():
            out = str(pth)
        elif getattr(pth, "__fspath__", None) is not None:
            # the assert only restates the getattr check above
            assert hasattr(pth, "__fspath__")
            out = pth.__fspath__()
        elif isinstance(pth, os.PathLike):
            out = str(pth)
        elif isinstance(pth, upath.UPath) and pth.is_absolute():
            out = pth.path
        else:
            out = str(pth)
        return normalize_empty_netloc(out)

    def strip_protocol(self, pth: JoinablePathLike) -> str:
        """strip the protocol via the spec; an empty result becomes the root marker"""
        pth = self.stringify_path(pth)
        return self._spec._strip_protocol(pth) or self.root_marker

    def get_kwargs_from_url(self, url: JoinablePathLike) -> dict[str, Any]:
        """return the storage options for the given url

        Objects exposing ``storage_options`` provide them directly (copied
        into a new dict); everything else is stringified and handed to the
        spec.
        """
        # NOTE: the public variant is _from_url not _from_urls
        if hasattr(url, "storage_options"):
            return dict(url.storage_options)
        url = self.stringify_path(url)
        return self._spec._get_kwargs_from_urls(url)

    def parent(self, path: JoinablePathLike) -> str:
        """return the parent of the stringified path as computed by the spec"""
        path = self.stringify_path(path)
        return self._spec._parent(path)

    # === pathlib_abc.FlavourBase =====================================

    @property
    def sep(self) -> str:  # type: ignore[override]
        """path separator of the wrapped spec"""
        return self._spec.sep

    @property
    def altsep(self) -> str | None:  # type: ignore[override]
        """alternative separator of the wrapped spec, ``None`` if it has none"""
        return getattr(self._spec, "altsep", None)

    def isabs(self, path: JoinablePathLike) -> bool:
        """return whether the protocol-stripped path is absolute"""
        path = self.strip_protocol(path)
        if self.local_file:
            return os.path.isabs(path)
        else:
            return path.startswith(self.root_marker)

    def join(self, path: JoinablePathLike, *paths: JoinablePathLike) -> str:
        """join ``path`` with ``paths`` according to the flavour settings"""
        if not paths:
            return self.strip_protocol(path) or self.root_marker
        if self.local_file:
            p = os.path.join(
                self.strip_protocol(path),
                *map(self.stringify_path, paths),
            )
            # on Windows the joined path is reported with forward slashes
            return p if os.name != "nt" else p.replace("\\", "/")
        if self.netloc_is_anchor:
            drv, p0 = self.splitdrive(path)
            pN = list(map(self.stringify_path, paths))
            if not drv and not p0:
                # the first path is empty: restart from the next one
                path, *pN = pN
                drv, p0 = self.splitdrive(path)
            p0 = p0 or self.sep
        else:
            p0 = str(self.strip_protocol(path)) or self.root_marker
            pN = list(map(self.stringify_path, paths))
            drv = ""
        if self.supports_empty_parts:
            # plain separator join keeps empty parts intact
            return drv + self.sep.join([p0.removesuffix(self.sep), *pN])
        else:
            return drv + posixpath.join(p0, *pN)

    def split(self, path: JoinablePathLike) -> tuple[str, str]:
        """split the protocol-stripped path into ``(head, tail)``"""
        stripped_path = self.strip_protocol(path)
        if self.local_file:
            return os.path.split(stripped_path)
        head = self.parent(stripped_path) or self.root_marker
        if head == self.sep:
            tail = stripped_path[1:]
        elif head:
            tail = stripped_path[len(head) + 1 :]
        elif self.netloc_is_anchor:  # and not head
            head = stripped_path
            tail = ""
        else:
            tail = stripped_path
        if (
            not tail
            and not self.has_meaningful_trailing_slash
            and self.strip_protocol(head) != stripped_path
        ):
            # empty tail without a meaningful trailing slash: split the head
            return self.split(head)
        return head, tail

    def splitdrive(self, path: JoinablePathLike) -> tuple[str, str]:
        """split the protocol-stripped path into ``(drive, rest)``"""
        path = self.strip_protocol(path)
        if self.netloc_is_anchor:
            u = urlsplit(path)
            if u.scheme:
                # cases like: "http://example.com/foo/bar"
                drive = u._replace(path="", query="", fragment="").geturl()
                rest = u._replace(scheme="", netloc="").geturl()
                if (
                    u.path.startswith("//")
                    and SplitResult("", "", "//", "", "").geturl() == "////"
                ):
                    # see: fsspec/universal_pathlib#233
                    rest = rest[2:]
                return drive, rest or self.root_marker or self.sep
            else:
                # cases like: "bucket/some/special/key
                drive, root, tail = path.partition(self.sep)
                return drive, root + tail
        elif self.local_file:
            return os.path.splitdrive(path)
        else:
            # all other cases don't have a drive
            return "", path

    def normcase(self, path: JoinablePathLike) -> str:
        """normalize the case of the path; only local paths are changed"""
        if self.local_file:
            return os.path.normcase(self.stringify_path(path))
        else:
            return self.stringify_path(path)

    def splitext(self, path: JoinablePathLike) -> tuple[str, str]:
        """split the path into ``(root, ext)`` so that ``root + ext == path``

        Local flavours delegate to ``os.path.splitext``. For all other
        flavours the last component (after the final separator) is split at
        its last dot. Matching ``os.path.splitext``, leading dots of that
        component belong to the root, so names like ``".bashrc"`` or ``".."``
        have an empty extension.
        """
        path = self.stringify_path(path)
        if self.local_file:
            return os.path.splitext(path)
        head, sep, name = path.rpartition(self.sep)
        dot_index = name.rfind(".")
        # an extension requires at least one non-dot character before the
        # last dot, otherwise the dot is part of a dotfile name / "." / ".."
        if dot_index > 0 and name[:dot_index].strip("."):
            return head + sep + name[:dot_index], name[dot_index:]
        return head + sep + name, ""

    # === Python3.12 pathlib flavour ==================================

    def splitroot(self, path: JoinablePathLike) -> tuple[str, str, str]:
        """split the path into ``(drive, root, tail)``"""
        drive, tail = self.splitdrive(path)
        if self.netloc_is_anchor:
            root_marker = self.root_marker or self.sep
        else:
            root_marker = self.root_marker
        return drive, root_marker, tail.removeprefix(self.sep)
# Flavour wrapping the protocol-agnostic spec with all flavour options left at
# their defaults; returned by ``LazyFlavourDescriptor.__get__`` when neither an
# instance nor a default protocol is available.
default_flavour = WrappedFileSystemFlavour(AnyProtocolFileSystemFlavour)
class LazyFlavourDescriptor:
    """descriptor to lazily get the flavour for a given protocol

    Accessed on an instance, the flavour is resolved from the instance's
    active path protocol. Accessed on the class, the first entry of the
    owner's ``protocols`` is used if available, otherwise the module level
    ``default_flavour`` is returned.
    """

    def __init__(self) -> None:
        self._owner: type[UPath] | None = None
        # Set here as well as in __set_name__: a descriptor that is attached
        # after class creation (e.g. via setattr) never gets __set_name__
        # called, and class-level access must not fail with AttributeError.
        self._default_protocol: str | None = None

    def __set_name__(self, owner: type[UPath], name: str) -> None:
        # helper to provide a more informative repr
        self._owner = owner
        try:
            self._default_protocol = self._owner.protocols[0]  # type: ignore
        except (AttributeError, IndexError):
            # owner declares no protocols (or an empty sequence)
            self._default_protocol = None

    def __get__(
        self, obj: UPath | None, objtype: type[UPath] | None = None
    ) -> WrappedFileSystemFlavour:
        if obj is not None:
            return WrappedFileSystemFlavour.from_protocol(
                obj._chain.active_path_protocol
            )
        elif self._default_protocol:  # type: ignore
            return WrappedFileSystemFlavour.from_protocol(self._default_protocol)
        else:
            return default_flavour

    def __repr__(self):
        cls_name = f"{type(self).__name__}"
        if self._owner is None:
            return f"<unbound {cls_name}>"
        else:
            return f"<{cls_name} of {self._owner.__name__}>"
def upath_strip_protocol(pth: JoinablePathLike) -> str:
    """Return *pth* as a string with its protocol removed.

    When a protocol is detected the matching flavour strips it; otherwise
    the path is only stringified.
    """
    protocol = get_upath_protocol(pth)
    if not protocol:
        return WrappedFileSystemFlavour.stringify_path(pth)
    flavour = WrappedFileSystemFlavour.from_protocol(protocol)
    return flavour.strip_protocol(pth)
def upath_get_kwargs_from_url(url: JoinablePathLike) -> dict[str, Any]:
    """Return the storage options derived from *url*.

    Without a detectable protocol there is no flavour to ask, and an empty
    dict is returned.
    """
    protocol = get_upath_protocol(url)
    if not protocol:
        return {}
    flavour = WrappedFileSystemFlavour.from_protocol(protocol)
    return flavour.get_kwargs_from_url(url)
def upath_urijoin(base: str, uri: str) -> str:
    """Join a base URI and a possibly relative URI to form an absolute
    interpretation of the latter."""
    # Adapted from urllib.parse.urljoin, see:
    #   https://github.com/python/cpython/blob/ae6c01d9d2/Lib/urllib/parse.py#L539-L605
    # modifications:
    #   - removed allow_fragments parameter
    #   - all schemes are considered to allow relative paths
    #   - all schemes are considered to allow netloc (revisit this)
    #   - no bytes support (removes encoding and decoding)
    if not base:
        return uri
    if not uri:
        return base

    base_split = urlsplit(base, scheme="")
    uri_split = urlsplit(uri, scheme=base_split.scheme)

    # a different scheme means the uri is already absolute
    if uri_split.scheme != base_split.scheme:
        return uri
    # the uri carries its own netloc: nothing to inherit from the base
    if uri_split.netloc:
        return uri_split.geturl()
    uri_split = uri_split._replace(netloc=base_split.netloc)

    if not (uri_split.path or uri_split.fragment):
        # no path and no fragment: reuse the base's, plus its query if the
        # uri does not provide one
        inherited = {"path": base_split.path, "fragment": base_split.fragment}
        if not uri_split.query:
            inherited["query"] = base_split.query
        return uri_split._replace(**inherited).geturl()

    if uri_split.path.startswith("/"):
        # rooted path: the base path is ignored completely
        segments = uri_split.path.split("/")
    else:
        base_dir = base_split.path.split("/")
        if base_dir[-1] != "":
            # the last item is not a directory and does not take part in
            # resolving the relative path
            base_dir.pop()
        segments = base_dir + uri_split.path.split("/")
        # drop inner empty segments that would produce redundant slashes
        segments[1:-1] = [part for part in segments[1:-1] if part]

    stack: list[str] = []
    for segment in segments:
        if segment == ".":
            continue
        if segment != "..":
            stack.append(segment)
        elif stack:
            # ".." beyond the top of the path is silently ignored
            stack.pop()

    if segments[-1] in {".", ".."}:
        # a trailing relative directory keeps its trailing slash
        stack.append("")

    return uri_split._replace(path="/".join(stack) or "/").geturl()