import codecs
import email.message
import logging
import mimetypes
import netrc
import os
import re
import sys
import time
import typing
from pathlib import Path
from urllib.request import getproxies

import sniffio

from ._types import PrimitiveData

if typing.TYPE_CHECKING:  # pragma: no cover
    from ._urls import URL


_HTML5_FORM_ENCODING_REPLACEMENTS = {'"': "%22", "\\": "\\\\"}
_HTML5_FORM_ENCODING_REPLACEMENTS.update(
    {chr(c): "%{:02X}".format(c) for c in range(0x1F + 1) if c != 0x1B}
)
_HTML5_FORM_ENCODING_RE = re.compile(
    r"|".join([re.escape(c) for c in _HTML5_FORM_ENCODING_REPLACEMENTS.keys()])
)


def normalize_header_key(
    value: typing.Union[str, bytes],
    lower: bool,
    encoding: typing.Optional[str] = None,
) -> bytes:
    """
    Coerce str/bytes into a strictly byte-wise HTTP header key.
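
    Example (illustrative):

    >>> normalize_header_key("Accept", lower=True)
    b'accept'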
38 """
39 if isinstance(value, bytes):
40 bytes_value = value
41 else:
42 bytes_value = value.encode(encoding or "ascii")
44 return bytes_value.lower() if lower else bytes_value


def normalize_header_value(
    value: typing.Union[str, bytes], encoding: typing.Optional[str] = None
) -> bytes:
    """
    Coerce str/bytes into a strictly byte-wise HTTP header value.
    """
    if isinstance(value, bytes):
        return value
    return value.encode(encoding or "ascii")


def primitive_value_to_str(value: "PrimitiveData") -> str:
    """
    Coerce a primitive data type into a string value.

    Note that we prefer JSON-style 'true'/'false' for boolean values here.
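
    Example (illustrative):

    >>> primitive_value_to_str(True)
    'true'
    >>> primitive_value_to_str(None)
    ''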
63 """
64 if value is True:
65 return "true"
66 elif value is False:
67 return "false"
68 elif value is None:
69 return ""
70 return str(value)


def is_known_encoding(encoding: str) -> bool:
    """
    Return `True` if `encoding` is a known codec.
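
    Example (illustrative):

    >>> is_known_encoding("utf-8")
    True
    >>> is_known_encoding("not-a-codec")
    False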
76 """
77 try:
78 codecs.lookup(encoding)
79 except LookupError:
80 return False
81 return True


def format_form_param(name: str, value: str) -> bytes:
    """
    Encode a name/value pair within a multipart form.
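
    Example (illustrative; the double quote is percent-encoded using the
    HTML5 replacement table above):

    >>> format_form_param("name", 'a"b')
    b'name="a%22b"'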
87 """
89 def replacer(match: typing.Match[str]) -> str:
90 return _HTML5_FORM_ENCODING_REPLACEMENTS[match.group(0)]
92 value = _HTML5_FORM_ENCODING_RE.sub(replacer, value)
93 return f'{name}="{value}"'.encode()


# Null bytes; no need to recreate these on each call to guess_json_utf
_null = b"\x00"
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data: bytes) -> typing.Optional[str]:
    # JSON always starts with two ASCII characters, so detection is as
    # simple as counting the null bytes: their number and position
    # determine the encoding. Also detect a BOM, if present.
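    # Illustrative: '{"a": 1}'.encode("utf-32-be") begins with b"\x00\x00\x00{",
    # so the three leading null bytes identify it as "utf-32-be".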
    sample = data[:4]
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    nullcount = sample.count(_null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None


class NetRCInfo:
    def __init__(self, files: typing.Optional[typing.List[str]] = None) -> None:
        if files is None:
            files = [os.getenv("NETRC", ""), "~/.netrc", "~/_netrc"]
        self.netrc_files = files

    @property
    def netrc_info(self) -> typing.Optional[netrc.netrc]:
        if not hasattr(self, "_netrc_info"):
            self._netrc_info = None
            for file_path in self.netrc_files:
                expanded_path = Path(file_path).expanduser()
                try:
                    if expanded_path.is_file():
                        self._netrc_info = netrc.netrc(str(expanded_path))
                        break
                except (netrc.NetrcParseError, IOError):  # pragma: no cover
                    # Issue while reading the netrc file, ignore...
                    pass
        return self._netrc_info

    def get_credentials(self, host: str) -> typing.Optional[typing.Tuple[str, str]]:
        if self.netrc_info is None:
            return None
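
        # netrc.authenticators() returns a (login, account, password) tuple.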
        auth_info = self.netrc_info.authenticators(host)
        if auth_info is None or auth_info[2] is None:
            return None
        return (auth_info[0], auth_info[2])


def get_ca_bundle_from_env() -> typing.Optional[str]:
    if "SSL_CERT_FILE" in os.environ:
        ssl_file = Path(os.environ["SSL_CERT_FILE"])
        if ssl_file.is_file():
            return str(ssl_file)
    if "SSL_CERT_DIR" in os.environ:
        ssl_path = Path(os.environ["SSL_CERT_DIR"])
        if ssl_path.is_dir():
            return str(ssl_path)
    return None


def parse_header_links(value: str) -> typing.List[typing.Dict[str, str]]:
    """
    Returns a list of parsed link headers. For more info see:
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
    The generic syntax of those is:
    Link: < uri-reference >; param1=value1; param2="value2"
    So for instance:
    Link: '<http://.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;'
    would return
    [
        {"url": "http://.../front.jpeg", "type": "image/jpeg"},
        {"url": "http://.../back.jpeg"},
    ]
    :param value: HTTP Link entity-header field
    :return: list of parsed link headers
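
    Example (illustrative):

    >>> parse_header_links('<https://example.com/next>; rel="next"')
    [{'url': 'https://example.com/next', 'rel': 'next'}]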
189 """
190 links: typing.List[typing.Dict[str, str]] = []
191 replace_chars = " '\""
192 value = value.strip(replace_chars)
193 if not value:
194 return links
195 for val in re.split(", *<", value):
196 try:
197 url, params = val.split(";", 1)
198 except ValueError:
199 url, params = val, ""
200 link = {"url": url.strip("<> '\"")}
201 for param in params.split(";"):
202 try:
203 key, value = param.split("=")
204 except ValueError:
205 break
206 link[key.strip(replace_chars)] = value.strip(replace_chars)
207 links.append(link)
208 return links


def parse_content_type_charset(content_type: str) -> typing.Optional[str]:
    # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
    # See: https://peps.python.org/pep-0594/#cgi
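    # Illustrative: "text/html; charset=UTF-8" yields "utf-8"; the charset is
    # normalized to lowercase by `email.message`.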
    msg = email.message.Message()
    msg["content-type"] = content_type
    return msg.get_content_charset(failobj=None)


SENSITIVE_HEADERS = {"authorization", "proxy-authorization"}


def obfuscate_sensitive_headers(
    items: typing.Iterable[typing.Tuple[typing.AnyStr, typing.AnyStr]]
) -> typing.Iterator[typing.Tuple[typing.AnyStr, typing.AnyStr]]:
    for k, v in items:
        if to_str(k.lower()) in SENSITIVE_HEADERS:
            v = to_bytes_or_str("[secure]", match_type_of=v)
        yield k, v


_LOGGER_INITIALIZED = False
TRACE_LOG_LEVEL = 5


class Logger(logging.Logger):
    # Stub for type checkers.
    def trace(self, message: str, *args: typing.Any, **kwargs: typing.Any) -> None:
        ...  # pragma: no cover


def get_logger(name: str) -> Logger:
    """
    Get a `logging.Logger` instance, and optionally
    set up debug logging based on the HTTPX_LOG_LEVEL environment variable.
    """
    global _LOGGER_INITIALIZED

    if not _LOGGER_INITIALIZED:
        _LOGGER_INITIALIZED = True
        logging.addLevelName(TRACE_LOG_LEVEL, "TRACE")

        log_level = os.environ.get("HTTPX_LOG_LEVEL", "").upper()
        if log_level in ("DEBUG", "TRACE"):
            logger = logging.getLogger("httpx")
            logger.setLevel(logging.DEBUG if log_level == "DEBUG" else TRACE_LOG_LEVEL)
            handler = logging.StreamHandler(sys.stderr)
            handler.setFormatter(
                logging.Formatter(
                    fmt="%(levelname)s [%(asctime)s] %(name)s - %(message)s",
                    datefmt="%Y-%m-%d %H:%M:%S",
                )
            )
            logger.addHandler(handler)

    logger = logging.getLogger(name)

    def trace(message: str, *args: typing.Any, **kwargs: typing.Any) -> None:
        logger.log(TRACE_LOG_LEVEL, message, *args, **kwargs)

    logger.trace = trace  # type: ignore

    return typing.cast(Logger, logger)


def port_or_default(url: "URL") -> typing.Optional[int]:
    if url.port is not None:
        return url.port
    return {"http": 80, "https": 443}.get(url.scheme)


def same_origin(url: "URL", other: "URL") -> bool:
    """
    Return 'True' if the given URLs share the same origin.
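
    Example (illustrative; default ports are filled in before comparing):

    >>> same_origin(httpx.URL("http://example.com"), httpx.URL("http://example.com:80/a"))
    True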
284 """
285 return (
286 url.scheme == other.scheme
287 and url.host == other.host
288 and port_or_default(url) == port_or_default(other)
289 )


def is_https_redirect(url: "URL", location: "URL") -> bool:
    """
    Return 'True' if 'location' is an HTTPS upgrade of 'url'.
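
    Example (illustrative; the default ports 80 and 443 are implied):

    >>> is_https_redirect(httpx.URL("http://example.com/"), httpx.URL("https://example.com/"))
    True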
295 """
296 if url.host != location.host:
297 return False
299 return (
300 url.scheme == "http"
301 and port_or_default(url) == 80
302 and location.scheme == "https"
303 and port_or_default(location) == 443
304 )


def get_environment_proxies() -> typing.Dict[str, typing.Optional[str]]:
    """Gets proxy information from the environment"""
    # urllib.request.getproxies() falls back on System
    # Registry and Config for proxies on Windows and macOS.
    # We don't want to propagate non-HTTP proxies into
    # our configuration such as 'TRAVIS_APT_PROXY'.
    proxy_info = getproxies()
    mounts: typing.Dict[str, typing.Optional[str]] = {}

    for scheme in ("http", "https", "all"):
        if proxy_info.get(scheme):
            hostname = proxy_info[scheme]
            mounts[f"{scheme}://"] = (
                hostname if "://" in hostname else f"http://{hostname}"
            )

    no_proxy_hosts = [host.strip() for host in proxy_info.get("no", "").split(",")]
    for hostname in no_proxy_hosts:
        # See https://curl.haxx.se/libcurl/c/CURLOPT_NOPROXY.html for details
        # on how names in `NO_PROXY` are handled.
        if hostname == "*":
            # If NO_PROXY=* is used or if "*" occurs as any one of the comma
            # separated hostnames, then we should just bypass any information
            # from HTTP_PROXY, HTTPS_PROXY, ALL_PROXY, and always ignore
            # proxies.
            return {}
        elif hostname:
            # NO_PROXY=.google.com is marked as "all://*.google.com",
            # which disables "www.google.com" but not "google.com".
            # NO_PROXY=google.com is marked as "all://*google.com",
            # which disables "www.google.com" and "google.com".
            # (But not "wwwgoogle.com".)
            mounts[f"all://*{hostname}"] = None

    return mounts


def to_bytes(value: typing.Union[str, bytes], encoding: str = "utf-8") -> bytes:
    return value.encode(encoding) if isinstance(value, str) else value


def to_str(value: typing.Union[str, bytes], encoding: str = "utf-8") -> str:
    return value if isinstance(value, str) else value.decode(encoding)


def to_bytes_or_str(value: str, match_type_of: typing.AnyStr) -> typing.AnyStr:
    return value if isinstance(match_type_of, str) else value.encode()


def unquote(value: str) -> str:
    return value[1:-1] if value[0] == value[-1] == '"' else value


def guess_content_type(filename: typing.Optional[str]) -> typing.Optional[str]:
    if filename:
        return mimetypes.guess_type(filename)[0] or "application/octet-stream"
    return None


def peek_filelike_length(stream: typing.Any) -> typing.Optional[int]:
    """
    Given a file-like stream object, return its length in number of bytes
    without reading it into memory.
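
    Example (illustrative; `io.BytesIO` supports random access):

    >>> import io
    >>> peek_filelike_length(io.BytesIO(b"abcde"))
    5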
371 """
372 try:
373 # Is it an actual file?
374 fd = stream.fileno()
375 # Yup, seems to be an actual file.
376 length = os.fstat(fd).st_size
377 except (AttributeError, OSError):
378 # No... Maybe it's something that supports random access, like `io.BytesIO`?
379 try:
380 # Assuming so, go to end of stream to figure out its length,
381 # then put it back in place.
382 offset = stream.tell()
383 length = stream.seek(0, os.SEEK_END)
384 stream.seek(offset)
385 except (AttributeError, OSError):
386 # Not even that? Sorry, we're doomed...
387 return None
389 return length


class Timer:
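    """
    Minimal timer usable from both sync and async code.

    Illustrative usage (sync):

        timer = Timer()
        timer.sync_start()
        ...  # do some work
        seconds = timer.sync_elapsed()
    """
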
    async def _get_time(self) -> float:
        library = sniffio.current_async_library()
        if library == "trio":
            import trio

            return trio.current_time()
        elif library == "curio":  # pragma: no cover
            import curio

            return typing.cast(float, await curio.clock())

        import asyncio

        return asyncio.get_event_loop().time()

    def sync_start(self) -> None:
        self.started = time.perf_counter()

    async def async_start(self) -> None:
        self.started = await self._get_time()

    def sync_elapsed(self) -> float:
        now = time.perf_counter()
        return now - self.started

    async def async_elapsed(self) -> float:
        now = await self._get_time()
        return now - self.started


class URLPattern:
    """
    A utility class currently used for making lookups against proxy keys...

    # Wildcard matching...
    >>> pattern = URLPattern("all")
    >>> pattern.matches(httpx.URL("http://example.com"))
    True

    # With scheme matching...
    >>> pattern = URLPattern("https")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False

    # With domain matching...
    >>> pattern = URLPattern("https://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # Wildcard scheme, with domain matching...
    >>> pattern = URLPattern("all://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    True
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # With port matching...
    >>> pattern = URLPattern("https://example.com:1234")
    >>> pattern.matches(httpx.URL("https://example.com:1234"))
    True
    >>> pattern.matches(httpx.URL("https://example.com"))
    False
    """

    def __init__(self, pattern: str) -> None:
        from ._urls import URL

        if pattern and ":" not in pattern:
            raise ValueError(
                f"Proxy keys should use proper URL forms rather "
                f"than plain scheme strings. "
                f'Instead of "{pattern}", use "{pattern}://"'
            )

        url = URL(pattern)
        self.pattern = pattern
        self.scheme = "" if url.scheme == "all" else url.scheme
        self.host = "" if url.host == "*" else url.host
        self.port = url.port
        if not url.host or url.host == "*":
            self.host_regex: typing.Optional[typing.Pattern[str]] = None
        elif url.host.startswith("*."):
            # *.example.com should match "www.example.com", but not "example.com"
            domain = re.escape(url.host[2:])
            self.host_regex = re.compile(f"^.+\\.{domain}$")
        elif url.host.startswith("*"):
            # *example.com should match "www.example.com" and "example.com"
            domain = re.escape(url.host[1:])
            self.host_regex = re.compile(f"^(.+\\.)?{domain}$")
        else:
            # example.com should match "example.com" but not "www.example.com"
            domain = re.escape(url.host)
            self.host_regex = re.compile(f"^{domain}$")

    def matches(self, other: "URL") -> bool:
        if self.scheme and self.scheme != other.scheme:
            return False
        if (
            self.host
            and self.host_regex is not None
            and not self.host_regex.match(other.host)
        ):
            return False
        if self.port is not None and self.port != other.port:
            return False
        return True

    @property
    def priority(self) -> typing.Tuple[int, int, int]:
        """
        The priority allows URLPattern instances to be sortable, so that
        we can match from most specific to least specific.
        """
        # URLs with a port should take priority over URLs without a port.
        port_priority = 0 if self.port is not None else 1
        # Longer hostnames should match first.
        host_priority = -len(self.host)
        # Longer schemes should match first.
        scheme_priority = -len(self.scheme)
        return (port_priority, host_priority, scheme_priority)

    def __hash__(self) -> int:
        return hash(self.pattern)

    def __lt__(self, other: "URLPattern") -> bool:
        return self.priority < other.priority

    def __eq__(self, other: typing.Any) -> bool:
        return isinstance(other, URLPattern) and self.pattern == other.pattern