Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_middleware_digest_auth.py: 22%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2Digest authentication middleware for aiohttp client.
4This middleware implements HTTP Digest Authentication according to RFC 7616,
5providing a more secure alternative to Basic Authentication. It supports all
6standard hash algorithms including MD5, SHA, SHA-256, SHA-512 and their session
7variants, as well as both 'auth' and 'auth-int' quality of protection (qop) options.
8"""
10import hashlib
11import os
12import re
13import sys
14import time
15from collections.abc import Callable
16from typing import Final, Literal, TypedDict
18from yarl import URL
20from . import hdrs
21from .client_exceptions import ClientError
22from .client_middlewares import ClientHandlerType
23from .client_reqrep import ClientRequest, ClientResponse
24from .payload import Payload
27class DigestAuthChallenge(TypedDict, total=False):
28 realm: str
29 nonce: str
30 qop: str
31 algorithm: str
32 opaque: str
33 domain: str
34 stale: str
37DigestFunctions: dict[str, Callable[[bytes], "hashlib._Hash"]] = {
38 "MD5": hashlib.md5,
39 "MD5-SESS": hashlib.md5,
40 "SHA": hashlib.sha1,
41 "SHA-SESS": hashlib.sha1,
42 "SHA256": hashlib.sha256,
43 "SHA256-SESS": hashlib.sha256,
44 "SHA-256": hashlib.sha256,
45 "SHA-256-SESS": hashlib.sha256,
46 "SHA512": hashlib.sha512,
47 "SHA512-SESS": hashlib.sha512,
48 "SHA-512": hashlib.sha512,
49 "SHA-512-SESS": hashlib.sha512,
50}
53# Compile the regex pattern once at module level for performance
54_HEADER_PAIRS_PATTERN = re.compile(
55 r'(?:^|\s|,\s*)(\w+)\s*=\s*(?:"((?:[^"\\]|\\.)*)"|([^\s,]+))'
56 if sys.version_info < (3, 11)
57 else r'(?:^|\s|,\s*)((?>\w+))\s*=\s*(?:"((?:[^"\\]|\\.)*)"|([^\s,]+))'
58 # +------------|--------|--|-|-|--|----|------|----|--||-----|-> Match valid start/sep
59 # +--------|--|-|-|--|----|------|----|--||-----|-> alphanumeric key (atomic
60 # | | | | | | | | || | group reduces backtracking)
61 # +--|-|-|--|----|------|----|--||-----|-> maybe whitespace
62 # | | | | | | | || |
63 # +-|-|--|----|------|----|--||-----|-> = (delimiter)
64 # +-|--|----|------|----|--||-----|-> maybe whitespace
65 # | | | | | || |
66 # +--|----|------|----|--||-----|-> group quoted or unquoted
67 # | | | | || |
68 # +----|------|----|--||-----|-> if quoted...
69 # +------|----|--||-----|-> anything but " or \
70 # +----|--||-----|-> escaped characters allowed
71 # +--||-----|-> or can be empty string
72 # || |
73 # +|-----|-> if unquoted...
74 # +-----|-> anything but , or <space>
75 # +-> at least one char req'd
76)
79# RFC 7616: Challenge parameters to extract
80CHALLENGE_FIELDS: Final[
81 tuple[
82 Literal["realm", "nonce", "qop", "algorithm", "opaque", "domain", "stale"], ...
83 ]
84] = (
85 "realm",
86 "nonce",
87 "qop",
88 "algorithm",
89 "opaque",
90 "domain",
91 "stale",
92)
94# Supported digest authentication algorithms
95# Use a tuple of sorted keys for predictable documentation and error messages
96SUPPORTED_ALGORITHMS: Final[tuple[str, ...]] = tuple(sorted(DigestFunctions.keys()))
98# RFC 7616: Fields that require quoting in the Digest auth header
99# These fields must be enclosed in double quotes in the Authorization header.
100# Algorithm, qop, and nc are never quoted per RFC specifications.
101# This frozen set is used by the template-based header construction to
102# automatically determine which fields need quotes.
103QUOTED_AUTH_FIELDS: Final[frozenset[str]] = frozenset(
104 {"username", "realm", "nonce", "uri", "response", "opaque", "cnonce"}
105)
108def escape_quotes(value: str) -> str:
109 """Escape double quotes for HTTP header values."""
110 return value.replace('"', '\\"')
113def unescape_quotes(value: str) -> str:
114 """Unescape double quotes in HTTP header values."""
115 return value.replace('\\"', '"')
118def parse_header_pairs(header: str) -> dict[str, str]:
119 """
120 Parse key-value pairs from WWW-Authenticate or similar HTTP headers.
122 This function handles the complex format of WWW-Authenticate header values,
123 supporting both quoted and unquoted values, proper handling of commas in
124 quoted values, and whitespace variations per RFC 7616.
126 Examples of supported formats:
127 - key1="value1", key2=value2
128 - key1 = "value1" , key2="value, with, commas"
129 - key1=value1,key2="value2"
130 - realm="example.com", nonce="12345", qop="auth"
132 Args:
133 header: The header value string to parse
135 Returns:
136 Dictionary mapping parameter names to their values
137 """
138 return {
139 stripped_key: unescape_quotes(quoted_val) if quoted_val else unquoted_val
140 for key, quoted_val, unquoted_val in _HEADER_PAIRS_PATTERN.findall(header)
141 if (stripped_key := key.strip())
142 }
145class DigestAuthMiddleware:
146 """
147 HTTP digest authentication middleware for aiohttp client.
149 This middleware intercepts 401 Unauthorized responses containing a Digest
150 authentication challenge, calculates the appropriate digest credentials,
151 and automatically retries the request with the proper Authorization header.
153 Features:
154 - Handles all aspects of Digest authentication handshake automatically
155 - Supports all standard hash algorithms:
156 - MD5, MD5-SESS
157 - SHA, SHA-SESS
158 - SHA256, SHA256-SESS, SHA-256, SHA-256-SESS
159 - SHA512, SHA512-SESS, SHA-512, SHA-512-SESS
160 - Supports 'auth' and 'auth-int' quality of protection modes
161 - Properly handles quoted strings and parameter parsing
162 - Includes replay attack protection with client nonce count tracking
163 - Supports preemptive authentication per RFC 7616 Section 3.6
165 Standards compliance:
166 - RFC 7616: HTTP Digest Access Authentication (primary reference)
167 - RFC 2617: HTTP Authentication (deprecated by RFC 7616)
168 - RFC 1945: Section 11.1 (username restrictions)
170 Implementation notes:
171 The core digest calculation is inspired by the implementation in
172 https://github.com/requests/requests/blob/v2.18.4/requests/auth.py
173 with added support for modern digest auth features and error handling.
174 """
176 def __init__(
177 self,
178 login: str,
179 password: str,
180 preemptive: bool = True,
181 ) -> None:
182 if login is None:
183 raise ValueError("None is not allowed as login value")
185 if password is None:
186 raise ValueError("None is not allowed as password value")
188 if ":" in login:
189 raise ValueError('A ":" is not allowed in username (RFC 1945#section-11.1)')
191 self._login_str: Final[str] = login
192 self._login_bytes: Final[bytes] = login.encode("utf-8")
193 self._password_bytes: Final[bytes] = password.encode("utf-8")
195 self._last_nonce_bytes = b""
196 self._nonce_count = 0
197 self._challenge: DigestAuthChallenge = {}
198 self._preemptive: bool = preemptive
199 # Set of URLs defining the protection space
200 self._protection_space: list[str] = []
202 async def _encode(self, method: str, url: URL, body: Payload | Literal[b""]) -> str:
203 """
204 Build digest authorization header for the current challenge.
206 Args:
207 method: The HTTP method (GET, POST, etc.)
208 url: The request URL
209 body: The request body (used for qop=auth-int)
211 Returns:
212 A fully formatted Digest authorization header string
214 Raises:
215 ClientError: If the challenge is missing required parameters or
216 contains unsupported values
218 """
219 challenge = self._challenge
220 if "realm" not in challenge:
221 raise ClientError(
222 "Malformed Digest auth challenge: Missing 'realm' parameter"
223 )
225 if "nonce" not in challenge:
226 raise ClientError(
227 "Malformed Digest auth challenge: Missing 'nonce' parameter"
228 )
230 # Empty realm values are allowed per RFC 7616 (SHOULD, not MUST, contain host name)
231 realm = challenge["realm"]
232 nonce = challenge["nonce"]
234 # Empty nonce values are not allowed as they are security-critical for replay protection
235 if not nonce:
236 raise ClientError(
237 "Security issue: Digest auth challenge contains empty 'nonce' value"
238 )
240 qop_raw = challenge.get("qop", "")
241 # Preserve original algorithm case for response while using uppercase for processing
242 algorithm_original = challenge.get("algorithm", "MD5")
243 algorithm = algorithm_original.upper()
244 opaque = challenge.get("opaque", "")
246 # Convert string values to bytes once
247 nonce_bytes = nonce.encode("utf-8")
248 realm_bytes = realm.encode("utf-8")
249 # Use the encoded request-target (raw_path_qs) since that is what is
250 # transmitted on the wire and what the server signs against. Using the
251 # decoded form would cause digest verification to fail when the path
252 # or query string contains percent-encoded reserved characters.
253 path = URL(url).raw_path_qs
255 # Process QoP
256 qop = ""
257 qop_bytes = b""
258 if qop_raw:
259 valid_qops = {"auth", "auth-int"}.intersection(
260 {q.strip() for q in qop_raw.split(",") if q.strip()}
261 )
262 if not valid_qops:
263 raise ClientError(
264 f"Digest auth error: Unsupported Quality of Protection (qop) value(s): {qop_raw}"
265 )
267 qop = "auth-int" if "auth-int" in valid_qops else "auth"
268 qop_bytes = qop.encode("utf-8")
270 if algorithm not in DigestFunctions:
271 raise ClientError(
272 f"Digest auth error: Unsupported hash algorithm: {algorithm}. "
273 f"Supported algorithms: {', '.join(SUPPORTED_ALGORITHMS)}"
274 )
275 hash_fn: Final = DigestFunctions[algorithm]
277 def H(x: bytes) -> bytes:
278 """RFC 7616 Section 3: Hash function H(data) = hex(hash(data))."""
279 return hash_fn(x).hexdigest().encode()
281 def KD(s: bytes, d: bytes) -> bytes:
282 """RFC 7616 Section 3: KD(secret, data) = H(concat(secret, ":", data))."""
283 return H(b":".join((s, d)))
285 # Calculate A1 and A2
286 A1 = b":".join((self._login_bytes, realm_bytes, self._password_bytes))
287 A2 = f"{method.upper()}:{path}".encode()
288 if qop == "auth-int":
289 if isinstance(body, Payload): # will always be empty bytes unless Payload
290 entity_bytes = await body.as_bytes() # Get bytes from Payload
291 else:
292 entity_bytes = body
293 entity_hash = H(entity_bytes)
294 A2 = b":".join((A2, entity_hash))
296 HA1 = H(A1)
297 HA2 = H(A2)
299 # Nonce count handling
300 if nonce_bytes == self._last_nonce_bytes:
301 self._nonce_count += 1
302 else:
303 self._nonce_count = 1
305 self._last_nonce_bytes = nonce_bytes
306 ncvalue = f"{self._nonce_count:08x}"
307 ncvalue_bytes = ncvalue.encode("utf-8")
309 # Generate client nonce
310 cnonce = hashlib.sha1(
311 b"".join(
312 [
313 str(self._nonce_count).encode("utf-8"),
314 nonce_bytes,
315 time.ctime().encode("utf-8"),
316 os.urandom(8),
317 ]
318 )
319 ).hexdigest()[:16]
320 cnonce_bytes = cnonce.encode("utf-8")
322 # Special handling for session-based algorithms
323 if algorithm.upper().endswith("-SESS"):
324 HA1 = H(b":".join((HA1, nonce_bytes, cnonce_bytes)))
326 # Calculate the response digest
327 if qop:
328 noncebit = b":".join(
329 (nonce_bytes, ncvalue_bytes, cnonce_bytes, qop_bytes, HA2)
330 )
331 response_digest = KD(HA1, noncebit)
332 else:
333 response_digest = KD(HA1, b":".join((nonce_bytes, HA2)))
335 # Define a dict mapping of header fields to their values
336 # Group fields into always-present, optional, and qop-dependent
337 header_fields = {
338 # Always present fields
339 "username": escape_quotes(self._login_str),
340 "realm": escape_quotes(realm),
341 "nonce": escape_quotes(nonce),
342 "uri": path,
343 "response": response_digest.decode(),
344 "algorithm": algorithm_original,
345 }
347 # Optional fields
348 if opaque:
349 header_fields["opaque"] = escape_quotes(opaque)
351 # QoP-dependent fields
352 if qop:
353 header_fields["qop"] = qop
354 header_fields["nc"] = ncvalue
355 header_fields["cnonce"] = cnonce
357 # Build header using templates for each field type
358 pairs: list[str] = []
359 for field, value in header_fields.items():
360 if field in QUOTED_AUTH_FIELDS:
361 pairs.append(f'{field}="{value}"')
362 else:
363 pairs.append(f"{field}={value}")
365 return f"Digest {', '.join(pairs)}"
367 def _in_protection_space(self, url: URL) -> bool:
368 """
369 Check if the given URL is within the current protection space.
371 According to RFC 7616, a URI is in the protection space if any URI
372 in the protection space is a prefix of it (after both have been made absolute).
373 """
374 request_str = str(url)
375 for space_str in self._protection_space:
376 # Check if request starts with space URL
377 if not request_str.startswith(space_str):
378 continue
379 # Exact match or space ends with / (proper directory prefix)
380 if len(request_str) == len(space_str) or space_str[-1] == "/":
381 return True
382 # Check next char is / to ensure proper path boundary
383 if request_str[len(space_str)] == "/":
384 return True
385 return False
387 def _authenticate(self, response: ClientResponse) -> bool:
388 """
389 Takes the given response and tries digest-auth, if needed.
391 Returns true if the original request must be resent.
392 """
393 if response.status != 401:
394 return False
396 auth_header = response.headers.get("www-authenticate", "")
397 if not auth_header:
398 return False # No authentication header present
400 method, sep, headers = auth_header.partition(" ")
401 if not sep:
402 # No space found in www-authenticate header
403 return False # Malformed auth header, missing scheme separator
405 if method.lower() != "digest":
406 # Not a digest auth challenge (could be Basic, Bearer, etc.)
407 return False
409 if not headers:
410 # We have a digest scheme but no parameters
411 return False # Malformed digest header, missing parameters
413 # We have a digest auth header with content
414 if not (header_pairs := parse_header_pairs(headers)):
415 # Failed to parse any key-value pairs
416 return False # Malformed digest header, no valid parameters
418 # Extract challenge parameters
419 self._challenge = {}
420 for field in CHALLENGE_FIELDS:
421 if (value := header_pairs.get(field)) is not None:
422 self._challenge[field] = value
424 # Update protection space based on domain parameter or default to origin
425 origin = response.url.origin()
427 if domain := self._challenge.get("domain"):
428 # Parse space-separated list of URIs
429 self._protection_space = []
430 for uri in domain.split():
431 # Remove quotes if present
432 uri = uri.strip('"')
433 if uri.startswith("/"):
434 # Path-absolute, relative to origin
435 self._protection_space.append(str(origin.join(URL(uri))))
436 else:
437 # Absolute URI
438 self._protection_space.append(str(URL(uri)))
439 else:
440 # No domain specified, protection space is entire origin
441 self._protection_space = [str(origin)]
443 # Return True only if we found at least one challenge parameter
444 return bool(self._challenge)
446 async def __call__(
447 self, request: ClientRequest, handler: ClientHandlerType
448 ) -> ClientResponse:
449 """Run the digest auth middleware."""
450 response = None
451 for retry_count in range(2):
452 # Apply authorization header if:
453 # 1. This is a retry after 401 (retry_count > 0), OR
454 # 2. Preemptive auth is enabled AND we have a challenge AND the URL is in protection space
455 if retry_count > 0 or (
456 self._preemptive
457 and self._challenge
458 and self._in_protection_space(request.url)
459 ):
460 request.headers[hdrs.AUTHORIZATION] = await self._encode(
461 request.method, request.url, request.body
462 )
464 # Send the request
465 response = await handler(request)
467 # Check if we need to authenticate
468 if not self._authenticate(response):
469 break
471 # At this point, response is guaranteed to be defined
472 assert response is not None
473 return response