Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_auth.py: 26%
166 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1import hashlib
2import netrc
3import os
4import re
5import time
6import typing
7from base64 import b64encode
8from urllib.request import parse_http_list
10from ._exceptions import ProtocolError
11from ._models import Request, Response
12from ._utils import to_bytes, to_str, unquote
14if typing.TYPE_CHECKING: # pragma: no cover
15 from hashlib import _Hash
18class Auth:
19 """
20 Base class for all authentication schemes.
22 To implement a custom authentication scheme, subclass `Auth` and override
23 the `.auth_flow()` method.
25 If the authentication scheme does I/O such as disk access or network calls, or uses
26 synchronization primitives such as locks, you should override `.sync_auth_flow()`
27 and/or `.async_auth_flow()` instead of `.auth_flow()` to provide specialized
28 implementations that will be used by `Client` and `AsyncClient` respectively.
29 """
31 requires_request_body = False
32 requires_response_body = False
34 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
35 """
36 Execute the authentication flow.
38 To dispatch a request, `yield` it:
40 ```
41 yield request
42 ```
44 The client will `.send()` the response back into the flow generator. You can
45 access it like so:
47 ```
48 response = yield request
49 ```
51 A `return` (or reaching the end of the generator) will result in the
52 client returning the last response obtained from the server.
54 You can dispatch as many requests as is necessary.
55 """
56 yield request
58 def sync_auth_flow(
59 self, request: Request
60 ) -> typing.Generator[Request, Response, None]:
61 """
62 Execute the authentication flow synchronously.
64 By default, this defers to `.auth_flow()`. You should override this method
65 when the authentication scheme does I/O and/or uses concurrency primitives.
66 """
67 if self.requires_request_body:
68 request.read()
70 flow = self.auth_flow(request)
71 request = next(flow)
73 while True:
74 response = yield request
75 if self.requires_response_body:
76 response.read()
78 try:
79 request = flow.send(response)
80 except StopIteration:
81 break
83 async def async_auth_flow(
84 self, request: Request
85 ) -> typing.AsyncGenerator[Request, Response]:
86 """
87 Execute the authentication flow asynchronously.
89 By default, this defers to `.auth_flow()`. You should override this method
90 when the authentication scheme does I/O and/or uses concurrency primitives.
91 """
92 if self.requires_request_body:
93 await request.aread()
95 flow = self.auth_flow(request)
96 request = next(flow)
98 while True:
99 response = yield request
100 if self.requires_response_body:
101 await response.aread()
103 try:
104 request = flow.send(response)
105 except StopIteration:
106 break
109class FunctionAuth(Auth):
110 """
111 Allows the 'auth' argument to be passed as a simple callable function,
112 that takes the request, and returns a new, modified request.
113 """
115 def __init__(self, func: typing.Callable[[Request], Request]) -> None:
116 self._func = func
118 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
119 yield self._func(request)
122class BasicAuth(Auth):
123 """
124 Allows the 'auth' argument to be passed as a (username, password) pair,
125 and uses HTTP Basic authentication.
126 """
128 def __init__(
129 self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
130 ):
131 self._auth_header = self._build_auth_header(username, password)
133 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
134 request.headers["Authorization"] = self._auth_header
135 yield request
137 def _build_auth_header(
138 self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
139 ) -> str:
140 userpass = b":".join((to_bytes(username), to_bytes(password)))
141 token = b64encode(userpass).decode()
142 return f"Basic {token}"
145class NetRCAuth(Auth):
146 """
147 Use a 'netrc' file to lookup basic auth credentials based on the url host.
148 """
150 def __init__(self, file: typing.Optional[str] = None):
151 self._netrc_info = netrc.netrc(file)
153 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
154 auth_info = self._netrc_info.authenticators(request.url.host)
155 if auth_info is None or not auth_info[2]:
156 # The netrc file did not have authentication credentials for this host.
157 yield request
158 else:
159 # Build a basic auth header with credentials from the netrc file.
160 request.headers["Authorization"] = self._build_auth_header(
161 username=auth_info[0], password=auth_info[2]
162 )
163 yield request
165 def _build_auth_header(
166 self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
167 ) -> str:
168 userpass = b":".join((to_bytes(username), to_bytes(password)))
169 token = b64encode(userpass).decode()
170 return f"Basic {token}"
173class DigestAuth(Auth):
174 _ALGORITHM_TO_HASH_FUNCTION: typing.Dict[str, typing.Callable[[bytes], "_Hash"]] = {
175 "MD5": hashlib.md5,
176 "MD5-SESS": hashlib.md5,
177 "SHA": hashlib.sha1,
178 "SHA-SESS": hashlib.sha1,
179 "SHA-256": hashlib.sha256,
180 "SHA-256-SESS": hashlib.sha256,
181 "SHA-512": hashlib.sha512,
182 "SHA-512-SESS": hashlib.sha512,
183 }
185 def __init__(
186 self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
187 ) -> None:
188 self._username = to_bytes(username)
189 self._password = to_bytes(password)
190 self._last_challenge: typing.Optional[_DigestAuthChallenge] = None
191 self._nonce_count = 1
193 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
194 if self._last_challenge:
195 request.headers["Authorization"] = self._build_auth_header(
196 request, self._last_challenge
197 )
199 response = yield request
201 if response.status_code != 401 or "www-authenticate" not in response.headers:
202 # If the response is not a 401 then we don't
203 # need to build an authenticated request.
204 return
206 for auth_header in response.headers.get_list("www-authenticate"):
207 if auth_header.lower().startswith("digest "):
208 break
209 else:
210 # If the response does not include a 'WWW-Authenticate: Digest ...'
211 # header, then we don't need to build an authenticated request.
212 return
214 self._last_challenge = self._parse_challenge(request, response, auth_header)
215 self._nonce_count = 1
217 request.headers["Authorization"] = self._build_auth_header(
218 request, self._last_challenge
219 )
220 yield request
222 def _parse_challenge(
223 self, request: Request, response: Response, auth_header: str
224 ) -> "_DigestAuthChallenge":
225 """
226 Returns a challenge from a Digest WWW-Authenticate header.
227 These take the form of:
228 `Digest realm="realm@host.com",qop="auth,auth-int",nonce="abc",opaque="xyz"`
229 """
230 scheme, _, fields = auth_header.partition(" ")
232 # This method should only ever have been called with a Digest auth header.
233 assert scheme.lower() == "digest"
235 header_dict: typing.Dict[str, str] = {}
236 for field in parse_http_list(fields):
237 key, value = field.strip().split("=", 1)
238 header_dict[key] = unquote(value)
240 try:
241 realm = header_dict["realm"].encode()
242 nonce = header_dict["nonce"].encode()
243 algorithm = header_dict.get("algorithm", "MD5")
244 opaque = header_dict["opaque"].encode() if "opaque" in header_dict else None
245 qop = header_dict["qop"].encode() if "qop" in header_dict else None
246 return _DigestAuthChallenge(
247 realm=realm, nonce=nonce, algorithm=algorithm, opaque=opaque, qop=qop
248 )
249 except KeyError as exc:
250 message = "Malformed Digest WWW-Authenticate header"
251 raise ProtocolError(message, request=request) from exc
253 def _build_auth_header(
254 self, request: Request, challenge: "_DigestAuthChallenge"
255 ) -> str:
256 hash_func = self._ALGORITHM_TO_HASH_FUNCTION[challenge.algorithm.upper()]
258 def digest(data: bytes) -> bytes:
259 return hash_func(data).hexdigest().encode()
261 A1 = b":".join((self._username, challenge.realm, self._password))
263 path = request.url.raw_path
264 A2 = b":".join((request.method.encode(), path))
265 # TODO: implement auth-int
266 HA2 = digest(A2)
268 nc_value = b"%08x" % self._nonce_count
269 cnonce = self._get_client_nonce(self._nonce_count, challenge.nonce)
270 self._nonce_count += 1
272 HA1 = digest(A1)
273 if challenge.algorithm.lower().endswith("-sess"):
274 HA1 = digest(b":".join((HA1, challenge.nonce, cnonce)))
276 qop = self._resolve_qop(challenge.qop, request=request)
277 if qop is None:
278 digest_data = [HA1, challenge.nonce, HA2]
279 else:
280 digest_data = [challenge.nonce, nc_value, cnonce, qop, HA2]
281 key_digest = b":".join(digest_data)
283 format_args = {
284 "username": self._username,
285 "realm": challenge.realm,
286 "nonce": challenge.nonce,
287 "uri": path,
288 "response": digest(b":".join((HA1, key_digest))),
289 "algorithm": challenge.algorithm.encode(),
290 }
291 if challenge.opaque:
292 format_args["opaque"] = challenge.opaque
293 if qop:
294 format_args["qop"] = b"auth"
295 format_args["nc"] = nc_value
296 format_args["cnonce"] = cnonce
298 return "Digest " + self._get_header_value(format_args)
300 def _get_client_nonce(self, nonce_count: int, nonce: bytes) -> bytes:
301 s = str(nonce_count).encode()
302 s += nonce
303 s += time.ctime().encode()
304 s += os.urandom(8)
306 return hashlib.sha1(s).hexdigest()[:16].encode()
308 def _get_header_value(self, header_fields: typing.Dict[str, bytes]) -> str:
309 NON_QUOTED_FIELDS = ("algorithm", "qop", "nc")
310 QUOTED_TEMPLATE = '{}="{}"'
311 NON_QUOTED_TEMPLATE = "{}={}"
313 header_value = ""
314 for i, (field, value) in enumerate(header_fields.items()):
315 if i > 0:
316 header_value += ", "
317 template = (
318 QUOTED_TEMPLATE
319 if field not in NON_QUOTED_FIELDS
320 else NON_QUOTED_TEMPLATE
321 )
322 header_value += template.format(field, to_str(value))
324 return header_value
326 def _resolve_qop(
327 self, qop: typing.Optional[bytes], request: Request
328 ) -> typing.Optional[bytes]:
329 if qop is None:
330 return None
331 qops = re.split(b", ?", qop)
332 if b"auth" in qops:
333 return b"auth"
335 if qops == [b"auth-int"]:
336 raise NotImplementedError("Digest auth-int support is not yet implemented")
338 message = f'Unexpected qop value "{qop!r}" in digest auth'
339 raise ProtocolError(message, request=request)
342class _DigestAuthChallenge(typing.NamedTuple):
343 realm: bytes
344 nonce: bytes
345 algorithm: str
346 opaque: typing.Optional[bytes]
347 qop: typing.Optional[bytes]