Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/httpx/_utils.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

109 statements  

1from __future__ import annotations 

2 

3import ipaddress 

4import os 

5import re 

6import typing 

7from urllib.request import getproxies 

8 

9from ._types import PrimitiveData 

10 

11if typing.TYPE_CHECKING: # pragma: no cover 

12 from ._urls import URL 

13 

14 

def primitive_value_to_str(value: PrimitiveData) -> str:
    """
    Render a primitive value as text.

    `None` becomes the empty string and booleans use the JSON spellings
    'true' / 'false'; anything else is passed through `str()`.
    """
    # Identity checks (not equality) so that 1 and 0 are not mistaken
    # for True and False.
    if value is None:
        return ""
    if value is True:
        return "true"
    if value is False:
        return "false"
    return str(value)

28 

29 

def get_environment_proxies() -> dict[str, str | None]:
    """
    Build a proxy mount mapping from the environment.

    Keys are URL patterns such as "http://" or "all://*example.com".
    Values are a proxy URL, or `None` for hosts listed in NO_PROXY.
    An empty mapping is returned as soon as NO_PROXY contains "*".
    """
    # urllib.request.getproxies() falls back on the system registry /
    # configuration on Windows and macOS. Only the "http", "https" and
    # "all" entries are read, so unrelated variables such as
    # 'TRAVIS_APT_PROXY' are never propagated.
    env_proxies = getproxies()
    proxy_map: dict[str, str | None] = {}

    for scheme in ("http", "https", "all"):
        proxy_url = env_proxies.get(scheme)
        if not proxy_url:
            continue
        # A bare "host:port" value is given an http:// scheme.
        if "://" not in proxy_url:
            proxy_url = f"http://{proxy_url}"
        proxy_map[f"{scheme}://"] = proxy_url

    # See https://curl.haxx.se/libcurl/c/CURLOPT_NOPROXY.html for details
    # on how names in `NO_PROXY` are handled.
    for raw_entry in env_proxies.get("no", "").split(","):
        entry = raw_entry.strip()
        if not entry:
            continue
        if entry == "*":
            # A "*" anywhere in the comma separated list means proxies are
            # always bypassed: discard everything collected so far.
            return {}

        # NO_PROXY=.google.com becomes "all://*.google.com", which disables
        # "www.google.com" but not "google.com".
        # NO_PROXY=google.com becomes "all://*google.com", which disables
        # "www.google.com" and "google.com" (but not "wwwgoogle.com").
        # Entries may be domains, IPv4/IPv6 addresses or "localhost", e.g.
        # NO_PROXY=example.com,::1,localhost,192.168.0.0/16
        if "://" in entry:
            mount_key = entry
        elif is_ipv4_hostname(entry):
            mount_key = f"all://{entry}"
        elif is_ipv6_hostname(entry):
            mount_key = f"all://[{entry}]"
        elif entry.lower() == "localhost":
            mount_key = f"all://{entry}"
        else:
            mount_key = f"all://*{entry}"
        proxy_map[mount_key] = None

    return proxy_map

77 

78 

def to_bytes(value: str | bytes, encoding: str = "utf-8") -> bytes:
    """
    Return `value` as bytes, encoding it with `encoding` when it is a str.
    """
    if isinstance(value, str):
        return value.encode(encoding)
    return value

81 

82 

def to_str(value: str | bytes, encoding: str = "utf-8") -> str:
    """
    Return `value` as a str, decoding it with `encoding` when it is not one.
    """
    if isinstance(value, str):
        return value
    return value.decode(encoding)

85 

86 

def to_bytes_or_str(value: str, match_type_of: typing.AnyStr) -> typing.AnyStr:
    """
    Return `value` in the same string flavour as `match_type_of`.

    `value` is returned unchanged when `match_type_of` is a str, and
    encoded with `str.encode()` defaults otherwise.
    """
    if isinstance(match_type_of, str):
        return value
    return value.encode()

89 

90 

def unquote(value: str) -> str:
    """
    Strip one pair of enclosing double quotes from `value`, if present.

    The string is returned unchanged unless it is at least two characters
    long and both starts and ends with '"'. In particular an empty string
    is returned as-is (rather than raising IndexError), and a lone '"' is
    not treated as a quoted pair.
    """
    # The length guard keeps value[0] / value[-1] safe on empty input and
    # stops a single quote character from matching itself at both ends.
    if len(value) >= 2 and value[0] == value[-1] == '"':
        return value[1:-1]
    return value

93 

94 

