Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_middleware_digest_auth.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2Digest authentication middleware for aiohttp client.
4This middleware implements HTTP Digest Authentication according to RFC 7616,
5providing a more secure alternative to Basic Authentication. It supports all
6standard hash algorithms including MD5, SHA, SHA-256, SHA-512 and their session
7variants, as well as both 'auth' and 'auth-int' quality of protection (qop) options.
8"""
10import hashlib
11import os
12import re
13import time
14from typing import (
15 Callable,
16 Dict,
17 Final,
18 FrozenSet,
19 List,
20 Literal,
21 Tuple,
22 TypedDict,
23 Union,
24)
26from yarl import URL
28from . import hdrs
29from .client_exceptions import ClientError
30from .client_middlewares import ClientHandlerType
31from .client_reqrep import ClientRequest, ClientResponse
class DigestAuthChallenge(TypedDict, total=False):
    """Parameters taken from a Digest ``WWW-Authenticate`` challenge.

    ``total=False`` makes every key optional: a key is only present when it
    was stored from the parsed challenge header.
    """

    # Hashed into A1 together with the login and password.
    realm: str
    # Server nonce; mixed into the response digest and tracked for the nonce count.
    nonce: str
    # Raw qop value from the server; may be a comma-separated list of options.
    qop: str
    # Hash algorithm name; looked up (upper-cased) in ``DigestFunctions``.
    algorithm: str
    # Echoed back to the server in the Authorization header when non-empty.
    opaque: str
# Maps an upper-cased digest algorithm name to the hashlib constructor used
# for it. Each "-SESS" variant shares the hash of its base algorithm, and the
# SHA-2 algorithms are accepted both with and without the hyphen
# (e.g. "SHA256" and "SHA-256").
DigestFunctions: Dict[str, Callable[[bytes], "hashlib._Hash"]] = {
    "MD5": hashlib.md5,
    "MD5-SESS": hashlib.md5,
    "SHA": hashlib.sha1,
    "SHA-SESS": hashlib.sha1,
    "SHA256": hashlib.sha256,
    "SHA256-SESS": hashlib.sha256,
    "SHA-256": hashlib.sha256,
    "SHA-256-SESS": hashlib.sha256,
    "SHA512": hashlib.sha512,
    "SHA512-SESS": hashlib.sha512,
    "SHA-512": hashlib.sha512,
    "SHA-512-SESS": hashlib.sha512,
}
# Compile the regex pattern once at module level for performance.
#
# Each match is one ``key=value`` pair and yields three groups:
#   group 1  (\w+)                  the key: one or more word characters
#            \s*=\s*                the "=" delimiter, optional whitespace on
#                                   either side
#            (?: ... | ... )        the value, either quoted or unquoted
#   group 2  "((?:[^"\\]|\\.)*)"    quoted value: any run (possibly empty) of
#                                   characters other than '"' or '\', or
#                                   backslash-escaped characters
#   group 3  ([^\s,]+)              unquoted value: at least one character
#                                   that is neither whitespace nor a comma
_HEADER_PAIRS_PATTERN = re.compile(
    r'(\w+)\s*=\s*(?:"((?:[^"\\]|\\.)*)"|([^\s,]+))'
)
# RFC 7616: Challenge parameters to extract from the WWW-Authenticate header.
# The Literal element type mirrors the keys declared on DigestAuthChallenge,
# so each entry can be used as a key of that TypedDict.
CHALLENGE_FIELDS: Final[
    Tuple[Literal["realm", "nonce", "qop", "algorithm", "opaque"], ...]
] = (
    "realm",
    "nonce",
    "qop",
    "algorithm",
    "opaque",
)
# Names of every digest algorithm this module can compute.
# Sorted into a tuple so documentation and error messages list them in a
# stable, predictable order.
SUPPORTED_ALGORITHMS: Final[Tuple[str, ...]] = tuple(sorted(DigestFunctions))
# RFC 7616: Fields that require quoting in the Digest auth header.
# These fields must be enclosed in double quotes in the Authorization header.
# Algorithm, qop, and nc are never quoted per RFC specifications.
# The header builder checks membership in this frozen set to decide, field by
# field, whether to emit key="value" or key=value.
QUOTED_AUTH_FIELDS: Final[FrozenSet[str]] = frozenset(
    {"username", "realm", "nonce", "uri", "response", "opaque", "cnonce"}
)
def escape_quotes(value: str) -> str:
    """Return *value* with every double quote prefixed by a backslash.

    Only the double-quote character is touched; backslashes and all other
    characters pass through unchanged.
    """
    quote_escapes = str.maketrans({'"': '\\"'})
    return value.translate(quote_escapes)
def unescape_quotes(value: str) -> str:
    """Return *value* with every backslash-escaped double quote unescaped.

    Only the two-character sequence ``\\"`` is rewritten (to ``"``); any
    other backslash sequence is left exactly as it was.
    """
    segments = value.split('\\"')
    return '"'.join(segments)
def parse_header_pairs(header: str) -> Dict[str, str]:
    """
    Extract ``key=value`` parameters from a WWW-Authenticate style header.

    Values may be quoted or unquoted. Quoted values may contain commas and
    backslash-escaped double quotes (the latter are unescaped in the result),
    and whitespace around the ``=`` sign is tolerated.

    Examples of supported formats:
      - key1="value1", key2=value2
      - key1 = "value1" , key2="value, with, commas"
      - key1=value1,key2="value2"
      - realm="example.com", nonce="12345", qop="auth"

    Args:
        header: The header value string to parse

    Returns:
        Dictionary mapping parameter names to their values. When a name
        occurs more than once, the last occurrence wins.
    """
    parsed: Dict[str, str] = {}
    # findall() yields "" (not None) for the alternative that did not match,
    # so an empty quoted value falls through to the (also empty) bare value.
    for raw_name, quoted_text, bare_text in _HEADER_PAIRS_PATTERN.findall(header):
        name = raw_name.strip()
        if not name:
            continue
        if quoted_text:
            parsed[name] = unescape_quotes(quoted_text)
        else:
            parsed[name] = bare_text
    return parsed
class DigestAuthMiddleware:
    """
    HTTP digest authentication middleware for aiohttp client.

    This middleware intercepts 401 Unauthorized responses containing a Digest
    authentication challenge, calculates the appropriate digest credentials,
    and automatically retries the request with the proper Authorization header.

    Features:
        - Handles all aspects of Digest authentication handshake automatically
        - Supports all standard hash algorithms:
            - MD5, MD5-SESS
            - SHA, SHA-SESS
            - SHA256, SHA256-SESS, SHA-256, SHA-256-SESS
            - SHA512, SHA512-SESS, SHA-512, SHA-512-SESS
        - Supports 'auth' and 'auth-int' quality of protection modes
        - Properly handles quoted strings and parameter parsing
        - Includes replay attack protection with client nonce count tracking

    Standards compliance:
        - RFC 7616: HTTP Digest Access Authentication (primary reference)
        - RFC 2617: HTTP Authentication (deprecated by RFC 7616)
        - RFC 1945: Section 11.1 (username restrictions)

    Implementation notes:
    The core digest calculation is inspired by the implementation in
    https://github.com/requests/requests/blob/v2.18.4/requests/auth.py
    with added support for modern digest auth features and error handling.
    """

    def __init__(
        self,
        login: str,
        password: str,
    ) -> None:
        """
        Store the credentials and reset the per-instance challenge state.

        Args:
            login: The user name; must not be None and must not contain ":"
            password: The password; must not be None

        Raises:
            ValueError: If login or password is None, or login contains ":"
        """
        if login is None:
            raise ValueError("None is not allowed as login value")

        if password is None:
            raise ValueError("None is not allowed as password value")

        if ":" in login:
            raise ValueError('A ":" is not allowed in username (RFC 1945#section-11.1)')

        self._login_str: Final[str] = login
        self._login_bytes: Final[bytes] = login.encode("utf-8")
        self._password_bytes: Final[bytes] = password.encode("utf-8")

        # Nonce bookkeeping: the count restarts at 1 whenever the server nonce
        # changes and increments while the same nonce keeps being used.
        self._last_nonce_bytes = b""
        self._nonce_count = 0
        # Most recent challenge parsed by _authenticate(); empty until then.
        self._challenge: DigestAuthChallenge = {}

    def _encode(self, method: str, url: URL, body: Union[bytes, str]) -> str:
        """
        Build digest authorization header for the current challenge.

        As a side effect this advances the nonce counter (or restarts it when
        the server nonce differs from the previous call).

        Args:
            method: The HTTP method (GET, POST, etc.)
            url: The request URL
            body: The request body (used for qop=auth-int)

        Returns:
            A fully formatted Digest authorization header string

        Raises:
            ClientError: If the challenge is missing required parameters or
                         contains unsupported values
        """
        challenge = self._challenge
        if "realm" not in challenge:
            raise ClientError(
                "Malformed Digest auth challenge: Missing 'realm' parameter"
            )

        if "nonce" not in challenge:
            raise ClientError(
                "Malformed Digest auth challenge: Missing 'nonce' parameter"
            )

        # Empty realm values are allowed per RFC 7616 (SHOULD, not MUST, contain host name)
        realm = challenge["realm"]
        nonce = challenge["nonce"]

        # Empty nonce values are not allowed as they are security-critical for replay protection
        if not nonce:
            raise ClientError(
                "Security issue: Digest auth challenge contains empty 'nonce' value"
            )

        qop_raw = challenge.get("qop", "")
        # Keep the server's own spelling of the algorithm for the response
        # header (some servers compare it case-sensitively, e.g. "MD5-sess");
        # the upper-cased form is used only for lookups and checks here.
        algorithm_original = challenge.get("algorithm", "MD5")
        algorithm = algorithm_original.upper()
        opaque = challenge.get("opaque", "")

        # Convert string values to bytes once
        nonce_bytes = nonce.encode("utf-8")
        realm_bytes = realm.encode("utf-8")
        # NOTE(review): path_qs is used as the digest "uri"; confirm it matches
        # the request-target actually sent for URLs with percent-encoded parts.
        path = URL(url).path_qs

        # Process QoP: the server may offer several options, comma-separated
        qop = ""
        qop_bytes = b""
        if qop_raw:
            valid_qops = {"auth", "auth-int"}.intersection(
                {q.strip() for q in qop_raw.split(",") if q.strip()}
            )
            if not valid_qops:
                raise ClientError(
                    f"Digest auth error: Unsupported Quality of Protection (qop) value(s): {qop_raw}"
                )

            # Prefer auth-int whenever the server offers it
            qop = "auth-int" if "auth-int" in valid_qops else "auth"
            qop_bytes = qop.encode("utf-8")

        if algorithm not in DigestFunctions:
            raise ClientError(
                f"Digest auth error: Unsupported hash algorithm: {algorithm}. "
                f"Supported algorithms: {', '.join(SUPPORTED_ALGORITHMS)}"
            )
        hash_fn: Final = DigestFunctions[algorithm]

        def H(x: bytes) -> bytes:
            """RFC 7616 Section 3: Hash function H(data) = hex(hash(data))."""
            return hash_fn(x).hexdigest().encode()

        def KD(s: bytes, d: bytes) -> bytes:
            """RFC 7616 Section 3: KD(secret, data) = H(concat(secret, ":", data))."""
            return H(b":".join((s, d)))

        # Calculate A1 and A2
        A1 = b":".join((self._login_bytes, realm_bytes, self._password_bytes))
        A2 = f"{method.upper()}:{path}".encode()
        if qop == "auth-int":
            # NOTE(review): assumes body is bytes or str as annotated; any
            # other body object would fail inside the hash call -- confirm
            # against what the caller passes as request.body.
            if isinstance(body, str):
                entity_bytes = body.encode("utf-8", errors="replace")
            else:
                entity_bytes = body
            entity_hash = H(entity_bytes)
            A2 = b":".join((A2, entity_hash))

        HA1 = H(A1)
        HA2 = H(A2)

        # Nonce count handling
        if nonce_bytes == self._last_nonce_bytes:
            self._nonce_count += 1
        else:
            self._nonce_count = 1

        self._last_nonce_bytes = nonce_bytes
        ncvalue = f"{self._nonce_count:08x}"
        ncvalue_bytes = ncvalue.encode("utf-8")

        # Generate client nonce: 16 hex chars derived from the nonce count,
        # server nonce, current time and 8 random bytes
        cnonce = hashlib.sha1(
            b"".join(
                [
                    str(self._nonce_count).encode("utf-8"),
                    nonce_bytes,
                    time.ctime().encode("utf-8"),
                    os.urandom(8),
                ]
            )
        ).hexdigest()[:16]
        cnonce_bytes = cnonce.encode("utf-8")

        # Special handling for session-based algorithms
        # (algorithm is already upper-cased above)
        if algorithm.endswith("-SESS"):
            HA1 = H(b":".join((HA1, nonce_bytes, cnonce_bytes)))

        # Calculate the response digest
        if qop:
            noncebit = b":".join(
                (nonce_bytes, ncvalue_bytes, cnonce_bytes, qop_bytes, HA2)
            )
            response_digest = KD(HA1, noncebit)
        else:
            response_digest = KD(HA1, b":".join((nonce_bytes, HA2)))

        # Define a dict mapping of header fields to their values
        # Group fields into always-present, optional, and qop-dependent
        header_fields = {
            # Always present fields
            "username": escape_quotes(self._login_str),
            "realm": escape_quotes(realm),
            "nonce": escape_quotes(nonce),
            "uri": path,
            "response": response_digest.decode(),
            # Echo the algorithm exactly as the server spelled it
            "algorithm": algorithm_original,
        }

        # Optional fields
        if opaque:
            header_fields["opaque"] = escape_quotes(opaque)

        # QoP-dependent fields
        if qop:
            header_fields["qop"] = qop
            header_fields["nc"] = ncvalue
            header_fields["cnonce"] = cnonce

        # Build header using templates for each field type
        pairs: List[str] = [
            f'{field}="{value}"' if field in QUOTED_AUTH_FIELDS else f"{field}={value}"
            for field, value in header_fields.items()
        ]

        return f"Digest {', '.join(pairs)}"

    def _authenticate(self, response: ClientResponse) -> bool:
        """
        Takes the given response and tries digest-auth, if needed.

        When the response is a 401 carrying a Digest challenge, the challenge
        parameters are stored on the instance (replacing any previous ones).

        Returns true if the original request must be resent.
        """
        if response.status != 401:
            return False

        auth_header = response.headers.get("www-authenticate", "")
        if not auth_header:
            return False  # No authentication header present

        method, sep, headers = auth_header.partition(" ")
        if not sep:
            # No space found in www-authenticate header
            return False  # Malformed auth header, missing scheme separator

        if method.lower() != "digest":
            # Not a digest auth challenge (could be Basic, Bearer, etc.)
            return False

        if not headers:
            # We have a digest scheme but no parameters
            return False  # Malformed digest header, missing parameters

        # We have a digest auth header with content
        if not (header_pairs := parse_header_pairs(headers)):
            # Failed to parse any key-value pairs
            return False  # Malformed digest header, no valid parameters

        # Extract challenge parameters
        self._challenge = {}
        for field in CHALLENGE_FIELDS:
            if field not in header_pairs:
                continue
            value = header_pairs[field]
            # realm and nonce are recorded even when empty so that _encode()
            # applies its own rules (empty realm accepted, empty nonce
            # rejected with a specific error) instead of reporting them as
            # missing. Other parameters are only kept when non-empty so their
            # defaults still apply.
            if value or field in ("realm", "nonce"):
                self._challenge[field] = value

        # Return True only if we found at least one challenge parameter
        return bool(self._challenge)

    async def __call__(
        self, request: ClientRequest, handler: ClientHandlerType
    ) -> ClientResponse:
        """
        Run the digest auth middleware.

        The request is sent once without credentials; if the response is a
        Digest challenge, it is sent one more time with an Authorization
        header built from that challenge. At most two attempts are made.

        Raises:
            ClientError: If the challenge cannot be answered (see _encode)
        """
        response = None
        for retry_count in range(2):
            # Apply authorization header if we have a challenge (on second attempt)
            if retry_count > 0:
                request.headers[hdrs.AUTHORIZATION] = self._encode(
                    request.method, request.url, request.body
                )

            # Send the request
            response = await handler(request)

            # Check if we need to authenticate
            if not self._authenticate(response):
                break

        # At this point, response is guaranteed to be defined
        assert response is not None
        return response