Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_auth.py: 26%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import hashlib
4import os
5import re
6import time
7import typing
8from base64 import b64encode
9from urllib.request import parse_http_list
11from ._exceptions import ProtocolError
12from ._models import Cookies, Request, Response
13from ._utils import to_bytes, to_str, unquote
15if typing.TYPE_CHECKING: # pragma: no cover
16 from hashlib import _Hash
19class Auth:
20 """
21 Base class for all authentication schemes.
23 To implement a custom authentication scheme, subclass `Auth` and override
24 the `.auth_flow()` method.
26 If the authentication scheme does I/O such as disk access or network calls, or uses
27 synchronization primitives such as locks, you should override `.sync_auth_flow()`
28 and/or `.async_auth_flow()` instead of `.auth_flow()` to provide specialized
29 implementations that will be used by `Client` and `AsyncClient` respectively.
30 """
32 requires_request_body = False
33 requires_response_body = False
35 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
36 """
37 Execute the authentication flow.
39 To dispatch a request, `yield` it:
41 ```
42 yield request
43 ```
45 The client will `.send()` the response back into the flow generator. You can
46 access it like so:
48 ```
49 response = yield request
50 ```
52 A `return` (or reaching the end of the generator) will result in the
53 client returning the last response obtained from the server.
55 You can dispatch as many requests as is necessary.
56 """
57 yield request
59 def sync_auth_flow(
60 self, request: Request
61 ) -> typing.Generator[Request, Response, None]:
62 """
63 Execute the authentication flow synchronously.
65 By default, this defers to `.auth_flow()`. You should override this method
66 when the authentication scheme does I/O and/or uses concurrency primitives.
67 """
68 if self.requires_request_body:
69 request.read()
71 flow = self.auth_flow(request)
72 request = next(flow)
74 while True:
75 response = yield request
76 if self.requires_response_body:
77 response.read()
79 try:
80 request = flow.send(response)
81 except StopIteration:
82 break
84 async def async_auth_flow(
85 self, request: Request
86 ) -> typing.AsyncGenerator[Request, Response]:
87 """
88 Execute the authentication flow asynchronously.
90 By default, this defers to `.auth_flow()`. You should override this method
91 when the authentication scheme does I/O and/or uses concurrency primitives.
92 """
93 if self.requires_request_body:
94 await request.aread()
96 flow = self.auth_flow(request)
97 request = next(flow)
99 while True:
100 response = yield request
101 if self.requires_response_body:
102 await response.aread()
104 try:
105 request = flow.send(response)
106 except StopIteration:
107 break
110class FunctionAuth(Auth):
111 """
112 Allows the 'auth' argument to be passed as a simple callable function,
113 that takes the request, and returns a new, modified request.
114 """
116 def __init__(self, func: typing.Callable[[Request], Request]) -> None:
117 self._func = func
119 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
120 yield self._func(request)
123class BasicAuth(Auth):
124 """
125 Allows the 'auth' argument to be passed as a (username, password) pair,
126 and uses HTTP Basic authentication.
127 """
129 def __init__(self, username: str | bytes, password: str | bytes) -> None:
130 self._auth_header = self._build_auth_header(username, password)
132 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
133 request.headers["Authorization"] = self._auth_header
134 yield request
136 def _build_auth_header(self, username: str | bytes, password: str | bytes) -> str:
137 userpass = b":".join((to_bytes(username), to_bytes(password)))
138 token = b64encode(userpass).decode()
139 return f"Basic {token}"
142class NetRCAuth(Auth):
143 """
144 Use a 'netrc' file to lookup basic auth credentials based on the url host.
145 """
147 def __init__(self, file: str | None = None) -> None:
148 # Lazily import 'netrc'.
149 # There's no need for us to load this module unless 'NetRCAuth' is being used.
150 import netrc
152 self._netrc_info = netrc.netrc(file)
154 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
155 auth_info = self._netrc_info.authenticators(request.url.host)
156 if auth_info is None or not auth_info[2]:
157 # The netrc file did not have authentication credentials for this host.
158 yield request
159 else:
160 # Build a basic auth header with credentials from the netrc file.
161 request.headers["Authorization"] = self._build_auth_header(
162 username=auth_info[0], password=auth_info[2]
163 )
164 yield request
166 def _build_auth_header(self, username: str | bytes, password: str | bytes) -> str:
167 userpass = b":".join((to_bytes(username), to_bytes(password)))
168 token = b64encode(userpass).decode()
169 return f"Basic {token}"
172class DigestAuth(Auth):
173 _ALGORITHM_TO_HASH_FUNCTION: dict[str, typing.Callable[[bytes], _Hash]] = {
174 "MD5": hashlib.md5,
175 "MD5-SESS": hashlib.md5,
176 "SHA": hashlib.sha1,
177 "SHA-SESS": hashlib.sha1,
178 "SHA-256": hashlib.sha256,
179 "SHA-256-SESS": hashlib.sha256,
180 "SHA-512": hashlib.sha512,
181 "SHA-512-SESS": hashlib.sha512,
182 }
184 def __init__(self, username: str | bytes, password: str | bytes) -> None:
185 self._username = to_bytes(username)
186 self._password = to_bytes(password)
187 self._last_challenge: _DigestAuthChallenge | None = None
188 self._nonce_count = 1
190 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
191 if self._last_challenge:
192 request.headers["Authorization"] = self._build_auth_header(
193 request, self._last_challenge
194 )
196 response = yield request
198 if response.status_code != 401 or "www-authenticate" not in response.headers:
199 # If the response is not a 401 then we don't
200 # need to build an authenticated request.
201 return
203 for auth_header in response.headers.get_list("www-authenticate"):
204 if auth_header.lower().startswith("digest "):
205 break
206 else:
207 # If the response does not include a 'WWW-Authenticate: Digest ...'
208 # header, then we don't need to build an authenticated request.
209 return
211 self._last_challenge = self._parse_challenge(request, response, auth_header)
212 self._nonce_count = 1
214 request.headers["Authorization"] = self._build_auth_header(
215 request, self._last_challenge
216 )
217 if response.cookies:
218 Cookies(response.cookies).set_cookie_header(request=request)
219 yield request
221 def _parse_challenge(
222 self, request: Request, response: Response, auth_header: str
223 ) -> _DigestAuthChallenge:
224 """
225 Returns a challenge from a Digest WWW-Authenticate header.
226 These take the form of:
227 `Digest realm="realm@host.com",qop="auth,auth-int",nonce="abc",opaque="xyz"`
228 """
229 scheme, _, fields = auth_header.partition(" ")
231 # This method should only ever have been called with a Digest auth header.
232 assert scheme.lower() == "digest"
234 header_dict: dict[str, str] = {}
235 for field in parse_http_list(fields):
236 key, value = field.strip().split("=", 1)
237 header_dict[key] = unquote(value)
239 try:
240 realm = header_dict["realm"].encode()
241 nonce = header_dict["nonce"].encode()
242 algorithm = header_dict.get("algorithm", "MD5")
243 opaque = header_dict["opaque"].encode() if "opaque" in header_dict else None
244 qop = header_dict["qop"].encode() if "qop" in header_dict else None
245 return _DigestAuthChallenge(
246 realm=realm, nonce=nonce, algorithm=algorithm, opaque=opaque, qop=qop
247 )
248 except KeyError as exc:
249 message = "Malformed Digest WWW-Authenticate header"
250 raise ProtocolError(message, request=request) from exc
252 def _build_auth_header(
253 self, request: Request, challenge: _DigestAuthChallenge
254 ) -> str:
255 hash_func = self._ALGORITHM_TO_HASH_FUNCTION[challenge.algorithm.upper()]
257 def digest(data: bytes) -> bytes:
258 return hash_func(data).hexdigest().encode()
260 A1 = b":".join((self._username, challenge.realm, self._password))
262 path = request.url.raw_path
263 A2 = b":".join((request.method.encode(), path))
264 # TODO: implement auth-int
265 HA2 = digest(A2)
267 nc_value = b"%08x" % self._nonce_count
268 cnonce = self._get_client_nonce(self._nonce_count, challenge.nonce)
269 self._nonce_count += 1
271 HA1 = digest(A1)
272 if challenge.algorithm.lower().endswith("-sess"):
273 HA1 = digest(b":".join((HA1, challenge.nonce, cnonce)))
275 qop = self._resolve_qop(challenge.qop, request=request)
276 if qop is None:
277 # Following RFC 2069
278 digest_data = [HA1, challenge.nonce, HA2]
279 else:
280 # Following RFC 2617/7616
281 digest_data = [HA1, challenge.nonce, nc_value, cnonce, qop, HA2]
283 format_args = {
284 "username": self._username,
285 "realm": challenge.realm,
286 "nonce": challenge.nonce,
287 "uri": path,
288 "response": digest(b":".join(digest_data)),
289 "algorithm": challenge.algorithm.encode(),
290 }
291 if challenge.opaque:
292 format_args["opaque"] = challenge.opaque
293 if qop:
294 format_args["qop"] = b"auth"
295 format_args["nc"] = nc_value
296 format_args["cnonce"] = cnonce
298 return "Digest " + self._get_header_value(format_args)
300 def _get_client_nonce(self, nonce_count: int, nonce: bytes) -> bytes:
301 s = str(nonce_count).encode()
302 s += nonce
303 s += time.ctime().encode()
304 s += os.urandom(8)
306 return hashlib.sha1(s).hexdigest()[:16].encode()
308 def _get_header_value(self, header_fields: dict[str, bytes]) -> str:
309 NON_QUOTED_FIELDS = ("algorithm", "qop", "nc")
310 QUOTED_TEMPLATE = '{}="{}"'
311 NON_QUOTED_TEMPLATE = "{}={}"
313 header_value = ""
314 for i, (field, value) in enumerate(header_fields.items()):
315 if i > 0:
316 header_value += ", "
317 template = (
318 QUOTED_TEMPLATE
319 if field not in NON_QUOTED_FIELDS
320 else NON_QUOTED_TEMPLATE
321 )
322 header_value += template.format(field, to_str(value))
324 return header_value
326 def _resolve_qop(self, qop: bytes | None, request: Request) -> bytes | None:
327 if qop is None:
328 return None
329 qops = re.split(b", ?", qop)
330 if b"auth" in qops:
331 return b"auth"
333 if qops == [b"auth-int"]:
334 raise NotImplementedError("Digest auth-int support is not yet implemented")
336 message = f'Unexpected qop value "{qop!r}" in digest auth'
337 raise ProtocolError(message, request=request)
340class _DigestAuthChallenge(typing.NamedTuple):
341 realm: bytes
342 nonce: bytes
343 algorithm: str
344 opaque: bytes | None
345 qop: bytes | None