Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_utils.py: 24%
232 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1import codecs
2import email.message
3import ipaddress
4import mimetypes
5import os
6import re
7import time
8import typing
9from pathlib import Path
10from urllib.request import getproxies
12import sniffio
14from ._types import PrimitiveData
16if typing.TYPE_CHECKING: # pragma: no cover
17 from ._urls import URL
# Escape table used by `format_form_param`: double quote and backslash get
# explicit replacements, and every control character in 0x00-0x1F except
# ESC (0x1B) is percent-encoded.
_HTML5_FORM_ENCODING_REPLACEMENTS = {'"': "%22", "\\": "\\\\"}
_HTML5_FORM_ENCODING_REPLACEMENTS.update(
    {chr(c): "%{:02X}".format(c) for c in range(0x1F + 1) if c != 0x1B}
)
# Single alternation regex matching any character in the table above.
_HTML5_FORM_ENCODING_RE = re.compile(
    r"|".join([re.escape(c) for c in _HTML5_FORM_ENCODING_REPLACEMENTS.keys()])
)
def normalize_header_key(
    value: typing.Union[str, bytes],
    lower: bool,
    encoding: typing.Optional[str] = None,
) -> bytes:
    """
    Coerce str/bytes into a strictly byte-wise HTTP header key.

    Text input is encoded with `encoding` (ASCII when not given); the
    result is lower-cased when `lower` is true.
    """
    if not isinstance(value, bytes):
        value = value.encode(encoding or "ascii")
    if lower:
        return value.lower()
    return value
def normalize_header_value(
    value: typing.Union[str, bytes], encoding: typing.Optional[str] = None
) -> bytes:
    """
    Coerce str/bytes into a strictly byte-wise HTTP header value.

    Bytes pass through untouched; text is encoded with `encoding`
    (ASCII when not given).
    """
    return value if isinstance(value, bytes) else value.encode(encoding or "ascii")
def primitive_value_to_str(value: "PrimitiveData") -> str:
    """
    Coerce a primitive data type into a string value.

    Note that we prefer JSON-style 'true'/'false' for boolean values here,
    and None becomes the empty string. Identity checks (`is`) are used so
    that the ints 1/0 are *not* treated as booleans.
    """
    if value is None:
        return ""
    if value is True:
        return "true"
    if value is False:
        return "false"
    return str(value)
def is_known_encoding(encoding: str) -> bool:
    """
    Return `True` if `encoding` is a known codec.
    """
    try:
        codecs.lookup(encoding)
        return True
    except LookupError:
        return False
def format_form_param(name: str, value: str) -> bytes:
    """
    Encode a name/value pair within a multipart form.

    Characters from the HTML5 form-encoding table are escaped in `value`
    before it is wrapped in double quotes.
    """
    escaped = _HTML5_FORM_ENCODING_RE.sub(
        lambda match: _HTML5_FORM_ENCODING_REPLACEMENTS[match.group(0)], value
    )
    return f'{name}="{escaped}"'.encode()
# Null-byte sentinels used when sniffing JSON encodings; built once at
# import time so guess_json_utf() doesn't recreate them on each call.
_null = b"\x00"
_null2 = _null * 2
_null3 = _null * 3
def guess_json_utf(data: bytes) -> typing.Optional[str]:
    """
    Sniff the Unicode flavour of a JSON byte payload.

    JSON always starts with two ASCII characters, so counting the null
    bytes in the first four bytes (and where they fall) identifies the
    encoding. A BOM, when present, is detected first. Returns None when
    no encoding can be determined.
    """
    sample = data[:4]
    # BOM checks first; the returned codec names consume the BOM.
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    nullcount = sample.count(b"\x00")
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        # Two nulls: BOM-less UTF-16; their positions give the endianness.
        if sample[::2] == b"\x00\x00":  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == b"\x00\x00":  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    elif nullcount == 3:
        # Three nulls: BOM-less UTF-32.
        if sample[:3] == b"\x00\x00\x00":
            return "utf-32-be"
        if sample[1:] == b"\x00\x00\x00":
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None
def get_ca_bundle_from_env() -> typing.Optional[str]:
    """
    Return a CA bundle location configured in the environment, if any.

    `SSL_CERT_FILE` (when it names an existing file) takes precedence over
    `SSL_CERT_DIR` (when it names an existing directory). Returns None when
    neither variable points at something usable.
    """
    cert_file = os.environ.get("SSL_CERT_FILE")
    if cert_file is not None:
        ssl_file = Path(cert_file)
        if ssl_file.is_file():
            return str(ssl_file)
    cert_dir = os.environ.get("SSL_CERT_DIR")
    if cert_dir is not None:
        ssl_path = Path(cert_dir)
        if ssl_path.is_dir():
            return str(ssl_path)
    return None
