from __future__ import annotations

import codecs
import email.message
import ipaddress
import mimetypes
import os
import re
import time
import typing
from pathlib import Path
from urllib.request import getproxies

import sniffio

from ._types import PrimitiveData

if typing.TYPE_CHECKING:  # pragma: no cover
    from ._urls import URL


_HTML5_FORM_ENCODING_REPLACEMENTS = {'"': "%22", "\\": "\\\\"}
_HTML5_FORM_ENCODING_REPLACEMENTS.update(
    {chr(c): "%{:02X}".format(c) for c in range(0x1F + 1) if c != 0x1B}
)
_HTML5_FORM_ENCODING_RE = re.compile(
    r"|".join([re.escape(c) for c in _HTML5_FORM_ENCODING_REPLACEMENTS.keys()])
)


def normalize_header_key(
    value: str | bytes,
    lower: bool,
    encoding: str | None = None,
) -> bytes:
    """
    Coerce str/bytes into a strictly byte-wise HTTP header key.
    """
    if isinstance(value, bytes):
        bytes_value = value
    else:
        bytes_value = value.encode(encoding or "ascii")

    return bytes_value.lower() if lower else bytes_value


def normalize_header_value(value: str | bytes, encoding: str | None = None) -> bytes:
    """
    Coerce str/bytes into a strictly byte-wise HTTP header value.
    """
    if isinstance(value, bytes):
        return value
    if not isinstance(value, str):
        raise TypeError(f"Header value must be str or bytes, not {type(value)}")
    return value.encode(encoding or "ascii")


def primitive_value_to_str(value: PrimitiveData) -> str:
    """
    Coerce a primitive data type into a string value.

    Note that we prefer JSON-style 'true'/'false' for boolean values here.
    """
    if value is True:
        return "true"
    elif value is False:
        return "false"
    elif value is None:
        return ""
    return str(value)
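

# A minimal usage sketch (editor's illustration; this `_example_*` helper is
# hypothetical, not part of httpx): booleans serialize JSON-style and `None`
# collapses to the empty string, matching how query params are built.
def _example_primitive_value_to_str() -> None:  # pragma: no cover
    assert primitive_value_to_str(True) == "true"
    assert primitive_value_to_str(False) == "false"
    assert primitive_value_to_str(None) == ""
    assert primitive_value_to_str(12.5) == "12.5"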


def is_known_encoding(encoding: str) -> bool:
    """
    Return `True` if `encoding` is a known codec.
    """
    try:
        codecs.lookup(encoding)
    except LookupError:
        return False
    return True
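

# Illustrative check (editor's sketch; the helper is hypothetical):
# `codecs.lookup` accepts any registered alias, so spellings like "UTF8"
# also count as known encodings.
def _example_is_known_encoding() -> None:  # pragma: no cover
    assert is_known_encoding("utf-8")
    assert is_known_encoding("UTF8")  # aliases resolve via the codecs registry
    assert not is_known_encoding("no-such-codec")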


def format_form_param(name: str, value: str) -> bytes:
    """
    Encode a name/value pair within a multipart form.
    """

    def replacer(match: typing.Match[str]) -> str:
        return _HTML5_FORM_ENCODING_REPLACEMENTS[match.group(0)]

    value = _HTML5_FORM_ENCODING_RE.sub(replacer, value)
    return f'{name}="{value}"'.encode()
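

# A small sketch of the HTML5 form escaping above (editor's illustration;
# the helper is hypothetical): double quotes become %22 and backslashes are
# doubled, per the replacement table at the top of the module.
def _example_format_form_param() -> None:  # pragma: no cover
    assert format_form_param("filename", 'sp"am') == b'filename="sp%22am"'
    assert format_form_param("name", "a\\b") == b'name="a\\\\b"'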


def get_ca_bundle_from_env() -> str | None:
    if "SSL_CERT_FILE" in os.environ:
        ssl_file = Path(os.environ["SSL_CERT_FILE"])
        if ssl_file.is_file():
            return str(ssl_file)
    if "SSL_CERT_DIR" in os.environ:
        ssl_path = Path(os.environ["SSL_CERT_DIR"])
        if ssl_path.is_dir():
            return str(ssl_path)
    return None


def parse_header_links(value: str) -> list[dict[str, str]]:
    """
    Returns a list of parsed link headers, for more info see:
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link

    The generic syntax of those is:

        Link: < uri-reference >; param1=value1; param2="value2"

    So for instance:

        Link: '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;'

    would return

        [
            {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
            {"url": "http://.../back.jpeg"},
        ]

    :param value: HTTP Link entity-header field
    :return: list of parsed link headers
    """
    links: list[dict[str, str]] = []
    replace_chars = " '\""
    value = value.strip(replace_chars)
    if not value:
        return links
    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""
        link = {"url": url.strip("<> '\"")}
        for param in params.split(";"):
            try:
                key, value = param.split("=")
            except ValueError:
                break
            link[key.strip(replace_chars)] = value.strip(replace_chars)
        links.append(link)
    return links
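

# Worked example (editor's sketch; the helper is hypothetical): a single
# RFC 8288 style Link value splits into a dict holding the target URL plus
# its parameters.
def _example_parse_header_links() -> None:  # pragma: no cover
    parsed = parse_header_links('<https://example.com/next>; rel="next"')
    assert parsed == [{"url": "https://example.com/next", "rel": "next"}]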


def parse_content_type_charset(content_type: str) -> str | None:
    # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
    # See: https://peps.python.org/pep-0594/#cgi
    msg = email.message.Message()
    msg["content-type"] = content_type
    return msg.get_content_charset(failobj=None)
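

# Editor's sketch of the `email.message` based parsing above (the helper is
# hypothetical): the charset parameter is extracted and lower-cased.
def _example_parse_content_type_charset() -> None:  # pragma: no cover
    assert parse_content_type_charset("text/html; charset=UTF-8") == "utf-8"
    assert parse_content_type_charset("application/json") is None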


SENSITIVE_HEADERS = {"authorization", "proxy-authorization"}


def obfuscate_sensitive_headers(
    items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]],
) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]:
    for k, v in items:
        if to_str(k.lower()) in SENSITIVE_HEADERS:
            v = to_bytes_or_str("[secure]", match_type_of=v)
        yield k, v
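

# A quick illustration (editor's sketch; the helper is hypothetical): values
# of sensitive headers are masked while everything else passes through, and
# the replacement matches the str/bytes type of the original value.
def _example_obfuscate_sensitive_headers() -> None:  # pragma: no cover
    headers = [("Authorization", "Bearer abc123"), ("Accept", "*/*")]
    assert list(obfuscate_sensitive_headers(headers)) == [
        ("Authorization", "[secure]"),
        ("Accept", "*/*"),
    ]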


def port_or_default(url: URL) -> int | None:
    if url.port is not None:
        return url.port
    return {"http": 80, "https": 443}.get(url.scheme)


def same_origin(url: URL, other: URL) -> bool:
    """
    Return 'True' if the given URLs share the same origin.
    """
    return (
        url.scheme == other.scheme
        and url.host == other.host
        and port_or_default(url) == port_or_default(other)
    )


def is_https_redirect(url: URL, location: URL) -> bool:
    """
    Return 'True' if 'location' is an HTTPS upgrade of 'url'.
    """
    if url.host != location.host:
        return False

    return (
        url.scheme == "http"
        and port_or_default(url) == 80
        and location.scheme == "https"
        and port_or_default(location) == 443
    )
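

# Editor's sketch (the helper is hypothetical; `URL` is imported lazily here
# because the module-level import is type-checking only): only a default-port
# http -> https hop on the same host counts as an upgrade.
def _example_is_https_redirect() -> None:  # pragma: no cover
    from ._urls import URL

    assert is_https_redirect(URL("http://example.com/a"), URL("https://example.com/a"))
    assert not is_https_redirect(URL("http://example.com:8080"), URL("https://example.com"))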


def get_environment_proxies() -> dict[str, str | None]:
    """Gets proxy information from the environment"""

    # urllib.request.getproxies() falls back on System
    # Registry and Config for proxies on Windows and macOS.
    # We don't want to propagate non-HTTP proxies into
    # our configuration such as 'TRAVIS_APT_PROXY'.
    proxy_info = getproxies()
    mounts: dict[str, str | None] = {}

    for scheme in ("http", "https", "all"):
        if proxy_info.get(scheme):
            hostname = proxy_info[scheme]
            mounts[f"{scheme}://"] = (
                hostname if "://" in hostname else f"http://{hostname}"
            )

    no_proxy_hosts = [host.strip() for host in proxy_info.get("no", "").split(",")]
    for hostname in no_proxy_hosts:
        # See https://curl.haxx.se/libcurl/c/CURLOPT_NOPROXY.html for details
        # on how names in `NO_PROXY` are handled.
        if hostname == "*":
            # If NO_PROXY=* is used or if "*" occurs as any one of the comma
            # separated hostnames, then we should just bypass any information
            # from HTTP_PROXY, HTTPS_PROXY, ALL_PROXY, and always ignore
            # proxies.
            return {}
        elif hostname:
            # NO_PROXY=.google.com is marked as "all://*.google.com",
            # which disables "www.google.com" but not "google.com".
            # NO_PROXY=google.com is marked as "all://*google.com",
            # which disables "www.google.com" and "google.com".
            # (But not "wwwgoogle.com")
            # NO_PROXY can include domains, IPv6, IPv4 addresses and "localhost"
            # NO_PROXY=example.com,::1,localhost,192.168.0.0/16
            if "://" in hostname:
                mounts[hostname] = None
            elif is_ipv4_hostname(hostname):
                mounts[f"all://{hostname}"] = None
            elif is_ipv6_hostname(hostname):
                mounts[f"all://[{hostname}]"] = None
            elif hostname.lower() == "localhost":
                mounts[f"all://{hostname}"] = None
            else:
                mounts[f"all://*{hostname}"] = None

    return mounts
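

# Editor's sketch of the mapping above (the helper and the env values are
# hypothetical): scheme-less proxy URLs gain an "http://" prefix, and each
# NO_PROXY entry becomes an "all://..." mount that maps to `None`.
def _example_get_environment_proxies() -> None:  # pragma: no cover
    os.environ["HTTP_PROXY"] = "proxy.local:3128"
    os.environ["NO_PROXY"] = "example.com,::1"
    mounts = get_environment_proxies()
    assert mounts["http://"] == "http://proxy.local:3128"
    assert mounts["all://*example.com"] is None  # example.com and subdomains bypass
    assert mounts["all://[::1]"] is None  # IPv6 bypass entries gain brackets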


def to_bytes(value: str | bytes, encoding: str = "utf-8") -> bytes:
    return value.encode(encoding) if isinstance(value, str) else value


def to_str(value: str | bytes, encoding: str = "utf-8") -> str:
    return value if isinstance(value, str) else value.decode(encoding)


def to_bytes_or_str(value: str, match_type_of: typing.AnyStr) -> typing.AnyStr:
    return value if isinstance(match_type_of, str) else value.encode()


def unquote(value: str) -> str:
    return value[1:-1] if value[0] == value[-1] == '"' else value


def guess_content_type(filename: str | None) -> str | None:
    if filename:
        return mimetypes.guess_type(filename)[0] or "application/octet-stream"
    return None


def peek_filelike_length(stream: typing.Any) -> int | None:
    """
    Given a file-like stream object, return its length in number of bytes
    without reading it into memory.
    """
    try:
        # Is it an actual file?
        fd = stream.fileno()
        # Yup, seems to be an actual file.
        length = os.fstat(fd).st_size
    except (AttributeError, OSError):
        # No... Maybe it's something that supports random access, like `io.BytesIO`?
        try:
            # Assuming so, go to end of stream to figure out its length,
            # then put it back in place.
            offset = stream.tell()
            length = stream.seek(0, os.SEEK_END)
            stream.seek(offset)
        except (AttributeError, OSError):
            # Not even that? Sorry, we're doomed...
            return None

    return length
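

# Editor's sketch of the seek-based fallback (the helper is hypothetical):
# `io.BytesIO` has no usable `fileno()`, so the length comes from seeking to
# the end, and the read position is restored afterwards.
def _example_peek_filelike_length() -> None:  # pragma: no cover
    import io

    buffer = io.BytesIO(b"abcdef")
    buffer.seek(2)
    assert peek_filelike_length(buffer) == 6
    assert buffer.tell() == 2  # position restored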


class Timer:
    async def _get_time(self) -> float:
        library = sniffio.current_async_library()
        if library == "trio":
            import trio

            return trio.current_time()
        else:
            import asyncio

            return asyncio.get_event_loop().time()

    def sync_start(self) -> None:
        self.started = time.perf_counter()

    async def async_start(self) -> None:
        self.started = await self._get_time()

    def sync_elapsed(self) -> float:
        now = time.perf_counter()
        return now - self.started

    async def async_elapsed(self) -> float:
        now = await self._get_time()
        return now - self.started
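

# Editor's sketch of the synchronous path (the helper is hypothetical): the
# async variants pick a clock via sniffio, while the sync variants simply
# use `time.perf_counter()`.
def _example_timer_sync() -> None:  # pragma: no cover
    timer = Timer()
    timer.sync_start()
    time.sleep(0.01)
    assert timer.sync_elapsed() >= 0.01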


class URLPattern:
    """
    A utility class currently used for making lookups against proxy keys...

    # Wildcard matching...
    >>> pattern = URLPattern("all://")
    >>> pattern.matches(httpx.URL("http://example.com"))
    True

    # With scheme matching...
    >>> pattern = URLPattern("https://")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False

    # With domain matching...
    >>> pattern = URLPattern("https://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # Wildcard scheme, with domain matching...
    >>> pattern = URLPattern("all://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    True
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # With port matching...
    >>> pattern = URLPattern("https://example.com:1234")
    >>> pattern.matches(httpx.URL("https://example.com:1234"))
    True
    >>> pattern.matches(httpx.URL("https://example.com"))
    False
    """

    def __init__(self, pattern: str) -> None:
        from ._urls import URL

        if pattern and ":" not in pattern:
            raise ValueError(
                f"Proxy keys should use proper URL forms rather "
                f"than plain scheme strings. "
                f'Instead of "{pattern}", use "{pattern}://"'
            )

        url = URL(pattern)
        self.pattern = pattern
        self.scheme = "" if url.scheme == "all" else url.scheme
        self.host = "" if url.host == "*" else url.host
        self.port = url.port
        if not url.host or url.host == "*":
            self.host_regex: typing.Pattern[str] | None = None
        elif url.host.startswith("*."):
            # *.example.com should match "www.example.com", but not "example.com"
            domain = re.escape(url.host[2:])
            self.host_regex = re.compile(f"^.+\\.{domain}$")
        elif url.host.startswith("*"):
            # *example.com should match "www.example.com" and "example.com"
            domain = re.escape(url.host[1:])
            self.host_regex = re.compile(f"^(.+\\.)?{domain}$")
        else:
            # example.com should match "example.com" but not "www.example.com"
            domain = re.escape(url.host)
            self.host_regex = re.compile(f"^{domain}$")

    def matches(self, other: URL) -> bool:
        if self.scheme and self.scheme != other.scheme:
            return False
        if (
            self.host
            and self.host_regex is not None
            and not self.host_regex.match(other.host)
        ):
            return False
        if self.port is not None and self.port != other.port:
            return False
        return True

    @property
    def priority(self) -> tuple[int, int, int]:
        """
        The priority allows URLPattern instances to be sortable, so that
        we can match from most specific to least specific.
        """
        # URLs with a port should take priority over URLs without a port.
        port_priority = 0 if self.port is not None else 1
        # Longer hostnames should match first.
        host_priority = -len(self.host)
        # Longer schemes should match first.
        scheme_priority = -len(self.scheme)
        return (port_priority, host_priority, scheme_priority)

    def __hash__(self) -> int:
        return hash(self.pattern)

    def __lt__(self, other: URLPattern) -> bool:
        return self.priority < other.priority

    def __eq__(self, other: typing.Any) -> bool:
        return isinstance(other, URLPattern) and self.pattern == other.pattern
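

# Editor's sketch of priority-based ordering (the helper is hypothetical):
# sorting puts the most specific pattern first, so proxy mounts can be
# matched from most to least specific.
def _example_urlpattern_priority() -> None:  # pragma: no cover
    patterns = sorted(
        URLPattern(key)
        for key in ("all://", "all://example.com", "https://example.com:1234")
    )
    assert [p.pattern for p in patterns] == [
        "https://example.com:1234",  # explicit port wins
        "all://example.com",  # longer host beats the empty wildcard host
        "all://",  # pure wildcard sorts last
    ]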


def is_ipv4_hostname(hostname: str) -> bool:
    try:
        ipaddress.IPv4Address(hostname.split("/")[0])
    except Exception:
        return False
    return True


def is_ipv6_hostname(hostname: str) -> bool:
    try:
        ipaddress.IPv6Address(hostname.split("/")[0])
    except Exception:
        return False
    return True
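

# Editor's sketch (the helper is hypothetical): any "/suffix" is split off
# before parsing, so CIDR-style NO_PROXY entries such as 192.168.0.0/16 are
# recognised as IPv4.
def _example_ip_hostname_checks() -> None:  # pragma: no cover
    assert is_ipv4_hostname("192.168.0.1")
    assert is_ipv4_hostname("192.168.0.0/16")
    assert is_ipv6_hostname("::1")
    assert not is_ipv4_hostname("example.com")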