Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/_flavour.py: 36%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import os.path
4import posixpath
5import sys
6import warnings
7from functools import lru_cache
8from typing import TYPE_CHECKING
9from typing import Any
10from typing import Mapping
11from typing import Sequence
12from typing import TypedDict
13from typing import Union
14from urllib.parse import SplitResult
15from urllib.parse import urlsplit
17if sys.version_info >= (3, 12):
18 from typing import TypeAlias
19else:
20 TypeAlias = Any
22from fsspec.registry import known_implementations
23from fsspec.registry import registry as _class_registry
24from fsspec.spec import AbstractFileSystem
26from upath._compat import deprecated
27from upath._compat import str_remove_prefix
28from upath._compat import str_remove_suffix
29from upath._flavour_sources import FileSystemFlavourBase
30from upath._flavour_sources import flavour_registry
31from upath._protocol import get_upath_protocol
32from upath._protocol import normalize_empty_netloc
34if TYPE_CHECKING:
35 from upath.core import UPath
# Public names of this module (what ``from upath._flavour import *`` exports).
__all__ = [
    "LazyFlavourDescriptor",
    "default_flavour",
    "upath_urijoin",
    "upath_get_kwargs_from_url",
]

# Typed alias of ``fsspec.registry.registry``; looked up by protocol name in
# ``WrappedFileSystemFlavour.from_protocol`` to find filesystem classes.
class_registry: Mapping[str, type[AbstractFileSystem]] = _class_registry
# Type of path-like arguments accepted by the flavour methods in this module.
PathOrStr: TypeAlias = Union[str, "os.PathLike[str]"]
class AnyProtocolFileSystemFlavour(FileSystemFlavourBase):
    """Generic flavour that is not tied to a specific protocol.

    Uses "/" as both the separator and the root marker and declares an
    empty protocol tuple.
    """

    sep = "/"
    protocol = ()
    root_marker = "/"

    @classmethod
    def _strip_protocol(cls, path: str) -> str:
        """Return ``path`` without its protocol prefix and trailing slashes.

        A leading ``<protocol>://`` or ``<protocol>::`` (the protocol being
        whatever ``get_upath_protocol`` reports for ``path``) is removed,
        then trailing "/" characters are stripped. If nothing is left, the
        root marker is returned.
        """
        scheme = get_upath_protocol(path)
        # "://" takes precedence over "::"; at most one prefix is removed
        for delimiter in ("://", "::"):
            prefix = scheme + delimiter
            if path.startswith(prefix):
                path = path[len(prefix) :]
                break
        stripped = path.rstrip("/")
        return stripped if stripped else cls.root_marker

    @staticmethod
    def _get_kwargs_from_urls(path: str) -> dict[str, Any]:
        """Return an empty dict: no storage options are derived from ``path``."""
        return dict()

    @classmethod
    def _parent(cls, path):
        """Return the parent of ``path``, prefixed with the root marker.

        Paths that contain no "/" after protocol stripping have the root
        marker as their parent.
        """
        stripped = cls._strip_protocol(path)
        if "/" not in stripped:
            return cls.root_marker
        head, _ = stripped.rsplit("/", 1)
        return cls.root_marker + head.lstrip(cls.root_marker)
class ProtocolConfig(TypedDict):
    """Shape of the per-protocol settings in
    ``WrappedFileSystemFlavour.protocol_config``.
    """

    # protocol names for which the netloc is considered an anchor
    netloc_is_anchor: set[str]
    # protocol names for which empty path parts are supported
    supports_empty_parts: set[str]
    # protocol names for which a trailing slash is meaningful
    meaningful_trailing_slash: set[str]
    # protocol name -> root marker that replaces the filesystem's own one
    root_marker_override: dict[str, str]