def parse_header_links(value: str) -> typing.List[typing.Dict[str, str]]:
    """
    Returns a list of parsed link headers, for more info see:
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
    The generic syntax of those is:
    Link: < uri-reference >; param1=value1; param2="value2"
    So for instance:
    Link; '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;'
    would return
    [
        {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
        {"url": "http://.../back.jpeg"},
    ]
    :param value: HTTP Link entity-header field
    :return: list of parsed link headers
    """
    replace_chars = " '\""
    stripped = value.strip(replace_chars)
    if not stripped:
        return []

    links: typing.List[typing.Dict[str, str]] = []
    for segment in re.split(", *<", stripped):
        # Split off the parameter list; a segment without ';' has none.
        if ";" in segment:
            url, params = segment.split(";", 1)
        else:
            url, params = segment, ""
        link = {"url": url.strip("<> '\"")}
        for param in params.split(";"):
            # Stop at the first parameter that isn't exactly `key=value`
            # (mirrors the original unpack-and-break behaviour).
            pieces = param.split("=")
            if len(pieces) != 2:
                break
            key, param_value = pieces
            link[key.strip(replace_chars)] = param_value.strip(replace_chars)
        links.append(link)
    return links
def parse_content_type_charset(content_type: str) -> typing.Optional[str]:
    """
    Extract the (lower-cased) charset parameter from a Content-Type
    header value, or None when no charset is present.
    """
    # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
    # See: https://peps.python.org/pep-0594/#cgi
    message = email.message.Message()
    message["content-type"] = content_type
    return message.get_content_charset(failobj=None)
# Lower-cased header names whose values get redacted by
# obfuscate_sensitive_headers() below.
SENSITIVE_HEADERS = {"authorization", "proxy-authorization"}
def obfuscate_sensitive_headers(
    items: typing.Iterable[typing.Tuple[typing.AnyStr, typing.AnyStr]]
) -> typing.Iterator[typing.Tuple[typing.AnyStr, typing.AnyStr]]:
    """
    Yield the given (key, value) header pairs, replacing the value of any
    sensitive header with "[secure]" (matched to the value's str/bytes type).
    """
    for key, header_value in items:
        if to_str(key.lower()) in SENSITIVE_HEADERS:
            header_value = to_bytes_or_str("[secure]", match_type_of=header_value)
        yield key, header_value
def port_or_default(url: "URL") -> typing.Optional[int]:
    """
    Return the URL's explicit port if set, otherwise the default port for
    its scheme (80 for http, 443 for https), otherwise None.
    """
    if url.port is None:
        return {"http": 80, "https": 443}.get(url.scheme)
    return url.port
def same_origin(url: "URL", other: "URL") -> bool:
    """
    Return 'True' if the given URLs share the same origin.

    Origin here means scheme + host + effective port (explicit or
    scheme default).
    """
    if url.scheme != other.scheme:
        return False
    if url.host != other.host:
        return False
    return port_or_default(url) == port_or_default(other)
def is_https_redirect(url: "URL", location: "URL") -> bool:
    """
    Return 'True' if 'location' is a HTTPS upgrade of 'url'

    That is: same host, source on plain http with effective port 80,
    target on https with effective port 443.
    """
    if url.host != location.host:
        return False

    source_is_default_http = url.scheme == "http" and port_or_default(url) == 80
    target_is_default_https = (
        location.scheme == "https" and port_or_default(location) == 443
    )
    return source_is_default_http and target_is_default_https
