Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/_protocol.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import os
4import re
5from pathlib import PurePath
6from typing import TYPE_CHECKING
7from typing import Any
9if TYPE_CHECKING:
10 from upath.core import UPath
12__all__ = [
13 "get_upath_protocol",
14 "normalize_empty_netloc",
15 "compatible_protocol",
16]
# Pattern for fsspec-style "<protocol>://<path>" strings. A single slash after
# the colon ("<protocol>:/<path>") is accepted as well, for compatibility.
#
# Notes on what the pattern accepts:
# - the protocol is one ASCII letter followed by at least one more letter,
#   digit or "+", so a one-character prefix such as "C:/" never matches;
# - "." does not match a newline here, so the "path" group ends at the first
#   line break.
_PROTOCOL_RE = re.compile(
    r"^(?P<protocol>[A-Za-z][A-Za-z0-9+]+)"
    r":(?P<slashes>//?)"
    r"(?P<path>.*)"
)

# Pattern for the head of a data URI: "data:", anything but a comma, then the
# first comma.
_DATA_URI_RE = re.compile(r"^data:[^,]*,")
def _match_protocol(pth: str) -> str:
    """Return the protocol prefix found in ``pth``, or ``""`` if there is none.

    A ``<protocol>:/`` or ``<protocol>://`` prefix is looked for first and its
    protocol name returned. Failing that, a string that starts like a data URI
    (``data:`` followed by a comma somewhere later) yields ``"data"``.
    """
    found = _PROTOCOL_RE.match(pth)
    if found is not None:
        return found.group("protocol")
    # No slash-delimited prefix: the string may still be a data URI.
    return "data" if _DATA_URI_RE.match(pth) else ""
def get_upath_protocol(
    pth: str | PurePath | os.PathLike,
    *,
    protocol: str | None = None,
    storage_options: dict[str, Any] | None = None,
) -> str:
    """Work out the filesystem spec protocol that applies to ``pth``.

    The protocol carried by ``pth`` itself is determined as follows:

    - ``str``: parsed from the string's prefix;
    - ``PurePath``: its ``protocol`` attribute, or ``""`` if it has none;
    - objects with ``__fspath__``: parsed from the value ``__fspath__()``
      returns;
    - anything else: parsed from ``str(pth)``.

    Args:
        pth: The path-like value to inspect.
        protocol: An explicitly requested protocol. When given (and not
            empty) it is the return value, provided it does not conflict
            with the protocol detected on ``pth``.
        storage_options: Accepted but not used by this function.

    Returns:
        ``protocol`` if it is set, otherwise the protocol detected on
        ``pth``, otherwise ``""``.

    Raises:
        ValueError: If both a requested and a detected protocol are present
            and the detected one does not start with the requested one.
    """
    if isinstance(pth, str):
        detected = _match_protocol(pth)
    elif isinstance(pth, PurePath):
        detected = getattr(pth, "protocol", "")
    else:
        # Prefer the os.PathLike protocol; fall back to the string form.
        text = pth.__fspath__() if hasattr(pth, "__fspath__") else str(pth)
        detected = _match_protocol(text)

    # NOTE: a fallback that sets protocol = "file" when storage_options are
    # given but no protocol is known is left disabled here.

    if not protocol:
        return detected or ""

    # The check is a prefix match, so a detected "s3a" satisfies a requested
    # "s3".
    if detected and not detected.startswith(protocol):
        raise ValueError(
            f"requested protocol {protocol!r} incompatible with {detected!r}"
        )
    return protocol
def normalize_empty_netloc(pth: str) -> str:
    """Rewrite ``protocol:/path`` as ``protocol:///path``.

    Only strings whose protocol prefix is followed by exactly one slash are
    rewritten; every other string (two slashes, or no protocol prefix at all)
    is returned unchanged.

    Args:
        pth: The path or URL string to normalize.

    Returns:
        The normalized string.
    """
    m = _PROTOCOL_RE.match(pth)
    if m is None or len(m.group("slashes")) != 1:
        return pth
    # Take the remainder from the original string instead of the "path"
    # group: "." in the pattern does not match newlines, so the group would
    # stop at the first line break and the rest of the path would be lost.
    remainder = pth[m.start("path"):]
    return f"{m.group('protocol')}:///{remainder}"
def compatible_protocol(protocol: str, *args: str | os.PathLike[str] | UPath) -> bool:
    """Check whether every argument's protocol is compatible with ``protocol``.

    Protocols are compared only up to their first ``"+"``. An argument with
    no protocol (``""``) is compatible with anything.

    Args:
        protocol: The reference protocol.
        *args: Path-like values whose protocols are checked against it.

    Returns:
        ``True`` if every argument either has no protocol or has the same
        protocol as ``protocol`` (ignoring anything after the first ``"+"``);
        ``False`` otherwise. With no arguments the result is ``True``.
    """
    # Apply the "up to the first +" rule to the reference protocol too;
    # otherwise a protocol containing "+" would be rejected even against an
    # argument carrying exactly the same protocol.
    base = protocol.partition("+")[0] if protocol else protocol
    others = (get_upath_protocol(arg).partition("+")[0] for arg in args)
    # Only identical (or empty "") protocols can combine.
    return all(not other or other == base for other in others)