class WrappedFileSystemFlavour:  # (pathlib_abc.FlavourBase)
    """flavour class for universal_pathlib

    **INTERNAL AND VERY MUCH EXPERIMENTAL**

    Implements the fsspec compatible low-level lexical operations on
    PurePathBase-like objects.

    Note:
        In case you find yourself in need of subclassing this class,
        please open an issue in the universal_pathlib issue tracker:
        https://github.com/fsspec/universal_pathlib/issues
        Ideally we can find a way to make your use-case work by adding
        more functionality to this class.

    """

    # Note:
    #   It would be ideal if there would be a way to avoid the need for
    #   indicating the following settings via the protocol. This is a
    #   workaround to be able to implement the flavour correctly.
    # TODO:
    #   These settings should be configured on the UPath class?!?
    protocol_config: ProtocolConfig = {
        "netloc_is_anchor": {
            "http",
            "https",
            "s3",
            "s3a",
            "smb",
            "gs",
            "gcs",
            "az",
            "adl",
            "abfs",
            "abfss",
            "webdav+http",
            "webdav+https",
        },
        "supports_empty_parts": {
            "http",
            "https",
            "s3",
            "s3a",
            "gs",
            "gcs",
            "az",
            "adl",
            "abfs",
        },
        "meaningful_trailing_slash": {
            "http",
            "https",
        },
        "root_marker_override": {
            "ssh": "/",
            "sftp": "/",
        },
    }

    def __init__(
        self,
        spec: type[AbstractFileSystem | FileSystemFlavourBase] | AbstractFileSystem,
        *,
        netloc_is_anchor: bool = False,
        supports_empty_parts: bool = False,
        meaningful_trailing_slash: bool = False,
        root_marker_override: str | None = None,
    ) -> None:
        """initialize the flavour with the given fsspec

        Args:
            spec: filesystem class, flavour class or filesystem instance
                whose ``sep``, ``protocol``, ``root_marker``,
                ``_strip_protocol``, ``_parent`` and
                ``_get_kwargs_from_urls`` are delegated to.
            netloc_is_anchor: treat the netloc as the drive/anchor.
            supports_empty_parts: keep empty path parts when joining.
            meaningful_trailing_slash: a trailing slash carries meaning.
            root_marker_override: root marker to report instead of the
                one provided by ``spec``; ``None`` means no override.
        """
        self._spec = spec

        # netloc is considered an anchor, influences:
        #   - splitdrive
        #   - join
        self.netloc_is_anchor = bool(netloc_is_anchor)

        # supports empty parts, influences:
        #   - join
        #   - UPath._parse_path
        self.supports_empty_parts = bool(supports_empty_parts)

        # meaningful trailing slash, influences:
        #   - join
        #   - UPath._parse_path
        self.has_meaningful_trailing_slash = bool(meaningful_trailing_slash)

        # some filesystems require UPath to enforce a specific root marker
        if root_marker_override is None:
            self.root_marker_override = None
        else:
            self.root_marker_override = str(root_marker_override)

    @classmethod
    @lru_cache(maxsize=None)
    def from_protocol(
        cls,
        protocol: str,
    ) -> WrappedFileSystemFlavour:
        """return the fsspec flavour for the given protocol

        The settings are looked up in ``protocol_config``. The wrapped spec
        is taken from the fsspec class registry, then from the generated
        flavour registry, and finally falls back to
        ``AnyProtocolFileSystemFlavour``. Results are memoised via
        ``lru_cache``.
        """

        _c = cls.protocol_config
        config: dict[str, Any] = {
            "netloc_is_anchor": protocol in _c["netloc_is_anchor"],
            "supports_empty_parts": protocol in _c["supports_empty_parts"],
            "meaningful_trailing_slash": protocol in _c["meaningful_trailing_slash"],
            "root_marker_override": _c["root_marker_override"].get(protocol),
        }

        # first try to get an already imported fsspec filesystem class
        try:
            return cls(class_registry[protocol], **config)
        except KeyError:
            pass
        # next try to get the flavour from the generated flavour registry
        # to avoid imports
        try:
            return cls(flavour_registry[protocol], **config)
        except KeyError:
            pass
        # finally fallback to a default flavour for the protocol
        if protocol in known_implementations:
            warnings.warn(
                f"Could not find default for known protocol {protocol!r}."
                " Creating a default flavour for it. Please report this"
                " to the universal_pathlib issue tracker.",
                UserWarning,
                stacklevel=2,
            )
        return cls(AnyProtocolFileSystemFlavour, **config)

    def __repr__(self):
        if isinstance(self._spec, type):
            return f"<wrapped class {self._spec.__name__}>"
        else:
            return f"<wrapped instance {self._spec.__class__.__name__}>"

    # === fsspec.AbstractFileSystem ===================================

    @property
    def protocol(self) -> tuple[str, ...]:
        """protocol(s) of the wrapped spec; a single string is wrapped
        into a one-element tuple"""
        if isinstance(self._spec.protocol, str):
            return (self._spec.protocol,)
        else:
            return self._spec.protocol

    @property
    def root_marker(self) -> str:
        """the override if one was configured, else the spec's root marker"""
        if self.root_marker_override is not None:
            return self.root_marker_override
        else:
            return self._spec.root_marker

    @property
    def local_file(self) -> bool:
        """truthiness of the spec's ``local_file`` attribute (default False)"""
        return bool(getattr(self._spec, "local_file", False))

    @staticmethod
    def stringify_path(pth: PathOrStr) -> str:
        """convert a path-like object to a string

        Strings are used as is, objects with ``__fspath__`` are converted
        through it, objects with a ``path`` attribute contribute that
        attribute, everything else goes through ``str()``. The result is
        passed through ``normalize_empty_netloc``.
        """
        if isinstance(pth, str):
            out = pth
        elif getattr(pth, "__fspath__", None) is not None:
            out = pth.__fspath__()
        elif isinstance(pth, os.PathLike):
            out = str(pth)
        elif hasattr(pth, "path"):  # type: ignore[unreachable]
            out = pth.path
        else:
            out = str(pth)
        return normalize_empty_netloc(out)

    def strip_protocol(self, pth: PathOrStr) -> str:
        """stringify ``pth`` and delegate to the spec's ``_strip_protocol``"""
        pth = self.stringify_path(pth)
        return self._spec._strip_protocol(pth)

    def get_kwargs_from_url(self, url: PathOrStr) -> dict[str, Any]:
        """return storage options for ``url``

        Objects exposing ``storage_options`` contribute a copy of it;
        otherwise the spec's ``_get_kwargs_from_urls`` is used.
        """
        # NOTE: the public variant is _from_url not _from_urls
        if hasattr(url, "storage_options"):
            return dict(url.storage_options)
        url = self.stringify_path(url)
        return self._spec._get_kwargs_from_urls(url)

    def parent(self, path: PathOrStr) -> str:
        """stringify ``path`` and delegate to the spec's ``_parent``"""
        path = self.stringify_path(path)
        return self._spec._parent(path)

    # === pathlib_abc.FlavourBase =====================================

    @property
    def sep(self) -> str:
        """path separator of the wrapped spec"""
        return self._spec.sep

    @property
    def altsep(self) -> str | None:
        """always ``None``: no alternative separator is reported"""
        return None

    def isabs(self, path: PathOrStr) -> bool:
        """return True if the protocol-stripped path is absolute

        Local filesystems defer to ``os.path.isabs``; all others check for
        a leading root marker.
        """
        path = self.strip_protocol(path)
        if self.local_file:
            return os.path.isabs(path)
        else:
            return path.startswith(self.root_marker)

    def join(self, path: PathOrStr, *paths: PathOrStr) -> str:
        """join ``path`` with ``paths`` and return the resulting string"""
        if self.netloc_is_anchor:
            drv, p0 = self.splitdrive(path)
            pN = list(map(self.stringify_path, paths))
            if not drv and not p0:
                # the first path is empty: use the next one as the base
                path, *pN = pN
                drv, p0 = self.splitdrive(path)
            p0 = p0 or self.sep
        else:
            p0 = str(self.strip_protocol(path)) or self.root_marker
            pN = list(map(self.stringify_path, paths))
            drv = ""
        if self.supports_empty_parts:
            # plain separator join keeps empty parts intact
            return drv + self.sep.join([str_remove_suffix(p0, self.sep), *pN])
        else:
            return drv + posixpath.join(p0, *pN)

    def split(self, path: PathOrStr) -> tuple[str, str]:
        """split the protocol-stripped path into ``(head, tail)``

        ``head`` is the parent reported by the spec (or the root marker if
        that is empty) and ``tail`` is what follows it. If no head can be
        determined, ``("", stripped_path)`` is returned.
        """
        stripped_path = self.strip_protocol(path)
        head = self.parent(stripped_path) or self.root_marker
        if not head:
            return "", stripped_path
        if head == self.sep:
            # The head *is* the separator, so there is no additional
            # separator between head and tail to skip. Skipping
            # ``len(head) + 1`` characters here would drop the first
            # character of the tail ("/foo" -> "oo").
            if stripped_path.startswith(head):
                return head, stripped_path[len(head) :]
            return head, stripped_path
        # skip the head and the single separator that follows it
        return head, stripped_path[len(head) + 1 :]

    def splitdrive(self, path: PathOrStr) -> tuple[str, str]:
        """split the protocol-stripped path into ``(drive, rest)``"""
        path = self.strip_protocol(path)
        if self.netloc_is_anchor:
            u = urlsplit(path)
            if u.scheme:
                # cases like: "http://example.com/foo/bar"
                drive = u._replace(path="", query="", fragment="").geturl()
                rest = u._replace(scheme="", netloc="").geturl()
                if (
                    u.path.startswith("//")
                    and SplitResult("", "", "//", "", "").geturl() == "////"
                ):
                    # see: fsspec/universal_pathlib#233
                    rest = rest[2:]
                return drive, rest or self.root_marker or self.sep
            else:
                # cases like: "bucket/some/special/key
                drive, root, tail = path.partition(self.sep)
                return drive, root + tail
        elif self.local_file:
            return os.path.splitdrive(path)
        else:
            # all other cases don't have a drive
            return "", path

    def normcase(self, path: PathOrStr) -> str:
        """stringify ``path``; local filesystems additionally apply
        ``os.path.normcase``"""
        if self.local_file:
            return os.path.normcase(self.stringify_path(path))
        else:
            return self.stringify_path(path)

    # === Python3.12 pathlib flavour ==================================

    def splitroot(self, path: PathOrStr) -> tuple[str, str, str]:
        """split ``path`` into ``(drive, root, tail)``

        ``tail`` has one leading separator removed. For netloc-anchored
        flavours an empty root marker is replaced by the separator.
        """
        drive, tail = self.splitdrive(path)
        if self.netloc_is_anchor:
            root_marker = self.root_marker or self.sep
        else:
            root_marker = self.root_marker
        return drive, root_marker, str_remove_prefix(tail, self.sep)

    # === deprecated backwards compatibility ===========================

    @deprecated(python_version=(3, 12))
    def casefold(self, s: str) -> str:
        """return ``s`` unchanged for local filesystems, lowercased otherwise"""
        if self.local_file:
            return s
        else:
            return s.lower()

    @deprecated(python_version=(3, 12))
    def parse_parts(self, parts: Sequence[str]) -> tuple[str, str, list[str]]:
        """parse ``parts`` into ``(drive, root, parsed_parts)``"""
        parsed = []
        sep = self.sep
        drv = root = ""
        # parts are walked back to front and the result is reversed at the end
        it = reversed(parts)
        for part in it:
            if part:
                drv, root, rel = self.splitroot(part)
                if not root or root and rel:
                    for x in reversed(rel.split(sep)):
                        parsed.append(sys.intern(x))
        if drv or root:
            parsed.append(drv + root)
        parsed.reverse()
        return drv, root, parsed

    @deprecated(python_version=(3, 12))
    def join_parsed_parts(
        self,
        drv: str,
        root: str,
        parts: list[str],
        drv2: str,
        root2: str,
        parts2: list[str],
    ) -> tuple[str, str, list[str]]:
        """join two already parsed paths and return ``(drive, root, parts)``"""
        if root2:
            if not drv2 and drv:
                return drv, root2, [drv + root2] + parts2[1:]
        elif drv2:
            if drv2 == drv or self.casefold(drv2) == self.casefold(drv):
                # Same drive => second path is relative to the first
                return drv, root, parts + parts2[1:]
        else:
            # Second path is non-anchored (common case)
            return drv, root, parts + parts2
        return drv2, root2, parts2