def get_environment_proxies() -> typing.Dict[str, typing.Optional[str]]:
    """Gets proxy information from the environment"""
    # urllib.request.getproxies() falls back on System
    # Registry and Config for proxies on Windows and macOS.
    # We don't want to propagate non-HTTP proxies into
    # our configuration such as 'TRAVIS_APT_PROXY'.
    proxy_info = getproxies()
    mounts: typing.Dict[str, typing.Optional[str]] = {}

    for scheme in ("http", "https", "all"):
        proxy_url = proxy_info.get(scheme)
        if not proxy_url:
            continue
        # Default to an http:// proxy URL when no scheme was given.
        if "://" not in proxy_url:
            proxy_url = f"http://{proxy_url}"
        mounts[f"{scheme}://"] = proxy_url

    for hostname in (host.strip() for host in proxy_info.get("no", "").split(",")):
        # See https://curl.haxx.se/libcurl/c/CURLOPT_NOPROXY.html for details
        # on how names in `NO_PROXY` are handled.
        if hostname == "*":
            # If NO_PROXY=* is used or if "*" occurs as any one of the comma
            # separated hostnames, then we should just bypass any information
            # from HTTP_PROXY, HTTPS_PROXY, ALL_PROXY, and always ignore
            # proxies.
            return {}
        if not hostname:
            continue
        # NO_PROXY=.google.com is marked as "all://*.google.com,
        # which disables "www.google.com" but not "google.com"
        # NO_PROXY=google.com is marked as "all://*google.com,
        # which disables "www.google.com" and "google.com".
        # (But not "wwwgoogle.com")
        # NO_PROXY can include domains, IPv6, IPv4 addresses and "localhost"
        # NO_PROXY=example.com,::1,localhost,192.168.0.0/16
        if is_ipv6_hostname(hostname):
            mounts[f"all://[{hostname}]"] = None
        elif is_ipv4_hostname(hostname) or hostname.lower() == "localhost":
            mounts[f"all://{hostname}"] = None
        else:
            mounts[f"all://*{hostname}"] = None

    return mounts
def to_bytes(value: typing.Union[str, bytes], encoding: str = "utf-8") -> bytes:
    """Return `value` as bytes, encoding str input with `encoding`."""
    if isinstance(value, str):
        return value.encode(encoding)
    return value
def to_str(value: typing.Union[str, bytes], encoding: str = "utf-8") -> str:
    """Return `value` as str, decoding bytes input with `encoding`."""
    if isinstance(value, str):
        return value
    return value.decode(encoding)
def to_bytes_or_str(value: str, match_type_of: typing.AnyStr) -> typing.AnyStr:
    """Coerce `value` to the str/bytes type of `match_type_of`."""
    if isinstance(match_type_of, str):
        return value
    return value.encode()
def unquote(value: str) -> str:
    """
    Strip one pair of surrounding double quotes from `value`, if present.

    Unquoted input is returned unchanged. The empty string and a lone '"'
    are also returned unchanged.
    """
    # Require at least two characters: the previous `value[0]` check raised
    # IndexError on "" and collapsed a lone '"' to the empty string.
    if len(value) >= 2 and value[0] == value[-1] == '"':
        return value[1:-1]
    return value
def guess_content_type(filename: typing.Optional[str]) -> typing.Optional[str]:
    """
    Guess a MIME type from a filename extension.

    Returns None for a missing/empty filename, and falls back to
    "application/octet-stream" for unknown extensions.
    """
    if not filename:
        return None
    guessed, _ = mimetypes.guess_type(filename)
    return guessed or "application/octet-stream"
def peek_filelike_length(stream: typing.Any) -> typing.Optional[int]:
    """
    Given a file-like stream object, return its length in number of bytes
    without reading it into memory.

    Tries an fstat() on the underlying descriptor first, then falls back
    to seek-to-end (restoring the position). Returns None when neither
    approach is possible.
    """
    try:
        # An actual file: measure it via its descriptor.
        return os.fstat(stream.fileno()).st_size
    except (AttributeError, OSError):
        # Maybe it supports random access, like `io.BytesIO`? Seek to the
        # end to learn the length, then put the cursor back where it was.
        try:
            current = stream.tell()
            end = stream.seek(0, os.SEEK_END)
            stream.seek(current)
            return end
        except (AttributeError, OSError):
            # Not even that? Sorry, we're doomed...
            return None
class Timer:
    """Measure elapsed time in both sync and async contexts."""

    async def _get_time(self) -> float:
        # Pick a clock matching whichever async library is running.
        library = sniffio.current_async_library()
        if library == "trio":
            import trio

            return trio.current_time()
        if library == "curio":  # pragma: no cover
            import curio

            return typing.cast(float, await curio.clock())

        import asyncio

        return asyncio.get_event_loop().time()

    def sync_start(self) -> None:
        # `started` is set lazily by the *_start methods.
        self.started = time.perf_counter()

    async def async_start(self) -> None:
        self.started = await self._get_time()

    def sync_elapsed(self) -> float:
        return time.perf_counter() - self.started

    async def async_elapsed(self) -> float:
        return await self._get_time() - self.started