def peek_filelike_length(stream: typing.Any) -> int | None:
    """
    Report the size in bytes of a file-like object without reading it.

    Two strategies are tried in order: `os.fstat()` on the stream's file
    descriptor, then seeking to the end of the stream and restoring the
    previous position. Returns `None` when neither works.
    """
    # First attempt: a real file exposes a descriptor we can stat.
    try:
        return os.fstat(stream.fileno()).st_size
    except (AttributeError, OSError):
        pass

    # Second attempt: something with random access, like `io.BytesIO`.
    # Jump to the end to learn the size, then put the cursor back.
    try:
        position = stream.tell()
        end_offset = stream.seek(0, os.SEEK_END)
        stream.seek(position)
    except (AttributeError, OSError):
        # Neither a descriptor nor seek/tell: the length is unknown.
        return None

    return end_offset

118 

119 

class URLPattern:
    """
    A URL-shaped pattern, currently used for lookups against proxy keys.

    The scheme "all" and the host "*" act as wildcards; a leading "*." or
    "*" on the host matches subdomains (see `__init__`).

    # Wildcard matching...
    >>> pattern = URLPattern("all://")
    >>> pattern.matches(httpx.URL("http://example.com"))
    True

    # With scheme matching...
    >>> pattern = URLPattern("https://")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False

    # With domain matching...
    >>> pattern = URLPattern("https://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # Wildcard scheme, with domain matching...
    >>> pattern = URLPattern("all://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    True
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # With port matching...
    >>> pattern = URLPattern("https://example.com:1234")
    >>> pattern.matches(httpx.URL("https://example.com:1234"))
    True
    >>> pattern.matches(httpx.URL("https://example.com"))
    False
    """

    def __init__(self, pattern: str) -> None:
        """
        Parse `pattern` into scheme, host, port and a compiled host regex.

        Raises `ValueError` for a non-empty pattern containing no ":",
        i.e. a plain scheme string such as "http".
        """
        from ._urls import URL

        if pattern and ":" not in pattern:
            raise ValueError(
                f"Proxy keys should use proper URL forms rather "
                f"than plain scheme strings. "
                f'Instead of "{pattern}", use "{pattern}://"'
            )

        parsed = URL(pattern)
        raw_host = parsed.host

        self.pattern = pattern
        # "all" and "*" are wildcards; they are stored as empty strings.
        self.scheme = "" if parsed.scheme == "all" else parsed.scheme
        self.host = "" if raw_host == "*" else raw_host
        self.port = parsed.port

        # No host restriction unless a concrete (non-"*") host was given.
        self.host_regex: typing.Pattern[str] | None = None
        if raw_host and raw_host != "*":
            if raw_host.startswith("*."):
                # *.example.com should match "www.example.com", but not
                # "example.com"
                expression = f"^.+\\.{re.escape(raw_host[2:])}$"
            elif raw_host.startswith("*"):
                # *example.com should match "www.example.com" and
                # "example.com"
                expression = f"^(.+\\.)?{re.escape(raw_host[1:])}$"
            else:
                # example.com should match "example.com" but not
                # "www.example.com"
                expression = f"^{re.escape(raw_host)}$"
            self.host_regex = re.compile(expression)

    def matches(self, other: URL) -> bool:
        """
        Return True when `other` satisfies the scheme, host and port
        constraints of this pattern. Unset constraints match anything.
        """
        if self.scheme and self.scheme != other.scheme:
            return False

        host_is_constrained = bool(self.host) and self.host_regex is not None
        if host_is_constrained and self.host_regex.match(other.host) is None:
            return False

        if self.port is not None and self.port != other.port:
            return False

        return True

    @property
    def priority(self) -> tuple[int, int, int]:
        """
        Sort key ordering patterns from most specific to least specific.
        """
        return (
            # URLs with a port should take priority over URLs without one.
            int(self.port is None),
            # Longer hostnames should match first.
            -len(self.host),
            # Longer schemes should match first.
            -len(self.scheme),
        )

    def __hash__(self) -> int:
        # Hash on the raw pattern string, consistent with __eq__.
        return hash(self.pattern)

    def __lt__(self, other: URLPattern) -> bool:
        # Ordering is defined purely by `priority`.
        return self.priority < other.priority

    def __eq__(self, other: typing.Any) -> bool:
        # Equal only to another URLPattern built from the same string.
        if not isinstance(other, URLPattern):
            return False
        return self.pattern == other.pattern

227 

228 

def is_ipv4_hostname(hostname: str) -> bool:
    """
    Return True when the part of `hostname` before any "/" parses as an
    IPv4 address, so "192.168.0.0/16" counts as well as "192.168.0.1".
    """
    try:
        address_part = hostname.partition("/")[0]
        ipaddress.IPv4Address(address_part)
    except Exception:
        # Any failure at all means "not an IPv4 hostname".
        return False
    else:
        return True

235 

236 

def is_ipv6_hostname(hostname: str) -> bool:
    """
    Return True when the part of `hostname` before any "/" parses as an
    IPv6 address, so "fe80::/10" counts as well as "::1".
    """
    try:
        address_part = hostname.partition("/")[0]
        ipaddress.IPv6Address(address_part)
    except Exception:
        # Any failure at all means "not an IPv6 hostname".
        return False
    else:
        return True