Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/_protocol.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

42 statements  

1from __future__ import annotations 

2 

3import os 

4import re 

5from pathlib import PurePath 

6from typing import TYPE_CHECKING 

7from typing import Any 

8 

9if TYPE_CHECKING: 

10 from upath.core import UPath 

11 

# Names exported by ``from upath._protocol import *``.
__all__ = ["get_upath_protocol", "normalize_empty_netloc", "compatible_protocol"]

17 

18# Regular expression to match fsspec style protocols. 

19# Matches single slash usage too for compatibility. 

20_PROTOCOL_RE = re.compile( 

21 r"^(?P<protocol>[A-Za-z][A-Za-z0-9+]+):(?P<slashes>//?)(?P<path>.*)" 

22) 

23 

24# Matches data URIs 

25_DATA_URI_RE = re.compile(r"^data:[^,]*,") 

26 

27 

def _match_protocol(pth: str) -> str:
    """Return the protocol that ``pth`` starts with.

    Returns the ``protocol`` group when ``pth`` matches ``_PROTOCOL_RE``,
    ``"data"`` when it matches ``_DATA_URI_RE`` instead, and ``""`` when
    neither pattern matches.
    """
    url_match = _PROTOCOL_RE.match(pth)
    if url_match is not None:
        return url_match.group("protocol")
    if _DATA_URI_RE.match(pth) is not None:
        return "data"
    return ""

34 

35 

def get_upath_protocol(
    pth: str | PurePath | os.PathLike,
    *,
    protocol: str | None = None,
    storage_options: dict[str, Any] | None = None,
) -> str:
    """Determine the filesystem-spec protocol for ``pth``.

    The protocol is detected from the path itself and then reconciled with
    an explicitly requested ``protocol``:

    * ``str`` values are parsed with ``_match_protocol``,
    * ``PurePath`` instances contribute their ``protocol`` attribute
      (``""`` when they have none),
    * any other object is parsed via ``__fspath__()`` when it has that
      method and via ``str()`` otherwise.

    Args:
        pth: Path, URL string or path-like object to inspect.
        protocol: Explicitly requested protocol, if any. It takes
            precedence over the detected one.
        storage_options: Accepted but not used by this function.

    Returns:
        The requested protocol if given, otherwise the detected protocol,
        otherwise ``""``.

    Raises:
        ValueError: If a requested and a detected protocol are both present
            and the detected one does not start with the requested one.
    """
    if isinstance(pth, str):
        pth_protocol = _match_protocol(pth)
    elif isinstance(pth, PurePath):
        pth_protocol = getattr(pth, "protocol", "")
    else:
        as_text = pth.__fspath__() if hasattr(pth, "__fspath__") else str(pth)
        pth_protocol = _match_protocol(as_text)

    if protocol and pth_protocol:
        # Prefix comparison: a requested "s3" also accepts a detected "s3a".
        if not pth_protocol.startswith(protocol):
            raise ValueError(
                f"requested protocol {protocol!r} incompatible with {pth_protocol!r}"
            )

    if protocol:
        return protocol
    return pth_protocol or ""

58 

59 

def normalize_empty_netloc(pth: str) -> str:
    """Rewrite a single-slash URL (``proto:/path``) as ``proto:///path``.

    Strings that do not match ``_PROTOCOL_RE``, or that already have two
    slashes after the colon, are returned unchanged.

    Args:
        pth: Path or URL string to normalize.

    Returns:
        The normalized string.
    """
    m = _PROTOCOL_RE.match(pth)
    if m is None or m.group("slashes") != "/":
        return pth
    # Slice the remainder out of ``pth`` instead of using the "path" group:
    # "." does not match a newline, so the group stops at the first newline
    # and everything after it would be dropped from the result.
    remainder = pth[m.end("slashes") :]
    return f"{m.group('protocol')}:///{remainder}"

67 

68 

def compatible_protocol(protocol: str, *args: str | os.PathLike[str] | UPath) -> bool:
    """Check whether every argument can be combined with ``protocol``.

    Protocols are compared by their base name, i.e. the part before the
    first ``"+"``. The same normalization is applied to ``protocol`` and to
    the protocol detected for each argument, so a qualified protocol such
    as ``"webdav+http"`` is compatible with itself.

    Args:
        protocol: Protocol the arguments are checked against.
        *args: Paths or URL strings whose protocol is determined with
            ``get_upath_protocol``.

    Returns:
        ``True`` if each argument either has no protocol or has the same
        base protocol as ``protocol`` (also when no arguments are given);
        ``False`` otherwise.
    """
    base_protocol = protocol.partition("+")[0]
    for arg in args:
        # consider protocols equivalent if they match up to the first "+"
        other_protocol = get_upath_protocol(arg).partition("+")[0]
        # only identical (or empty "") protocols can combine
        if other_protocol and other_protocol != base_protocol:
            return False
    return True