class URLPattern:
    """
    A utility class currently used for making lookups against proxy keys...

    # Wildcard matching...
    >>> pattern = URLPattern("all")
    >>> pattern.matches(httpx.URL("http://example.com"))
    True

    # Witch scheme matching...
    >>> pattern = URLPattern("https")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False

    # With domain matching...
    >>> pattern = URLPattern("https://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # Wildcard scheme, with domain matching...
    >>> pattern = URLPattern("all://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    True
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # With port matching...
    >>> pattern = URLPattern("https://example.com:1234")
    >>> pattern.matches(httpx.URL("https://example.com:1234"))
    True
    >>> pattern.matches(httpx.URL("https://example.com"))
    False
    """

    def __init__(self, pattern: str) -> None:
        from ._urls import URL

        if pattern and ":" not in pattern:
            raise ValueError(
                f"Proxy keys should use proper URL forms rather "
                f"than plain scheme strings. "
                f'Instead of "{pattern}", use "{pattern}://"'
            )

        url = URL(pattern)
        self.pattern = pattern
        # "all" / "*" act as wildcards, stored as empty strings.
        self.scheme = "" if url.scheme == "all" else url.scheme
        self.host = "" if url.host == "*" else url.host
        self.port = url.port

        host = url.host
        if not host or host == "*":
            # No host restriction at all.
            self.host_regex: typing.Optional[typing.Pattern[str]] = None
        elif host.startswith("*."):
            # *.example.com should match "www.example.com", but not "example.com"
            self.host_regex = re.compile(f"^.+\\.{re.escape(host[2:])}$")
        elif host.startswith("*"):
            # *example.com should match "www.example.com" and "example.com"
            self.host_regex = re.compile(f"^(.+\\.)?{re.escape(host[1:])}$")
        else:
            # example.com should match "example.com" but not "www.example.com"
            self.host_regex = re.compile(f"^{re.escape(host)}$")

    def matches(self, other: "URL") -> bool:
        """Return True if `other` falls within this pattern."""
        if self.scheme and self.scheme != other.scheme:
            return False
        if self.host and self.host_regex is not None:
            if not self.host_regex.match(other.host):
                return False
        if self.port is not None and self.port != other.port:
            return False
        return True

    @property
    def priority(self) -> typing.Tuple[int, int, int]:
        """
        The priority allows URLPattern instances to be sortable, so that
        we can match from most specific to least specific.
        """
        # Explicit ports sort first, then longer hosts, then longer schemes.
        return (
            0 if self.port is not None else 1,
            -len(self.host),
            -len(self.scheme),
        )

    def __hash__(self) -> int:
        return hash(self.pattern)

    def __lt__(self, other: "URLPattern") -> bool:
        return self.priority < other.priority

    def __eq__(self, other: typing.Any) -> bool:
        return isinstance(other, URLPattern) and self.pattern == other.pattern
def is_ipv4_hostname(hostname: str) -> bool:
    """
    Return True if `hostname` (optionally carrying a "/prefix" suffix,
    e.g. "192.168.0.0/16") is a valid IPv4 address.
    """
    try:
        # Only the address part is validated; any CIDR suffix is ignored.
        ipaddress.IPv4Address(hostname.split("/")[0])
    except ValueError:
        # IPv4Address raises AddressValueError (a ValueError subclass) for
        # invalid input; the previous bare `except Exception` was overly
        # broad and could mask genuine programming errors.
        return False
    return True
def is_ipv6_hostname(hostname: str) -> bool:
    """
    Return True if `hostname` (optionally carrying a "/prefix" suffix,
    e.g. "::1/128") is a valid IPv6 address.
    """
    try:
        # Only the address part is validated; any CIDR suffix is ignored.
        ipaddress.IPv6Address(hostname.split("/")[0])
    except ValueError:
        # IPv6Address raises AddressValueError (a ValueError subclass) for
        # invalid input; the previous bare `except Exception` was overly
        # broad and could mask genuine programming errors.
        return False
    return True