Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_middleware_digest_auth.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2Digest authentication middleware for aiohttp client.
4This middleware implements HTTP Digest Authentication according to RFC 7616,
5providing a more secure alternative to Basic Authentication. It supports all
6standard hash algorithms including MD5, SHA, SHA-256, SHA-512 and their session
7variants, as well as both 'auth' and 'auth-int' quality of protection (qop) options.
8"""
10import hashlib
11import os
12import re
13import sys
14import time
15from collections.abc import Callable
16from typing import Final, Literal, TypedDict
18from yarl import URL
20from . import hdrs
21from .client_exceptions import ClientError
22from .client_middlewares import ClientHandlerType
23from .client_reqrep import ClientRequest, ClientResponse
24from .payload import Payload
class DigestAuthChallenge(TypedDict, total=False):
    """Parameters extracted from a Digest ``WWW-Authenticate`` challenge.

    The dict is declared with ``total=False``, so every key is optional:
    only the directives actually present in the server's header are stored.
    All values are kept as the plain strings taken from the header.
    """

    # Authentication realm; mixed into the A1 hash input together with the
    # login and password.
    realm: str
    # Server nonce; the middleware rejects a challenge whose nonce is empty.
    nonce: str
    # Comma-separated quality-of-protection options offered by the server
    # (the middleware understands "auth" and "auth-int").
    qop: str
    # Hash algorithm name; the middleware falls back to "MD5" when absent.
    algorithm: str
    # Opaque server value, echoed back in the Authorization header when set.
    opaque: str
    # Whitespace-separated list of URIs describing the protection space.
    domain: str
    # Stored when the server sends it; not consulted by the code in this file.
    stale: str
# Lookup table from (upper-case) algorithm name to the hashlib constructor that
# implements it. Every base algorithm is registered twice: under its plain
# name and under the "-SESS" (session) variant, which uses the same hash.
# Both the hyphenated and the compact spellings of SHA-256 / SHA-512 are
# accepted.
DigestFunctions: dict[str, Callable[[bytes], "hashlib._Hash"]] = {
    f"{base_name}{variant}": hash_factory
    for base_name, hash_factory in (
        ("MD5", hashlib.md5),
        ("SHA", hashlib.sha1),
        ("SHA256", hashlib.sha256),
        ("SHA-256", hashlib.sha256),
        ("SHA512", hashlib.sha512),
        ("SHA-512", hashlib.sha512),
    )
    for variant in ("", "-SESS")
}
53# Compile the regex pattern once at module level for performance
54_HEADER_PAIRS_PATTERN = re.compile(
55 r'(?:^|\s|,\s*)(\w+)\s*=\s*(?:"((?:[^"\\]|\\.)*)"|([^\s,]+))'
56 if sys.version_info < (3, 11)
57 else r'(?:^|\s|,\s*)((?>\w+))\s*=\s*(?:"((?:[^"\\]|\\.)*)"|([^\s,]+))'
58 # +------------|--------|--|-|-|--|----|------|----|--||-----|-> Match valid start/sep
59 # +--------|--|-|-|--|----|------|----|--||-----|-> alphanumeric key (atomic
60 # | | | | | | | | || | group reduces backtracking)
61 # +--|-|-|--|----|------|----|--||-----|-> maybe whitespace
62 # | | | | | | | || |
63 # +-|-|--|----|------|----|--||-----|-> = (delimiter)
64 # +-|--|----|------|----|--||-----|-> maybe whitespace
65 # | | | | | || |
66 # +--|----|------|----|--||-----|-> group quoted or unquoted
67 # | | | | || |
68 # +----|------|----|--||-----|-> if quoted...
69 # +------|----|--||-----|-> anything but " or \
70 # +----|--||-----|-> escaped characters allowed
71 # +--||-----|-> or can be empty string
72 # || |
73 # +|-----|-> if unquoted...
74 # +-----|-> anything but , or <space>
75 # +-> at least one char req'd
76)
# RFC 7616: Challenge parameters to extract.
# These are the only directive names copied from a parsed WWW-Authenticate
# header into the stored challenge; any other directive the server sends is
# ignored. The Literal element type keeps the names usable as TypedDict keys.
CHALLENGE_FIELDS: Final[
    tuple[
        Literal["realm", "nonce", "qop", "algorithm", "opaque", "domain", "stale"], ...
    ]
] = (
    "realm",
    "nonce",
    "qop",
    "algorithm",
    "opaque",
    "domain",
    "stale",
)
# Names of every digest algorithm the middleware can compute.
# Kept as a sorted tuple so documentation and error messages list the
# algorithms in a stable, predictable order.
SUPPORTED_ALGORITHMS: Final[tuple[str, ...]] = tuple(sorted(DigestFunctions))
# RFC 7616: names of the Authorization header fields whose values are written
# inside double quotes. Fields that are not listed here (algorithm, qop and nc)
# are emitted bare. The header builder consults this frozen set for each field
# to decide which of the two output templates to use.
QUOTED_AUTH_FIELDS: Final[frozenset[str]] = frozenset(
    (
        "username",
        "realm",
        "nonce",
        "uri",
        "response",
        "opaque",
        "cnonce",
    )
)
def escape_quotes(value: str) -> str:
    """Return *value* with every double quote preceded by a backslash.

    Only the ``"`` character is rewritten; all other characters, including
    backslashes already present in *value*, are passed through unchanged.
    """
    quote_escapes = str.maketrans({'"': '\\"'})
    return value.translate(quote_escapes)
def unescape_quotes(value: str) -> str:
    """Return *value* with every backslash-quote pair turned into a bare quote.

    Only the two-character sequence ``\\"`` is rewritten; other backslash
    sequences are left exactly as they appear in *value*.
    """
    # Splitting on the escaped form and re-joining with a bare quote replaces
    # each non-overlapping occurrence, scanning left to right.
    pieces = value.split('\\"')
    return '"'.join(pieces)
def parse_header_pairs(header: str) -> dict[str, str]:
    """
    Split a WWW-Authenticate style header value into its key/value pairs.

    Values may be quoted or unquoted. Quoted values may contain commas and
    escaped double quotes, and optional whitespace is tolerated around the
    ``=`` sign and the comma separators.

    Examples of supported formats:
        - key1="value1", key2=value2
        - key1 = "value1" , key2="value, with, commas"
        - key1=value1,key2="value2"
        - realm="example.com", nonce="12345", qop="auth"

    Args:
        header: The header value string to parse

    Returns:
        Dictionary mapping parameter names to their values. Escaped quotes
        in quoted values are unescaped. When a name occurs more than once
        the last occurrence wins.
    """
    pairs: dict[str, str] = {}
    for raw_key, quoted, bare in _HEADER_PAIRS_PATTERN.findall(header):
        name = raw_key.strip()
        if not name:
            continue
        # findall() yields "" for the alternative that did not take part in
        # the match, so an empty quoted value falls through to ``bare``
        # (which is then "" as well).
        pairs[name] = unescape_quotes(quoted) if quoted else bare
    return pairs
class DigestAuthMiddleware:
    """
    HTTP digest authentication middleware for aiohttp client.

    This middleware intercepts 401 Unauthorized responses containing a Digest
    authentication challenge, calculates the appropriate digest credentials,
    and automatically retries the request with the proper Authorization header.

    Features:
        - Handles all aspects of Digest authentication handshake automatically
        - Supports all standard hash algorithms:
            - MD5, MD5-SESS
            - SHA, SHA-SESS
            - SHA256, SHA256-SESS, SHA-256, SHA-256-SESS
            - SHA512, SHA512-SESS, SHA-512, SHA-512-SESS
        - Supports 'auth' and 'auth-int' quality of protection modes
        - Properly handles quoted strings and parameter parsing
        - Includes replay attack protection with client nonce count tracking
        - Supports preemptive authentication per RFC 7616 Section 3.6

    Origin scoping:
        The credentials are scoped to the origin of the first request the
        middleware handles. A request to a different origin is passed through
        untouched, so it never receives a digest response computed from those
        credentials, unless that origin falls within a protection space the
        anchor origin advertised through the RFC 7616 ``domain`` directive. Make
        the first request through the middleware against the intended origin, as
        the anchor is pinned to it and not reset for the life of the instance.

    Standards compliance:
        - RFC 7616: HTTP Digest Access Authentication (primary reference)
        - RFC 2617: HTTP Authentication (deprecated by RFC 7616)
        - RFC 1945: Section 11.1 (username restrictions)

    Implementation notes:
        The core digest calculation is inspired by the implementation in
        https://github.com/requests/requests/blob/v2.18.4/requests/auth.py
        with added support for modern digest auth features and error handling.
    """

    def __init__(
        self,
        login: str,
        password: str,
        preemptive: bool = True,
    ) -> None:
        """
        Store the credentials and initialise the per-instance challenge state.

        Args:
            login: User name; must not be None and must not contain ":"
            password: Password; must not be None
            preemptive: When true, reuse the last challenge to send an
                Authorization header up front for URLs inside the known
                protection space

        Raises:
            ValueError: If login or password is None, or login contains ":"
        """
        if login is None:
            raise ValueError("None is not allowed as login value")

        if password is None:
            raise ValueError("None is not allowed as password value")

        if ":" in login:
            raise ValueError('A ":" is not allowed in username (RFC 1945#section-11.1)')

        self._login_str: Final[str] = login
        self._login_bytes: Final[bytes] = login.encode("utf-8")
        self._password_bytes: Final[bytes] = password.encode("utf-8")

        self._last_nonce_bytes = b""
        self._nonce_count = 0
        self._challenge: DigestAuthChallenge = {}
        self._preemptive: bool = preemptive
        # Set of URLs defining the protection space
        self._protection_space: list[str] = []
        # Origin the credentials are scoped to; set on the first request.
        self._origin: URL | None = None

    async def _encode(self, method: str, url: URL, body: Payload | Literal[b""]) -> str:
        """
        Build digest authorization header for the current challenge.

        Args:
            method: The HTTP method (GET, POST, etc.)
            url: The request URL
            body: The request body (used for qop=auth-int)

        Returns:
            A fully formatted Digest authorization header string

        Raises:
            ClientError: If the challenge is missing required parameters or
                contains unsupported values

        """
        challenge = self._challenge
        if "realm" not in challenge:
            raise ClientError(
                "Malformed Digest auth challenge: Missing 'realm' parameter"
            )

        if "nonce" not in challenge:
            raise ClientError(
                "Malformed Digest auth challenge: Missing 'nonce' parameter"
            )

        # Empty realm values are allowed per RFC 7616 (SHOULD, not MUST, contain host name)
        realm = challenge["realm"]
        nonce = challenge["nonce"]

        # Empty nonce values are not allowed as they are security-critical for replay protection
        if not nonce:
            raise ClientError(
                "Security issue: Digest auth challenge contains empty 'nonce' value"
            )

        qop_raw = challenge.get("qop", "")
        # Preserve original algorithm case for response while using uppercase for processing
        algorithm_original = challenge.get("algorithm", "MD5")
        algorithm = algorithm_original.upper()
        opaque = challenge.get("opaque", "")

        # Convert string values to bytes once
        nonce_bytes = nonce.encode("utf-8")
        realm_bytes = realm.encode("utf-8")
        # Use the encoded request-target (raw_path_qs) since that is what is
        # transmitted on the wire and what the server signs against. Using the
        # decoded form would cause digest verification to fail when the path
        # or query string contains percent-encoded reserved characters.
        path = URL(url).raw_path_qs

        # Process QoP
        qop = ""
        qop_bytes = b""
        if qop_raw:
            valid_qops = {"auth", "auth-int"}.intersection(
                {q.strip() for q in qop_raw.split(",") if q.strip()}
            )
            if not valid_qops:
                raise ClientError(
                    f"Digest auth error: Unsupported Quality of Protection (qop) value(s): {qop_raw}"
                )

            # Prefer the stronger mode when the server offers both
            qop = "auth-int" if "auth-int" in valid_qops else "auth"
            qop_bytes = qop.encode("utf-8")

        if algorithm not in DigestFunctions:
            raise ClientError(
                f"Digest auth error: Unsupported hash algorithm: {algorithm}. "
                f"Supported algorithms: {', '.join(SUPPORTED_ALGORITHMS)}"
            )
        hash_fn: Final = DigestFunctions[algorithm]

        def H(x: bytes) -> bytes:
            """RFC 7616 Section 3: Hash function H(data) = hex(hash(data))."""
            return hash_fn(x).hexdigest().encode()

        def KD(s: bytes, d: bytes) -> bytes:
            """RFC 7616 Section 3: KD(secret, data) = H(concat(secret, ":", data))."""
            return H(b":".join((s, d)))

        # Calculate A1 and A2
        A1 = b":".join((self._login_bytes, realm_bytes, self._password_bytes))
        A2 = f"{method.upper()}:{path}".encode()
        if qop == "auth-int":
            if isinstance(body, Payload):  # will always be empty bytes unless Payload
                entity_bytes = await body.as_bytes()  # Get bytes from Payload
            else:
                entity_bytes = body
            entity_hash = H(entity_bytes)
            A2 = b":".join((A2, entity_hash))

        HA1 = H(A1)
        HA2 = H(A2)

        # Nonce count handling: count up while the server keeps the same
        # nonce, restart at 1 as soon as a new nonce is seen
        if nonce_bytes == self._last_nonce_bytes:
            self._nonce_count += 1
        else:
            self._nonce_count = 1

        self._last_nonce_bytes = nonce_bytes
        ncvalue = f"{self._nonce_count:08x}"
        ncvalue_bytes = ncvalue.encode("utf-8")

        # Generate client nonce
        cnonce = hashlib.sha1(
            b"".join(
                [
                    str(self._nonce_count).encode("utf-8"),
                    nonce_bytes,
                    time.ctime().encode("utf-8"),
                    os.urandom(8),
                ]
            )
        ).hexdigest()[:16]
        cnonce_bytes = cnonce.encode("utf-8")

        # Special handling for session-based algorithms
        # (``algorithm`` is already upper-cased above)
        if algorithm.endswith("-SESS"):
            HA1 = H(b":".join((HA1, nonce_bytes, cnonce_bytes)))

        # Calculate the response digest
        if qop:
            noncebit = b":".join(
                (nonce_bytes, ncvalue_bytes, cnonce_bytes, qop_bytes, HA2)
            )
            response_digest = KD(HA1, noncebit)
        else:
            response_digest = KD(HA1, b":".join((nonce_bytes, HA2)))

        # Define a dict mapping of header fields to their values
        # Group fields into always-present, optional, and qop-dependent
        header_fields = {
            # Always present fields
            "username": escape_quotes(self._login_str),
            "realm": escape_quotes(realm),
            "nonce": escape_quotes(nonce),
            "uri": path,
            "response": response_digest.decode(),
            "algorithm": algorithm_original,
        }

        # Optional fields
        if opaque:
            header_fields["opaque"] = escape_quotes(opaque)

        # QoP-dependent fields
        if qop:
            header_fields["qop"] = qop
            header_fields["nc"] = ncvalue
            header_fields["cnonce"] = cnonce

        # Build header using templates for each field type
        pairs: list[str] = []
        for field, value in header_fields.items():
            if field in QUOTED_AUTH_FIELDS:
                pairs.append(f'{field}="{value}"')
            else:
                pairs.append(f"{field}={value}")

        return f"Digest {', '.join(pairs)}"

    def _in_protection_space(self, url: URL) -> bool:
        """
        Check if the given URL is within the current protection space.

        According to RFC 7616, a URI is in the protection space if any URI
        in the protection space is a prefix of it (after both have been made absolute).

        Args:
            url: The request URL to test

        Returns:
            True if some protection space entry is a prefix of the URL that
            ends on a path boundary, otherwise False
        """
        request_str = str(url)
        for space_str in self._protection_space:
            # An empty entry is a prefix of every URL but identifies no
            # location; skip it instead of indexing into an empty string.
            if not space_str:
                continue
            # Check if request starts with space URL
            if not request_str.startswith(space_str):
                continue
            # Exact match or space ends with / (proper directory prefix)
            if len(request_str) == len(space_str) or space_str.endswith("/"):
                return True
            # Check next char is / to ensure proper path boundary
            if request_str[len(space_str)] == "/":
                return True
        return False

    def _authenticate(self, response: ClientResponse) -> bool:
        """
        Takes the given response and tries digest-auth, if needed.

        On a 401 response carrying a Digest challenge, the stored challenge
        and protection space are replaced with the values from the response.

        Returns true if the original request must be resent.
        """
        if response.status != 401:
            return False

        auth_header = response.headers.get("www-authenticate", "")
        if not auth_header:
            return False  # No authentication header present

        method, sep, headers = auth_header.partition(" ")
        if not sep:
            # No space found in www-authenticate header
            return False  # Malformed auth header, missing scheme separator

        if method.lower() != "digest":
            # Not a digest auth challenge (could be Basic, Bearer, etc.)
            return False

        if not headers:
            # We have a digest scheme but no parameters
            return False  # Malformed digest header, missing parameters

        # We have a digest auth header with content
        if not (header_pairs := parse_header_pairs(headers)):
            # Failed to parse any key-value pairs
            return False  # Malformed digest header, no valid parameters

        # Extract challenge parameters
        self._challenge = {}
        for field in CHALLENGE_FIELDS:
            if (value := header_pairs.get(field)) is not None:
                self._challenge[field] = value

        # Update protection space based on domain parameter or default to origin
        origin = response.url.origin()

        if domain := self._challenge.get("domain"):
            # Parse space-separated list of URIs
            protection_space: list[str] = []
            for uri in domain.split():
                # Remove quotes if present
                uri = uri.strip('"')
                if not uri:
                    # Token consisted only of quote characters
                    continue
                try:
                    if uri.startswith("/"):
                        # Path-absolute, relative to origin
                        space = str(origin.join(URL(uri)))
                    else:
                        # Absolute URI
                        space = str(URL(uri))
                except ValueError:
                    # The server sent a URI that cannot be parsed; ignore
                    # that entry rather than failing the whole request.
                    continue
                if space:
                    protection_space.append(space)
            # RFC 7616 treats an empty domain as "the whole origin"; do the
            # same when no usable URI is left after filtering.
            self._protection_space = protection_space or [str(origin)]
        else:
            # No domain specified, protection space is entire origin
            self._protection_space = [str(origin)]

        # Return True only if we found at least one challenge parameter
        return bool(self._challenge)

    async def __call__(
        self, request: ClientRequest, handler: ClientHandlerType
    ) -> ClientResponse:
        """Run the digest auth middleware."""
        # Credentials are scoped to the first request's origin. Other origins
        # pass through untouched unless a challenge from the anchor origin
        # advertised them via RFC 7616 domain; mirrors aiohttp stripping
        # Authorization on cross-origin redirects.
        origin = request.url.origin()
        if self._origin is None:
            self._origin = origin
        elif origin != self._origin and not self._in_protection_space(request.url):
            return await handler(request)

        response = None
        for retry_count in range(2):
            # Apply authorization header if:
            # 1. This is a retry after 401 (retry_count > 0), OR
            # 2. Preemptive auth is enabled AND we have a challenge AND the URL is in protection space
            if retry_count > 0 or (
                self._preemptive
                and self._challenge
                and self._in_protection_space(request.url)
            ):
                request.headers[hdrs.AUTHORIZATION] = await self._encode(
                    request.method, request.url, request.body
                )

            # Send the request
            response = await handler(request)

            # Check if we need to authenticate
            if not self._authenticate(response):
                break

        # At this point, response is guaranteed to be defined
        assert response is not None
        return response