# Protocol-independent fallback flavour; LazyFlavourDescriptor.__get__ returns
# it when neither an instance nor a default protocol is available.
default_flavour = WrappedFileSystemFlavour(AnyProtocolFileSystemFlavour)
class LazyFlavourDescriptor:
    """descriptor to lazily get the flavour for a given protocol

    Instance access resolves the flavour from the instance's ``protocol``.
    Class access uses the first entry of the owner's ``protocols`` if there
    is one and falls back to ``default_flavour`` otherwise.
    """

    def __init__(self) -> None:
        self._owner: type[UPath] | None = None
        # Set here as well as in __set_name__, so that __get__ on a
        # descriptor that was never bound through a class body falls back
        # to the default flavour instead of raising AttributeError.
        self._default_protocol: str | None = None

    def __set_name__(self, owner: type[UPath], name: str) -> None:
        """record the owning class and its first protocol, if any"""
        # helper to provide a more informative repr
        self._owner = owner
        try:
            self._default_protocol = self._owner.protocols[0]  # type: ignore
        except (AttributeError, IndexError):
            # owner declares no protocols, or an empty sequence of them
            self._default_protocol = None

    def __get__(
        self, instance: UPath | None, owner: type[UPath]
    ) -> WrappedFileSystemFlavour:
        """return the flavour for instance access or class access"""
        if instance is not None:
            return WrappedFileSystemFlavour.from_protocol(instance.protocol)
        elif self._default_protocol:
            return WrappedFileSystemFlavour.from_protocol(self._default_protocol)
        else:
            return default_flavour

    def __repr__(self):
        cls_name = f"{type(self).__name__}"
        if self._owner is None:
            return f"<unbound {cls_name}>"
        else:
            return f"<{cls_name} of {self._owner.__name__}>"
