Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_auth.py: 26%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import hashlib
4import os
5import re
6import time
7import typing
8from base64 import b64encode
9from urllib.request import parse_http_list
11from ._exceptions import ProtocolError
12from ._models import Cookies, Request, Response
13from ._utils import to_bytes, to_str, unquote
15if typing.TYPE_CHECKING: # pragma: no cover
16 from hashlib import _Hash
19__all__ = ["Auth", "BasicAuth", "DigestAuth", "NetRCAuth"]
22class Auth:
23 """
24 Base class for all authentication schemes.
26 To implement a custom authentication scheme, subclass `Auth` and override
27 the `.auth_flow()` method.
29 If the authentication scheme does I/O such as disk access or network calls, or uses
30 synchronization primitives such as locks, you should override `.sync_auth_flow()`
31 and/or `.async_auth_flow()` instead of `.auth_flow()` to provide specialized
32 implementations that will be used by `Client` and `AsyncClient` respectively.
33 """
35 requires_request_body = False
36 requires_response_body = False
38 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
39 """
40 Execute the authentication flow.
42 To dispatch a request, `yield` it:
44 ```
45 yield request
46 ```
48 The client will `.send()` the response back into the flow generator. You can
49 access it like so:
51 ```
52 response = yield request
53 ```
55 A `return` (or reaching the end of the generator) will result in the
56 client returning the last response obtained from the server.
58 You can dispatch as many requests as is necessary.
59 """
60 yield request
62 def sync_auth_flow(
63 self, request: Request
64 ) -> typing.Generator[Request, Response, None]:
65 """
66 Execute the authentication flow synchronously.
68 By default, this defers to `.auth_flow()`. You should override this method
69 when the authentication scheme does I/O and/or uses concurrency primitives.
70 """
71 if self.requires_request_body:
72 request.read()
74 flow = self.auth_flow(request)
75 request = next(flow)
77 while True:
78 response = yield request
79 if self.requires_response_body:
80 response.read()
82 try:
83 request = flow.send(response)
84 except StopIteration:
85 break
87 async def async_auth_flow(
88 self, request: Request
89 ) -> typing.AsyncGenerator[Request, Response]:
90 """
91 Execute the authentication flow asynchronously.
93 By default, this defers to `.auth_flow()`. You should override this method
94 when the authentication scheme does I/O and/or uses concurrency primitives.
95 """
96 if self.requires_request_body:
97 await request.aread()
99 flow = self.auth_flow(request)
100 request = next(flow)
102 while True:
103 response = yield request
104 if self.requires_response_body:
105 await response.aread()
107 try:
108 request = flow.send(response)
109 except StopIteration:
110 break
113class FunctionAuth(Auth):
114 """
115 Allows the 'auth' argument to be passed as a simple callable function,
116 that takes the request, and returns a new, modified request.
117 """
119 def __init__(self, func: typing.Callable[[Request], Request]) -> None:
120 self._func = func
122 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
123 yield self._func(request)
126class BasicAuth(Auth):
127 """
128 Allows the 'auth' argument to be passed as a (username, password) pair,
129 and uses HTTP Basic authentication.
130 """
132 def __init__(self, username: str | bytes, password: str | bytes) -> None:
133 self._auth_header = self._build_auth_header(username, password)
135 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
136 request.headers["Authorization"] = self._auth_header
137 yield request
139 def _build_auth_header(self, username: str | bytes, password: str | bytes) -> str:
140 userpass = b":".join((to_bytes(username), to_bytes(password)))
141 token = b64encode(userpass).decode()
142 return f"Basic {token}"
145class NetRCAuth(Auth):
146 """
147 Use a 'netrc' file to lookup basic auth credentials based on the url host.
148 """
150 def __init__(self, file: str | None = None) -> None:
151 # Lazily import 'netrc'.
152 # There's no need for us to load this module unless 'NetRCAuth' is being used.
153 import netrc
155 self._netrc_info = netrc.netrc(file)
157 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
158 auth_info = self._netrc_info.authenticators(request.url.host)
159 if auth_info is None or not auth_info[2]:
160 # The netrc file did not have authentication credentials for this host.
161 yield request
162 else:
163 # Build a basic auth header with credentials from the netrc file.
164 request.headers["Authorization"] = self._build_auth_header(
165 username=auth_info[0], password=auth_info[2]
166 )
167 yield request
169 def _build_auth_header(self, username: str | bytes, password: str | bytes) -> str:
170 userpass = b":".join((to_bytes(username), to_bytes(password)))
171 token = b64encode(userpass).decode()
172 return f"Basic {token}"
175class DigestAuth(Auth):
176 _ALGORITHM_TO_HASH_FUNCTION: dict[str, typing.Callable[[bytes], _Hash]] = {
177 "MD5": hashlib.md5,
178 "MD5-SESS": hashlib.md5,
179 "SHA": hashlib.sha1,
180 "SHA-SESS": hashlib.sha1,
181 "SHA-256": hashlib.sha256,
182 "SHA-256-SESS": hashlib.sha256,
183 "SHA-512": hashlib.sha512,
184 "SHA-512-SESS": hashlib.sha512,
185 }
187 def __init__(self, username: str | bytes, password: str | bytes) -> None:
188 self._username = to_bytes(username)
189 self._password = to_bytes(password)
190 self._last_challenge: _DigestAuthChallenge | None = None
191 self._nonce_count = 1
193 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
194 if self._last_challenge:
195 request.headers["Authorization"] = self._build_auth_header(
196 request, self._last_challenge
197 )
199 response = yield request
201 if response.status_code != 401 or "www-authenticate" not in response.headers:
202 # If the response is not a 401 then we don't
203 # need to build an authenticated request.
204 return
206 for auth_header in response.headers.get_list("www-authenticate"):
207 if auth_header.lower().startswith("digest "):
208 break
209 else:
210 # If the response does not include a 'WWW-Authenticate: Digest ...'
211 # header, then we don't need to build an authenticated request.
212 return
214 self._last_challenge = self._parse_challenge(request, response, auth_header)
215 self._nonce_count = 1
217 request.headers["Authorization"] = self._build_auth_header(
218 request, self._last_challenge
219 )
220 if response.cookies:
221 Cookies(response.cookies).set_cookie_header(request=request)
222 yield request
224 def _parse_challenge(
225 self, request: Request, response: Response, auth_header: str
226 ) -> _DigestAuthChallenge:
227 """
228 Returns a challenge from a Digest WWW-Authenticate header.
229 These take the form of:
230 `Digest realm="realm@host.com",qop="auth,auth-int",nonce="abc",opaque="xyz"`
231 """
232 scheme, _, fields = auth_header.partition(" ")
234 # This method should only ever have been called with a Digest auth header.
235 assert scheme.lower() == "digest"
237 header_dict: dict[str, str] = {}
238 for field in parse_http_list(fields):
239 key, value = field.strip().split("=", 1)
240 header_dict[key] = unquote(value)
242 try:
243 realm = header_dict["realm"].encode()
244 nonce = header_dict["nonce"].encode()
245 algorithm = header_dict.get("algorithm", "MD5")
246 opaque = header_dict["opaque"].encode() if "opaque" in header_dict else None
247 qop = header_dict["qop"].encode() if "qop" in header_dict else None
248 return _DigestAuthChallenge(
249 realm=realm, nonce=nonce, algorithm=algorithm, opaque=opaque, qop=qop
250 )
251 except KeyError as exc:
252 message = "Malformed Digest WWW-Authenticate header"
253 raise ProtocolError(message, request=request) from exc
255 def _build_auth_header(
256 self, request: Request, challenge: _DigestAuthChallenge
257 ) -> str:
258 hash_func = self._ALGORITHM_TO_HASH_FUNCTION[challenge.algorithm.upper()]
260 def digest(data: bytes) -> bytes:
261 return hash_func(data).hexdigest().encode()
263 A1 = b":".join((self._username, challenge.realm, self._password))
265 path = request.url.raw_path
266 A2 = b":".join((request.method.encode(), path))
267 # TODO: implement auth-int
268 HA2 = digest(A2)
270 nc_value = b"%08x" % self._nonce_count
271 cnonce = self._get_client_nonce(self._nonce_count, challenge.nonce)
272 self._nonce_count += 1
274 HA1 = digest(A1)
275 if challenge.algorithm.lower().endswith("-sess"):
276 HA1 = digest(b":".join((HA1, challenge.nonce, cnonce)))
278 qop = self._resolve_qop(challenge.qop, request=request)
279 if qop is None:
280 # Following RFC 2069
281 digest_data = [HA1, challenge.nonce, HA2]
282 else:
283 # Following RFC 2617/7616
284 digest_data = [HA1, challenge.nonce, nc_value, cnonce, qop, HA2]
286 format_args = {
287 "username": self._username,
288 "realm": challenge.realm,
289 "nonce": challenge.nonce,
290 "uri": path,
291 "response": digest(b":".join(digest_data)),
292 "algorithm": challenge.algorithm.encode(),
293 }
294 if challenge.opaque:
295 format_args["opaque"] = challenge.opaque
296 if qop:
297 format_args["qop"] = b"auth"
298 format_args["nc"] = nc_value
299 format_args["cnonce"] = cnonce
301 return "Digest " + self._get_header_value(format_args)
303 def _get_client_nonce(self, nonce_count: int, nonce: bytes) -> bytes:
304 s = str(nonce_count).encode()
305 s += nonce
306 s += time.ctime().encode()
307 s += os.urandom(8)
309 return hashlib.sha1(s).hexdigest()[:16].encode()
311 def _get_header_value(self, header_fields: dict[str, bytes]) -> str:
312 NON_QUOTED_FIELDS = ("algorithm", "qop", "nc")
313 QUOTED_TEMPLATE = '{}="{}"'
314 NON_QUOTED_TEMPLATE = "{}={}"
316 header_value = ""
317 for i, (field, value) in enumerate(header_fields.items()):
318 if i > 0:
319 header_value += ", "
320 template = (
321 QUOTED_TEMPLATE
322 if field not in NON_QUOTED_FIELDS
323 else NON_QUOTED_TEMPLATE
324 )
325 header_value += template.format(field, to_str(value))
327 return header_value
329 def _resolve_qop(self, qop: bytes | None, request: Request) -> bytes | None:
330 if qop is None:
331 return None
332 qops = re.split(b", ?", qop)
333 if b"auth" in qops:
334 return b"auth"
336 if qops == [b"auth-int"]:
337 raise NotImplementedError("Digest auth-int support is not yet implemented")
339 message = f'Unexpected qop value "{qop!r}" in digest auth'
340 raise ProtocolError(message, request=request)
343class _DigestAuthChallenge(typing.NamedTuple):
344 realm: bytes
345 nonce: bytes
346 algorithm: str
347 opaque: bytes | None
348 qop: bytes | None