Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/_protocol.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import re
4from collections import ChainMap
5from pathlib import PurePath
6from typing import TYPE_CHECKING
7from typing import Any
9from fsspec.registry import known_implementations as _known_implementations
10from fsspec.registry import registry as _registry
12if TYPE_CHECKING:
13 from upath.types import JoinablePathLike
15__all__ = [
16 "get_upath_protocol",
17 "normalize_empty_netloc",
18 "compatible_protocol",
19]
21# Regular expression to match fsspec style protocols.
22# Matches single slash usage too for compatibility.
23_PROTOCOL_RE = re.compile(
24 r"^(?P<protocol>[A-Za-z][A-Za-z0-9+]+):(?:(?P<slashes>//?)|:)(?P<path>.*)"
25)
27# Matches data URIs
28_DATA_URI_RE = re.compile(r"^data:[^,]*,")
31def _match_protocol(pth: str) -> str:
32 if m := _PROTOCOL_RE.match(pth):
33 return m.group("protocol")
34 elif _DATA_URI_RE.match(pth):
35 return "data"
36 return ""
# Layered lookup: the live fsspec registry is consulted first, then the
# table of known implementations.
_fsspec_registry_map = ChainMap(_registry, _known_implementations)


def _registry_entry_key(entry: Any) -> Any:
    """Reduce a registry entry to a value that can be compared across maps.

    A dict entry is reduced to its ``"class"`` value, a class to its
    ``module.name`` path (or the bare name if it has no module), and
    anything else is returned unchanged.
    """
    if isinstance(entry, dict):
        return entry.get("class")
    if isinstance(entry, type):
        if entry.__module__:
            return entry.__module__ + "." + entry.__name__
        return entry.__name__
    return entry


def _fsspec_protocol_equals(p0: str, p1: str) -> bool:
    """Check whether two fsspec protocol names refer to the same filesystem.

    An empty protocol is treated as ``"file"``. Names that differ are
    looked up in the registry map and compared, first as raw entries and
    then by the value ``_registry_entry_key`` derives from each.

    Raises:
        ValueError: if a protocol that needs to be looked up is not
            present in the registry map.
    """
    p0 = p0 or "file"
    p1 = p1 or "file"
    if p0 == p1:
        return True

    entries = []
    for name in (p0, p1):
        try:
            entries.append(_fsspec_registry_map[name])
        except KeyError:
            # The KeyError adds nothing beyond the message below, so keep
            # it out of the traceback.
            raise ValueError(f"Protocol not known: {name!r}") from None
    o0, o1 = entries

    if o0 == o1:
        return True
    return _registry_entry_key(o0) == _registry_entry_key(o1)
def get_upath_protocol(
    pth: JoinablePathLike,
    *,
    protocol: str | None = None,
    storage_options: dict[str, Any] | None = None,
) -> str:
    """Return the filesystem spec protocol to use for *pth*.

    The protocol carried by *pth* is detected first: ``UPath`` instances
    report their own ``protocol``, other ``PurePath`` instances report a
    ``protocol`` attribute if they have one, and everything else is
    converted to a string and matched against the protocol patterns.

    When *protocol* is ``None`` the detected protocol is returned.
    Otherwise the requested protocol is checked against the detected one
    and returned, falling back to the detected one when it is empty.

    ``storage_options`` is accepted but not used by this function.

    Raises:
        ValueError: if the requested protocol is incompatible with the
            one detected on *pth*, or if an empty protocol is requested
            explicitly while *pth* carries a protocol.
    """
    from upath.core import UPath

    if isinstance(pth, str):
        detected = _match_protocol(pth)
    elif isinstance(pth, UPath):
        detected = pth.protocol
    elif isinstance(pth, PurePath):
        detected = getattr(pth, "protocol", "")
    else:
        # Everything else is turned into text first, preferring
        # __vfspath__ over __fspath__ over str().
        if hasattr(pth, "__vfspath__"):
            text = pth.__vfspath__()
        elif hasattr(pth, "__fspath__"):
            text = pth.__fspath__()
        else:
            text = str(pth)
        detected = _match_protocol(text)

    if protocol is None:
        return detected or ""

    if detected:
        if protocol and not _fsspec_protocol_equals(detected, protocol):
            raise ValueError(
                f"requested protocol {protocol!r} incompatible with {detected!r}"
            )
        if protocol == "":
            # explicitly requested empty protocol, but path has non-empty protocol
            raise ValueError(
                f"explicitly requested empty protocol {protocol!r}"
                f" incompatible with {detected!r}"
            )

    return protocol or detected or ""
def normalize_empty_netloc(pth: str) -> str:
    """Rewrite a single-slash ``protocol:/path`` as ``protocol:///path``.

    Strings that do not start with a protocol, or whose protocol is
    followed by ``//`` or ``:``, are returned unchanged.
    """
    matched = _PROTOCOL_RE.match(pth)
    if matched is None or matched.group("slashes") != "/":
        return pth
    return f"{matched.group('protocol')}:///{matched.group('path')}"
def compatible_protocol(
    protocol: str,
    *args: JoinablePathLike,
) -> bool:
    """Check that every argument can be combined with *protocol*.

    Relative ``UPath`` arguments and arguments without a protocol are
    always compatible. For the rest, the part of the argument's protocol
    before the first ``"+"`` must be equivalent to *protocol*.
    """
    from upath.core import UPath

    for candidate in args:
        relative_upath = isinstance(candidate, UPath) and not candidate.is_absolute()
        if relative_upath:
            # relative UPath are always compatible
            continue

        # only the part before the first "+" takes part in the comparison
        base_protocol = get_upath_protocol(candidate).partition("+")[0]
        if not base_protocol:
            # an empty protocol combines with anything
            continue

        if not _fsspec_protocol_equals(base_protocol, protocol):
            return False
    return True