Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/_protocol.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

85 statements  

1from __future__ import annotations 

2 

3import re 

4from collections import ChainMap 

5from pathlib import PurePath 

6from typing import TYPE_CHECKING 

7from typing import Any 

8 

9from fsspec.registry import known_implementations as _known_implementations 

10from fsspec.registry import registry as _registry 

11 

12if TYPE_CHECKING: 

13 from upath.types import JoinablePathLike 

14 

15__all__ = [ 

16 "get_upath_protocol", 

17 "normalize_empty_netloc", 

18 "compatible_protocol", 

19] 

20 

21# Regular expression to match fsspec style protocols. 

22# Matches single slash usage too for compatibility. 

23_PROTOCOL_RE = re.compile( 

24 r"^(?P<protocol>[A-Za-z][A-Za-z0-9+]+):(?:(?P<slashes>//?)|:)(?P<path>.*)" 

25) 

26 

27# Matches data URIs 

28_DATA_URI_RE = re.compile(r"^data:[^,]*,") 

29 

30 

31def _match_protocol(pth: str) -> str: 

32 if m := _PROTOCOL_RE.match(pth): 

33 return m.group("protocol") 

34 elif _DATA_URI_RE.match(pth): 

35 return "data" 

36 return "" 

37 

38 

39_fsspec_registry_map = ChainMap(_registry, _known_implementations) 

40 

41 

42def _fsspec_protocol_equals(p0: str, p1: str) -> bool: 

43 """check if two fsspec protocols are equivalent""" 

44 p0 = p0 or "file" 

45 p1 = p1 or "file" 

46 if p0 == p1: 

47 return True 

48 

49 try: 

50 o0 = _fsspec_registry_map[p0] 

51 except KeyError: 

52 raise ValueError(f"Protocol not known: {p0!r}") 

53 try: 

54 o1 = _fsspec_registry_map[p1] 

55 except KeyError: 

56 raise ValueError(f"Protocol not known: {p1!r}") 

57 

58 if o0 == o1: 

59 return True 

60 

61 if isinstance(o0, dict): 

62 o0 = o0.get("class") 

63 elif isinstance(o0, type): 

64 if o0.__module__: 

65 o0 = o0.__module__ + "." + o0.__name__ 

66 else: 

67 o0 = o0.__name__ 

68 if isinstance(o1, dict): 

69 o1 = o1.get("class") 

70 elif isinstance(o1, type): 

71 if o1.__module__: 

72 o1 = o1.__module__ + "." + o1.__name__ 

73 else: 

74 o1 = o1.__name__ 

75 

76 return o0 == o1 

77 

78 

79def get_upath_protocol( 

80 pth: JoinablePathLike, 

81 *, 

82 protocol: str | None = None, 

83 storage_options: dict[str, Any] | None = None, 

84) -> str: 

85 """return the filesystem spec protocol""" 

86 from upath.core import UPath 

87 

88 if isinstance(pth, str): 

89 pth_protocol = _match_protocol(pth) 

90 elif isinstance(pth, UPath): 

91 pth_protocol = pth.protocol 

92 elif isinstance(pth, PurePath): 

93 pth_protocol = getattr(pth, "protocol", "") 

94 elif hasattr(pth, "__vfspath__"): 

95 pth_protocol = _match_protocol(pth.__vfspath__()) 

96 elif hasattr(pth, "__fspath__"): 

97 pth_protocol = _match_protocol(pth.__fspath__()) 

98 else: 

99 pth_protocol = _match_protocol(str(pth)) 

100 # if storage_options and not protocol and not pth_protocol: 

101 # protocol = "file" 

102 if protocol is None: 

103 return pth_protocol or "" 

104 elif ( 

105 protocol 

106 and pth_protocol 

107 and not _fsspec_protocol_equals(pth_protocol, protocol) 

108 ): 

109 raise ValueError( 

110 f"requested protocol {protocol!r} incompatible with {pth_protocol!r}" 

111 ) 

112 elif protocol == "" and pth_protocol: 

113 # explicitly requested empty protocol, but path has non-empty protocol 

114 raise ValueError( 

115 f"explicitly requested empty protocol {protocol!r}" 

116 f" incompatible with {pth_protocol!r}" 

117 ) 

118 return protocol or pth_protocol or "" 

119 

120 

121def normalize_empty_netloc(pth: str) -> str: 

122 if m := _PROTOCOL_RE.match(pth): 

123 if m.group("slashes") == "/": 

124 protocol = m.group("protocol") 

125 path = m.group("path") 

126 pth = f"{protocol}:///{path}" 

127 return pth 

128 

129 

130def compatible_protocol( 

131 protocol: str, 

132 *args: JoinablePathLike, 

133) -> bool: 

134 """check if UPath protocols are compatible""" 

135 from upath.core import UPath 

136 

137 for arg in args: 

138 if isinstance(arg, UPath) and not arg.is_absolute(): 

139 # relative UPath are always compatible 

140 continue 

141 other_protocol = get_upath_protocol(arg) 

142 # consider protocols equivalent if they match up to the first "+" 

143 other_protocol = other_protocol.partition("+")[0] 

144 # protocols: only identical (or empty "") protocols can combine 

145 if other_protocol and not _fsspec_protocol_equals(other_protocol, protocol): 

146 return False 

147 return True