Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_auth.py: 25%
152 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import hashlib
2import os
3import re
4import time
5import typing
6from base64 import b64encode
7from urllib.request import parse_http_list
9from ._exceptions import ProtocolError
10from ._models import Request, Response
11from ._utils import to_bytes, to_str, unquote
13if typing.TYPE_CHECKING: # pragma: no cover
14 from hashlib import _Hash
17class Auth:
18 """
19 Base class for all authentication schemes.
21 To implement a custom authentication scheme, subclass `Auth` and override
22 the `.auth_flow()` method.
24 If the authentication scheme does I/O such as disk access or network calls, or uses
25 synchronization primitives such as locks, you should override `.sync_auth_flow()`
26 and/or `.async_auth_flow()` instead of `.auth_flow()` to provide specialized
27 implementations that will be used by `Client` and `AsyncClient` respectively.
28 """
30 requires_request_body = False
31 requires_response_body = False
33 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
34 """
35 Execute the authentication flow.
37 To dispatch a request, `yield` it:
39 ```
40 yield request
41 ```
43 The client will `.send()` the response back into the flow generator. You can
44 access it like so:
46 ```
47 response = yield request
48 ```
50 A `return` (or reaching the end of the generator) will result in the
51 client returning the last response obtained from the server.
53 You can dispatch as many requests as is necessary.
54 """
55 yield request
57 def sync_auth_flow(
58 self, request: Request
59 ) -> typing.Generator[Request, Response, None]:
60 """
61 Execute the authentication flow synchronously.
63 By default, this defers to `.auth_flow()`. You should override this method
64 when the authentication scheme does I/O and/or uses concurrency primitives.
65 """
66 if self.requires_request_body:
67 request.read()
69 flow = self.auth_flow(request)
70 request = next(flow)
72 while True:
73 response = yield request
74 if self.requires_response_body:
75 response.read()
77 try:
78 request = flow.send(response)
79 except StopIteration:
80 break
82 async def async_auth_flow(
83 self, request: Request
84 ) -> typing.AsyncGenerator[Request, Response]:
85 """
86 Execute the authentication flow asynchronously.
88 By default, this defers to `.auth_flow()`. You should override this method
89 when the authentication scheme does I/O and/or uses concurrency primitives.
90 """
91 if self.requires_request_body:
92 await request.aread()
94 flow = self.auth_flow(request)
95 request = next(flow)
97 while True:
98 response = yield request
99 if self.requires_response_body:
100 await response.aread()
102 try:
103 request = flow.send(response)
104 except StopIteration:
105 break
108class FunctionAuth(Auth):
109 """
110 Allows the 'auth' argument to be passed as a simple callable function,
111 that takes the request, and returns a new, modified request.
112 """
114 def __init__(self, func: typing.Callable[[Request], Request]) -> None:
115 self._func = func
117 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
118 yield self._func(request)
121class BasicAuth(Auth):
122 """
123 Allows the 'auth' argument to be passed as a (username, password) pair,
124 and uses HTTP Basic authentication.
125 """
127 def __init__(
128 self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
129 ):
130 self._auth_header = self._build_auth_header(username, password)
132 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
133 request.headers["Authorization"] = self._auth_header
134 yield request
136 def _build_auth_header(
137 self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
138 ) -> str:
139 userpass = b":".join((to_bytes(username), to_bytes(password)))
140 token = b64encode(userpass).decode()
141 return f"Basic {token}"
144class DigestAuth(Auth):
145 _ALGORITHM_TO_HASH_FUNCTION: typing.Dict[str, typing.Callable[[bytes], "_Hash"]] = {
146 "MD5": hashlib.md5,
147 "MD5-SESS": hashlib.md5,
148 "SHA": hashlib.sha1,
149 "SHA-SESS": hashlib.sha1,
150 "SHA-256": hashlib.sha256,
151 "SHA-256-SESS": hashlib.sha256,
152 "SHA-512": hashlib.sha512,
153 "SHA-512-SESS": hashlib.sha512,
154 }
156 def __init__(
157 self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
158 ) -> None:
159 self._username = to_bytes(username)
160 self._password = to_bytes(password)
161 self._last_challenge: typing.Optional[_DigestAuthChallenge] = None
162 self._nonce_count = 1
164 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
165 if self._last_challenge:
166 request.headers["Authorization"] = self._build_auth_header(
167 request, self._last_challenge
168 )
170 response = yield request
172 if response.status_code != 401 or "www-authenticate" not in response.headers:
173 # If the response is not a 401 then we don't
174 # need to build an authenticated request.
175 return
177 for auth_header in response.headers.get_list("www-authenticate"):
178 if auth_header.lower().startswith("digest "):
179 break
180 else:
181 # If the response does not include a 'WWW-Authenticate: Digest ...'
182 # header, then we don't need to build an authenticated request.
183 return
185 self._last_challenge = self._parse_challenge(request, response, auth_header)
186 self._nonce_count = 1
188 request.headers["Authorization"] = self._build_auth_header(
189 request, self._last_challenge
190 )
191 yield request
193 def _parse_challenge(
194 self, request: Request, response: Response, auth_header: str
195 ) -> "_DigestAuthChallenge":
196 """
197 Returns a challenge from a Digest WWW-Authenticate header.
198 These take the form of:
199 `Digest realm="realm@host.com",qop="auth,auth-int",nonce="abc",opaque="xyz"`
200 """
201 scheme, _, fields = auth_header.partition(" ")
203 # This method should only ever have been called with a Digest auth header.
204 assert scheme.lower() == "digest"
206 header_dict: typing.Dict[str, str] = {}
207 for field in parse_http_list(fields):
208 key, value = field.strip().split("=", 1)
209 header_dict[key] = unquote(value)
211 try:
212 realm = header_dict["realm"].encode()
213 nonce = header_dict["nonce"].encode()
214 algorithm = header_dict.get("algorithm", "MD5")
215 opaque = header_dict["opaque"].encode() if "opaque" in header_dict else None
216 qop = header_dict["qop"].encode() if "qop" in header_dict else None
217 return _DigestAuthChallenge(
218 realm=realm, nonce=nonce, algorithm=algorithm, opaque=opaque, qop=qop
219 )
220 except KeyError as exc:
221 message = "Malformed Digest WWW-Authenticate header"
222 raise ProtocolError(message, request=request) from exc
224 def _build_auth_header(
225 self, request: Request, challenge: "_DigestAuthChallenge"
226 ) -> str:
227 hash_func = self._ALGORITHM_TO_HASH_FUNCTION[challenge.algorithm.upper()]
229 def digest(data: bytes) -> bytes:
230 return hash_func(data).hexdigest().encode()
232 A1 = b":".join((self._username, challenge.realm, self._password))
234 path = request.url.raw_path
235 A2 = b":".join((request.method.encode(), path))
236 # TODO: implement auth-int
237 HA2 = digest(A2)
239 nc_value = b"%08x" % self._nonce_count
240 cnonce = self._get_client_nonce(self._nonce_count, challenge.nonce)
241 self._nonce_count += 1
243 HA1 = digest(A1)
244 if challenge.algorithm.lower().endswith("-sess"):
245 HA1 = digest(b":".join((HA1, challenge.nonce, cnonce)))
247 qop = self._resolve_qop(challenge.qop, request=request)
248 if qop is None:
249 digest_data = [HA1, challenge.nonce, HA2]
250 else:
251 digest_data = [challenge.nonce, nc_value, cnonce, qop, HA2]
252 key_digest = b":".join(digest_data)
254 format_args = {
255 "username": self._username,
256 "realm": challenge.realm,
257 "nonce": challenge.nonce,
258 "uri": path,
259 "response": digest(b":".join((HA1, key_digest))),
260 "algorithm": challenge.algorithm.encode(),
261 }
262 if challenge.opaque:
263 format_args["opaque"] = challenge.opaque
264 if qop:
265 format_args["qop"] = b"auth"
266 format_args["nc"] = nc_value
267 format_args["cnonce"] = cnonce
269 return "Digest " + self._get_header_value(format_args)
271 def _get_client_nonce(self, nonce_count: int, nonce: bytes) -> bytes:
272 s = str(nonce_count).encode()
273 s += nonce
274 s += time.ctime().encode()
275 s += os.urandom(8)
277 return hashlib.sha1(s).hexdigest()[:16].encode()
279 def _get_header_value(self, header_fields: typing.Dict[str, bytes]) -> str:
280 NON_QUOTED_FIELDS = ("algorithm", "qop", "nc")
281 QUOTED_TEMPLATE = '{}="{}"'
282 NON_QUOTED_TEMPLATE = "{}={}"
284 header_value = ""
285 for i, (field, value) in enumerate(header_fields.items()):
286 if i > 0:
287 header_value += ", "
288 template = (
289 QUOTED_TEMPLATE
290 if field not in NON_QUOTED_FIELDS
291 else NON_QUOTED_TEMPLATE
292 )
293 header_value += template.format(field, to_str(value))
295 return header_value
297 def _resolve_qop(
298 self, qop: typing.Optional[bytes], request: Request
299 ) -> typing.Optional[bytes]:
300 if qop is None:
301 return None
302 qops = re.split(b", ?", qop)
303 if b"auth" in qops:
304 return b"auth"
306 if qops == [b"auth-int"]:
307 raise NotImplementedError("Digest auth-int support is not yet implemented")
309 message = f'Unexpected qop value "{qop!r}" in digest auth'
310 raise ProtocolError(message, request=request)
313class _DigestAuthChallenge(typing.NamedTuple):
314 realm: bytes
315 nonce: bytes
316 algorithm: str
317 opaque: typing.Optional[bytes]
318 qop: typing.Optional[bytes]