Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_middleware_digest_auth.py: 22%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2Digest authentication middleware for aiohttp client.
4This middleware implements HTTP Digest Authentication according to RFC 7616,
5providing a more secure alternative to Basic Authentication. It supports all
6standard hash algorithms including MD5, SHA, SHA-256, SHA-512 and their session
7variants, as well as both 'auth' and 'auth-int' quality of protection (qop) options.
8"""
10import hashlib
11import os
12import re
13import sys
14import time
15from collections.abc import Callable
16from typing import Final, Literal, TypedDict
18from yarl import URL
20from . import hdrs
21from .client_exceptions import ClientError
22from .client_middlewares import ClientHandlerType
23from .client_reqrep import ClientRequest, ClientResponse
24from .payload import Payload
27class DigestAuthChallenge(TypedDict, total=False):
28 realm: str
29 nonce: str
30 qop: str
31 algorithm: str
32 opaque: str
33 domain: str
34 stale: str
37DigestFunctions: dict[str, Callable[[bytes], "hashlib._Hash"]] = {
38 "MD5": hashlib.md5,
39 "MD5-SESS": hashlib.md5,
40 "SHA": hashlib.sha1,
41 "SHA-SESS": hashlib.sha1,
42 "SHA256": hashlib.sha256,
43 "SHA256-SESS": hashlib.sha256,
44 "SHA-256": hashlib.sha256,
45 "SHA-256-SESS": hashlib.sha256,
46 "SHA512": hashlib.sha512,
47 "SHA512-SESS": hashlib.sha512,
48 "SHA-512": hashlib.sha512,
49 "SHA-512-SESS": hashlib.sha512,
50}
53# Compile the regex pattern once at module level for performance
54_HEADER_PAIRS_PATTERN = re.compile(
55 r'(?:^|\s|,\s*)(\w+)\s*=\s*(?:"((?:[^"\\]|\\.)*)"|([^\s,]+))'
56 if sys.version_info < (3, 11)
57 else r'(?:^|\s|,\s*)((?>\w+))\s*=\s*(?:"((?:[^"\\]|\\.)*)"|([^\s,]+))'
58 # +------------|--------|--|-|-|--|----|------|----|--||-----|-> Match valid start/sep
59 # +--------|--|-|-|--|----|------|----|--||-----|-> alphanumeric key (atomic
60 # | | | | | | | | || | group reduces backtracking)
61 # +--|-|-|--|----|------|----|--||-----|-> maybe whitespace
62 # | | | | | | | || |
63 # +-|-|--|----|------|----|--||-----|-> = (delimiter)
64 # +-|--|----|------|----|--||-----|-> maybe whitespace
65 # | | | | | || |
66 # +--|----|------|----|--||-----|-> group quoted or unquoted
67 # | | | | || |
68 # +----|------|----|--||-----|-> if quoted...
69 # +------|----|--||-----|-> anything but " or \
70 # +----|--||-----|-> escaped characters allowed
71 # +--||-----|-> or can be empty string
72 # || |
73 # +|-----|-> if unquoted...
74 # +-----|-> anything but , or <space>
75 # +-> at least one char req'd
76)
79# RFC 7616: Challenge parameters to extract
80CHALLENGE_FIELDS: Final[
81 tuple[
82 Literal["realm", "nonce", "qop", "algorithm", "opaque", "domain", "stale"], ...
83 ]
84] = (
85 "realm",
86 "nonce",
87 "qop",
88 "algorithm",
89 "opaque",
90 "domain",
91 "stale",
92)
94# Supported digest authentication algorithms
95# Use a tuple of sorted keys for predictable documentation and error messages
96SUPPORTED_ALGORITHMS: Final[tuple[str, ...]] = tuple(sorted(DigestFunctions.keys()))
98# RFC 7616: Fields that require quoting in the Digest auth header
99# These fields must be enclosed in double quotes in the Authorization header.
100# Algorithm, qop, and nc are never quoted per RFC specifications.
101# This frozen set is used by the template-based header construction to
102# automatically determine which fields need quotes.
103QUOTED_AUTH_FIELDS: Final[frozenset[str]] = frozenset(
104 {"username", "realm", "nonce", "uri", "response", "opaque", "cnonce"}
105)
108def escape_quotes(value: str) -> str:
109 """Escape double quotes for HTTP header values."""
110 return value.replace('"', '\\"')
113def unescape_quotes(value: str) -> str:
114 """Unescape double quotes in HTTP header values."""
115 return value.replace('\\"', '"')
118def parse_header_pairs(header: str) -> dict[str, str]:
119 """
120 Parse key-value pairs from WWW-Authenticate or similar HTTP headers.
122 This function handles the complex format of WWW-Authenticate header values,
123 supporting both quoted and unquoted values, proper handling of commas in
124 quoted values, and whitespace variations per RFC 7616.
126 Examples of supported formats:
127 - key1="value1", key2=value2
128 - key1 = "value1" , key2="value, with, commas"
129 - key1=value1,key2="value2"
130 - realm="example.com", nonce="12345", qop="auth"
132 Args:
133 header: The header value string to parse
135 Returns:
136 Dictionary mapping parameter names to their values
137 """
138 return {
139 stripped_key: unescape_quotes(quoted_val) if quoted_val else unquoted_val
140 for key, quoted_val, unquoted_val in _HEADER_PAIRS_PATTERN.findall(header)
141 if (stripped_key := key.strip())
142 }
145class DigestAuthMiddleware:
146 """
147 HTTP digest authentication middleware for aiohttp client.
149 This middleware intercepts 401 Unauthorized responses containing a Digest
150 authentication challenge, calculates the appropriate digest credentials,
151 and automatically retries the request with the proper Authorization header.
153 Features:
154 - Handles all aspects of Digest authentication handshake automatically
155 - Supports all standard hash algorithms:
156 - MD5, MD5-SESS
157 - SHA, SHA-SESS
158 - SHA256, SHA256-SESS, SHA-256, SHA-256-SESS
159 - SHA512, SHA512-SESS, SHA-512, SHA-512-SESS
160 - Supports 'auth' and 'auth-int' quality of protection modes
161 - Properly handles quoted strings and parameter parsing
162 - Includes replay attack protection with client nonce count tracking
163 - Supports preemptive authentication per RFC 7616 Section 3.6
165 Standards compliance:
166 - RFC 7616: HTTP Digest Access Authentication (primary reference)
167 - RFC 2617: HTTP Authentication (deprecated by RFC 7616)
168 - RFC 1945: Section 11.1 (username restrictions)
170 Implementation notes:
171 The core digest calculation is inspired by the implementation in
172 https://github.com/requests/requests/blob/v2.18.4/requests/auth.py
173 with added support for modern digest auth features and error handling.
174 """
176 def __init__(
177 self,
178 login: str,
179 password: str,
180 preemptive: bool = True,
181 ) -> None:
182 if login is None:
183 raise ValueError("None is not allowed as login value")
185 if password is None:
186 raise ValueError("None is not allowed as password value")
188 if ":" in login:
189 raise ValueError('A ":" is not allowed in username (RFC 1945#section-11.1)')
191 self._login_str: Final[str] = login
192 self._login_bytes: Final[bytes] = login.encode("utf-8")
193 self._password_bytes: Final[bytes] = password.encode("utf-8")
195 self._last_nonce_bytes = b""
196 self._nonce_count = 0
197 self._challenge: DigestAuthChallenge = {}
198 self._preemptive: bool = preemptive
199 # Set of URLs defining the protection space
200 self._protection_space: list[str] = []
202 async def _encode(self, method: str, url: URL, body: Payload | Literal[b""]) -> str:
203 """
204 Build digest authorization header for the current challenge.
206 Args:
207 method: The HTTP method (GET, POST, etc.)
208 url: The request URL
209 body: The request body (used for qop=auth-int)
211 Returns:
212 A fully formatted Digest authorization header string
214 Raises:
215 ClientError: If the challenge is missing required parameters or
216 contains unsupported values
218 """
219 challenge = self._challenge
220 if "realm" not in challenge:
221 raise ClientError(
222 "Malformed Digest auth challenge: Missing 'realm' parameter"
223 )
225 if "nonce" not in challenge:
226 raise ClientError(
227 "Malformed Digest auth challenge: Missing 'nonce' parameter"
228 )
230 # Empty realm values are allowed per RFC 7616 (SHOULD, not MUST, contain host name)
231 realm = challenge["realm"]
232 nonce = challenge["nonce"]
234 # Empty nonce values are not allowed as they are security-critical for replay protection
235 if not nonce:
236 raise ClientError(
237 "Security issue: Digest auth challenge contains empty 'nonce' value"
238 )
240 qop_raw = challenge.get("qop", "")
241 # Preserve original algorithm case for response while using uppercase for processing
242 algorithm_original = challenge.get("algorithm", "MD5")
243 algorithm = algorithm_original.upper()
244 opaque = challenge.get("opaque", "")
246 # Convert string values to bytes once
247 nonce_bytes = nonce.encode("utf-8")
248 realm_bytes = realm.encode("utf-8")
249 path = URL(url).path_qs
251 # Process QoP
252 qop = ""
253 qop_bytes = b""
254 if qop_raw:
255 valid_qops = {"auth", "auth-int"}.intersection(
256 {q.strip() for q in qop_raw.split(",") if q.strip()}
257 )
258 if not valid_qops:
259 raise ClientError(
260 f"Digest auth error: Unsupported Quality of Protection (qop) value(s): {qop_raw}"
261 )
263 qop = "auth-int" if "auth-int" in valid_qops else "auth"
264 qop_bytes = qop.encode("utf-8")
266 if algorithm not in DigestFunctions:
267 raise ClientError(
268 f"Digest auth error: Unsupported hash algorithm: {algorithm}. "
269 f"Supported algorithms: {', '.join(SUPPORTED_ALGORITHMS)}"
270 )
271 hash_fn: Final = DigestFunctions[algorithm]
273 def H(x: bytes) -> bytes:
274 """RFC 7616 Section 3: Hash function H(data) = hex(hash(data))."""
275 return hash_fn(x).hexdigest().encode()
277 def KD(s: bytes, d: bytes) -> bytes:
278 """RFC 7616 Section 3: KD(secret, data) = H(concat(secret, ":", data))."""
279 return H(b":".join((s, d)))
281 # Calculate A1 and A2
282 A1 = b":".join((self._login_bytes, realm_bytes, self._password_bytes))
283 A2 = f"{method.upper()}:{path}".encode()
284 if qop == "auth-int":
285 if isinstance(body, Payload): # will always be empty bytes unless Payload
286 entity_bytes = await body.as_bytes() # Get bytes from Payload
287 else:
288 entity_bytes = body
289 entity_hash = H(entity_bytes)
290 A2 = b":".join((A2, entity_hash))
292 HA1 = H(A1)
293 HA2 = H(A2)
295 # Nonce count handling
296 if nonce_bytes == self._last_nonce_bytes:
297 self._nonce_count += 1
298 else:
299 self._nonce_count = 1
301 self._last_nonce_bytes = nonce_bytes
302 ncvalue = f"{self._nonce_count:08x}"
303 ncvalue_bytes = ncvalue.encode("utf-8")
305 # Generate client nonce
306 cnonce = hashlib.sha1(
307 b"".join(
308 [
309 str(self._nonce_count).encode("utf-8"),
310 nonce_bytes,
311 time.ctime().encode("utf-8"),
312 os.urandom(8),
313 ]
314 )
315 ).hexdigest()[:16]
316 cnonce_bytes = cnonce.encode("utf-8")
318 # Special handling for session-based algorithms
319 if algorithm.upper().endswith("-SESS"):
320 HA1 = H(b":".join((HA1, nonce_bytes, cnonce_bytes)))
322 # Calculate the response digest
323 if qop:
324 noncebit = b":".join(
325 (nonce_bytes, ncvalue_bytes, cnonce_bytes, qop_bytes, HA2)
326 )
327 response_digest = KD(HA1, noncebit)
328 else:
329 response_digest = KD(HA1, b":".join((nonce_bytes, HA2)))
331 # Define a dict mapping of header fields to their values
332 # Group fields into always-present, optional, and qop-dependent
333 header_fields = {
334 # Always present fields
335 "username": escape_quotes(self._login_str),
336 "realm": escape_quotes(realm),
337 "nonce": escape_quotes(nonce),
338 "uri": path,
339 "response": response_digest.decode(),
340 "algorithm": algorithm_original,
341 }
343 # Optional fields
344 if opaque:
345 header_fields["opaque"] = escape_quotes(opaque)
347 # QoP-dependent fields
348 if qop:
349 header_fields["qop"] = qop
350 header_fields["nc"] = ncvalue
351 header_fields["cnonce"] = cnonce
353 # Build header using templates for each field type
354 pairs: list[str] = []
355 for field, value in header_fields.items():
356 if field in QUOTED_AUTH_FIELDS:
357 pairs.append(f'{field}="{value}"')
358 else:
359 pairs.append(f"{field}={value}")
361 return f"Digest {', '.join(pairs)}"
363 def _in_protection_space(self, url: URL) -> bool:
364 """
365 Check if the given URL is within the current protection space.
367 According to RFC 7616, a URI is in the protection space if any URI
368 in the protection space is a prefix of it (after both have been made absolute).
369 """
370 request_str = str(url)
371 for space_str in self._protection_space:
372 # Check if request starts with space URL
373 if not request_str.startswith(space_str):
374 continue
375 # Exact match or space ends with / (proper directory prefix)
376 if len(request_str) == len(space_str) or space_str[-1] == "/":
377 return True
378 # Check next char is / to ensure proper path boundary
379 if request_str[len(space_str)] == "/":
380 return True
381 return False
383 def _authenticate(self, response: ClientResponse) -> bool:
384 """
385 Takes the given response and tries digest-auth, if needed.
387 Returns true if the original request must be resent.
388 """
389 if response.status != 401:
390 return False
392 auth_header = response.headers.get("www-authenticate", "")
393 if not auth_header:
394 return False # No authentication header present
396 method, sep, headers = auth_header.partition(" ")
397 if not sep:
398 # No space found in www-authenticate header
399 return False # Malformed auth header, missing scheme separator
401 if method.lower() != "digest":
402 # Not a digest auth challenge (could be Basic, Bearer, etc.)
403 return False
405 if not headers:
406 # We have a digest scheme but no parameters
407 return False # Malformed digest header, missing parameters
409 # We have a digest auth header with content
410 if not (header_pairs := parse_header_pairs(headers)):
411 # Failed to parse any key-value pairs
412 return False # Malformed digest header, no valid parameters
414 # Extract challenge parameters
415 self._challenge = {}
416 for field in CHALLENGE_FIELDS:
417 if (value := header_pairs.get(field)) is not None:
418 self._challenge[field] = value
420 # Update protection space based on domain parameter or default to origin
421 origin = response.url.origin()
423 if domain := self._challenge.get("domain"):
424 # Parse space-separated list of URIs
425 self._protection_space = []
426 for uri in domain.split():
427 # Remove quotes if present
428 uri = uri.strip('"')
429 if uri.startswith("/"):
430 # Path-absolute, relative to origin
431 self._protection_space.append(str(origin.join(URL(uri))))
432 else:
433 # Absolute URI
434 self._protection_space.append(str(URL(uri)))
435 else:
436 # No domain specified, protection space is entire origin
437 self._protection_space = [str(origin)]
439 # Return True only if we found at least one challenge parameter
440 return bool(self._challenge)
442 async def __call__(
443 self, request: ClientRequest, handler: ClientHandlerType
444 ) -> ClientResponse:
445 """Run the digest auth middleware."""
446 response = None
447 for retry_count in range(2):
448 # Apply authorization header if:
449 # 1. This is a retry after 401 (retry_count > 0), OR
450 # 2. Preemptive auth is enabled AND we have a challenge AND the URL is in protection space
451 if retry_count > 0 or (
452 self._preemptive
453 and self._challenge
454 and self._in_protection_space(request.url)
455 ):
456 request.headers[hdrs.AUTHORIZATION] = await self._encode(
457 request.method, request.url, request.body
458 )
460 # Send the request
461 response = await handler(request)
463 # Check if we need to authenticate
464 if not self._authenticate(response):
465 break
467 # At this point, response is guaranteed to be defined
468 assert response is not None
469 return response