def upath_strip_protocol(pth: PathOrStr) -> str:
    """Return ``pth`` as a string with its protocol removed.

    If ``get_upath_protocol`` reports no protocol for ``pth``, the path is
    only stringified.
    """
    protocol = get_upath_protocol(pth)
    if not protocol:
        return WrappedFileSystemFlavour.stringify_path(pth)
    flavour = WrappedFileSystemFlavour.from_protocol(protocol)
    return flavour.strip_protocol(pth)
def upath_get_kwargs_from_url(url: PathOrStr) -> dict[str, Any]:
    """Return the storage options derived from ``url``.

    An empty dict is returned if ``get_upath_protocol`` reports no protocol
    for ``url``.
    """
    protocol = get_upath_protocol(url)
    if not protocol:
        return {}
    flavour = WrappedFileSystemFlavour.from_protocol(protocol)
    return flavour.get_kwargs_from_url(url)
def upath_urijoin(base: str, uri: str) -> str:
    """Resolve ``uri`` against ``base`` and return the resulting URI.

    An empty ``base`` yields ``uri`` and an empty ``uri`` yields ``base``.
    A ``uri`` with a different scheme is returned unchanged; a ``uri``
    with its own netloc only inherits the scheme of ``base``.
    """
    # adapted from:
    #   https://github.com/python/cpython/blob/ae6c01d9d2/Lib/urllib/parse.py#L539-L605
    # differences to the stdlib version:
    #   - no allow_fragments parameter
    #   - every scheme is treated as supporting relative paths
    #   - every scheme is treated as supporting a netloc (revisit this)
    #   - str only, no bytes support
    if not base:
        return uri
    if not uri:
        return base

    base_split = urlsplit(base, scheme="")
    uri_split = urlsplit(uri, scheme=base_split.scheme)

    if uri_split.scheme != base_split.scheme:
        return uri
    if uri_split.netloc:
        return uri_split.geturl()
    uri_split = uri_split._replace(netloc=base_split.netloc)

    if not (uri_split.path or uri_split.fragment):
        # nothing but (maybe) a query was given: inherit from the base
        inherited = {"path": base_split.path, "fragment": base_split.fragment}
        if not uri_split.query:
            inherited["query"] = base_split.query
        return uri_split._replace(**inherited).geturl()

    if uri_split.path.startswith("/"):
        # an absolute path ignores the base path entirely
        segments = uri_split.path.split("/")
    else:
        base_dirs = base_split.path.split("/")
        if base_dirs[-1] != "":
            # the last base component is not a directory: drop it
            base_dirs.pop()
        segments = base_dirs + uri_split.path.split("/")
        # drop empty inner segments to avoid redundant slashes
        segments[1:-1] = [segment for segment in segments[1:-1] if segment]

    stack: list[str] = []
    for segment in segments:
        if segment == ".":
            continue
        if segment != "..":
            stack.append(segment)
        elif stack:
            # ".." above the top level is silently ignored
            stack.pop()

    if segments[-1] in (".", ".."):
        # a trailing relative directory keeps its trailing slash
        stack.append("")

    return uri_split._replace(path="/".join(stack) or "/").geturl()