Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_middleware_digest_auth.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

173 statements  

1""" 

2Digest authentication middleware for aiohttp client. 

3 

4This middleware implements HTTP Digest Authentication according to RFC 7616, 

5providing a more secure alternative to Basic Authentication. It supports all 

6standard hash algorithms including MD5, SHA, SHA-256, SHA-512 and their session 

7variants, as well as both 'auth' and 'auth-int' quality of protection (qop) options. 

8""" 

9 

10import hashlib 

11import os 

12import re 

13import sys 

14import time 

15from collections.abc import Callable 

16from typing import Final, Literal, TypedDict 

17 

18from yarl import URL 

19 

20from . import hdrs 

21from .client_exceptions import ClientError 

22from .client_middlewares import ClientHandlerType 

23from .client_reqrep import ClientRequest, ClientResponse 

24from .payload import Payload 

25 

26 

class DigestAuthChallenge(TypedDict, total=False):
    """Directives captured from a Digest ``WWW-Authenticate`` challenge.

    Every key is optional (``total=False``): only directives that were
    actually present in the parsed header are stored.
    """

    # Protection realm; combined with the credentials when hashing.
    realm: str
    # Server nonce echoed back in the Authorization header.
    nonce: str
    # Comma-separated quality-of-protection options offered by the server.
    qop: str
    # Hash algorithm token as sent by the server (original case kept).
    algorithm: str
    # Opaque server data returned unchanged when present.
    opaque: str
    # Space-separated URIs describing the protection space.
    domain: str
    # Raw value of the ``stale`` directive.
    stale: str

35 

36 

# Hash constructors keyed by the upper-cased ``algorithm`` token of a
# challenge. A ``-SESS`` variant shares the hash function of its base
# algorithm, and SHA-256 / SHA-512 are accepted with or without the hyphen.
DigestFunctions: dict[str, Callable[[bytes], "hashlib._Hash"]] = {
    **dict.fromkeys(("MD5", "MD5-SESS"), hashlib.md5),
    **dict.fromkeys(("SHA", "SHA-SESS"), hashlib.sha1),
    **dict.fromkeys(
        ("SHA256", "SHA256-SESS", "SHA-256", "SHA-256-SESS"), hashlib.sha256
    ),
    **dict.fromkeys(
        ("SHA512", "SHA512-SESS", "SHA-512", "SHA-512-SESS"), hashlib.sha512
    ),
}

51 

52 

# Pattern for one ``key=value`` pair of a WWW-Authenticate style header,
# compiled once at import time. Reading the pattern left to right:
#
#   (?:^|\s|,\s*)           valid start of a pair: string start, whitespace,
#                           or a comma optionally followed by whitespace
#   (\w+) / ((?>\w+))       group 1: the alphanumeric key
#   \s*=\s*                 the "=" delimiter with optional whitespace
#   "((?:[^"\\]|\\.)*)"     group 2: quoted value - anything but " or \,
#                           or a backslash-escaped character; may be empty
#   ([^\s,]+)               group 3: unquoted value - at least one character
#                           that is neither whitespace nor a comma
#
# From Python 3.11 the key is wrapped in an atomic group ``(?>...)`` to
# reduce backtracking; earlier versions use the plain form of the pattern.
if sys.version_info < (3, 11):
    _header_pairs_regex = (
        r'(?:^|\s|,\s*)(\w+)\s*=\s*(?:"((?:[^"\\]|\\.)*)"|([^\s,]+))'
    )
else:
    _header_pairs_regex = (
        r'(?:^|\s|,\s*)((?>\w+))\s*=\s*(?:"((?:[^"\\]|\\.)*)"|([^\s,]+))'
    )

_HEADER_PAIRS_PATTERN = re.compile(_header_pairs_regex)

77 

78 

# RFC 7616 challenge directives that are copied out of a parsed
# WWW-Authenticate header into the stored challenge.
CHALLENGE_FIELDS: Final[
    tuple[
        Literal["realm", "nonce", "qop", "algorithm", "opaque", "domain", "stale"], ...
    ]
] = ("realm", "nonce", "qop", "algorithm", "opaque", "domain", "stale")

93 

# Names of every supported digest algorithm, sorted so that documentation
# and error messages list them in a stable, predictable order.
SUPPORTED_ALGORITHMS: Final[tuple[str, ...]] = tuple(sorted(DigestFunctions))

97 

# Authorization header fields whose values are emitted inside double quotes.
# Fields not listed here (algorithm, qop and nc) are written bare. The header
# builder consults this set for every field to pick the right template.
QUOTED_AUTH_FIELDS: Final[frozenset[str]] = frozenset(
    ("username", "realm", "nonce", "uri", "response", "opaque", "cnonce")
)

106 

107 

def escape_quotes(value: str) -> str:
    """Return *value* with every double quote prefixed by a backslash.

    Only the ``"`` character is touched; all other characters, including
    backslashes, are passed through unchanged.
    """
    pieces = value.split('"')
    return '\\"'.join(pieces)

111 

112 

def unescape_quotes(value: str) -> str:
    """Return *value* with every backslash-escaped double quote unescaped.

    Only the two-character sequence ``\\"`` is rewritten (to ``"``); any
    other backslash in the input is left exactly as it was.
    """
    pieces = value.split('\\"')
    return '"'.join(pieces)

116 

117 

def parse_header_pairs(header: str) -> dict[str, str]:
    """
    Extract ``key=value`` parameters from a WWW-Authenticate style header.

    Values may be quoted or unquoted, quoted values may contain commas,
    and whitespace around keys, ``=`` and separators is tolerated.
    Escaped double quotes inside a quoted value are unescaped. When a key
    occurs more than once, the last occurrence wins.

    Examples of supported formats:
      - key1="value1", key2=value2
      - key1 = "value1" , key2="value, with, commas"
      - key1=value1,key2="value2"
      - realm="example.com", nonce="12345", qop="auth"

    Args:
        header: The header value string to parse

    Returns:
        Dictionary mapping parameter names to their values
    """
    pairs: dict[str, str] = {}
    for raw_name, quoted, bare in _HEADER_PAIRS_PATTERN.findall(header):
        name = raw_name.strip()
        if not name:
            continue
        # findall yields "" for the alternative that did not take part in
        # the match, so an empty quoted value falls through to ``bare``,
        # which is "" as well in that case.
        pairs[name] = unescape_quotes(quoted) if quoted else bare
    return pairs

143 

144 

class DigestAuthMiddleware:
    """
    HTTP digest authentication middleware for aiohttp client.

    This middleware intercepts 401 Unauthorized responses containing a Digest
    authentication challenge, calculates the appropriate digest credentials,
    and automatically retries the request with the proper Authorization header.

    Features:
        - Handles all aspects of Digest authentication handshake automatically
        - Supports all standard hash algorithms:
            - MD5, MD5-SESS
            - SHA, SHA-SESS
            - SHA256, SHA256-SESS, SHA-256, SHA-256-SESS
            - SHA512, SHA512-SESS, SHA-512, SHA-512-SESS
        - Supports 'auth' and 'auth-int' quality of protection modes
        - Properly handles quoted strings and parameter parsing
        - Includes replay attack protection with client nonce count tracking
        - Supports preemptive authentication per RFC 7616 Section 3.6

    Origin scoping:
        The credentials are scoped to the origin of the first request the
        middleware handles. A request to a different origin is passed through
        untouched, so it never receives a digest response computed from those
        credentials, unless that origin falls within a protection space the
        anchor origin advertised through the RFC 7616 ``domain`` directive. Make
        the first request through the middleware against the intended origin, as
        the anchor is pinned to it and not reset for the life of the instance.

    Standards compliance:
        - RFC 7616: HTTP Digest Access Authentication (primary reference)
        - RFC 2617: HTTP Authentication (deprecated by RFC 7616)
        - RFC 1945: Section 11.1 (username restrictions)

    Implementation notes:
        The core digest calculation is inspired by the implementation in
        https://github.com/requests/requests/blob/v2.18.4/requests/auth.py
        with added support for modern digest auth features and error handling.
    """

    def __init__(
        self,
        login: str,
        password: str,
        preemptive: bool = True,
    ) -> None:
        """
        Store the credentials and reset all per-challenge state.

        Args:
            login: User name; must not be None and must not contain ":"
            password: Password; must not be None
            preemptive: When true, send an Authorization header up front
                once a challenge is known and the URL is inside the
                protection space

        Raises:
            ValueError: If login or password is None, or login contains ":"
        """
        if login is None:
            raise ValueError("None is not allowed as login value")

        if password is None:
            raise ValueError("None is not allowed as password value")

        if ":" in login:
            raise ValueError('A ":" is not allowed in username (RFC 1945#section-11.1)')

        self._login_str: Final[str] = login
        self._login_bytes: Final[bytes] = login.encode("utf-8")
        self._password_bytes: Final[bytes] = password.encode("utf-8")

        # Nonce of the most recently built header and how many headers
        # have been built with it (the ``nc`` value).
        self._last_nonce_bytes = b""
        self._nonce_count = 0
        # Directives of the most recent Digest challenge; empty until a
        # 401 with a Digest challenge has been seen.
        self._challenge: DigestAuthChallenge = {}
        self._preemptive: bool = preemptive
        # URL prefixes defining the protection space
        self._protection_space: list[str] = []
        # Origin the credentials are scoped to; set on the first request.
        self._origin: URL | None = None

    async def _encode(self, method: str, url: URL, body: Payload | Literal[b""]) -> str:
        """
        Build digest authorization header for the current challenge.

        Args:
            method: The HTTP method (GET, POST, etc.)
            url: The request URL
            body: The request body (used for qop=auth-int)

        Returns:
            A fully formatted Digest authorization header string

        Raises:
            ClientError: If the challenge is missing required parameters or
                         contains unsupported values

        """
        challenge = self._challenge
        if "realm" not in challenge:
            raise ClientError(
                "Malformed Digest auth challenge: Missing 'realm' parameter"
            )

        if "nonce" not in challenge:
            raise ClientError(
                "Malformed Digest auth challenge: Missing 'nonce' parameter"
            )

        # Empty realm values are allowed per RFC 7616 (SHOULD, not MUST, contain host name)
        realm = challenge["realm"]
        nonce = challenge["nonce"]

        # Empty nonce values are not allowed as they are security-critical for replay protection
        if not nonce:
            raise ClientError(
                "Security issue: Digest auth challenge contains empty 'nonce' value"
            )

        qop_raw = challenge.get("qop", "")
        # Preserve original algorithm case for response while using uppercase for processing
        algorithm_original = challenge.get("algorithm", "MD5")
        algorithm = algorithm_original.upper()
        opaque = challenge.get("opaque", "")

        # Convert string values to bytes once
        nonce_bytes = nonce.encode("utf-8")
        realm_bytes = realm.encode("utf-8")
        # Use the encoded request-target (raw_path_qs) since that is what is
        # transmitted on the wire and what the server signs against. Using the
        # decoded form would cause digest verification to fail when the path
        # or query string contains percent-encoded reserved characters.
        path = URL(url).raw_path_qs

        # Process QoP: pick the strongest option that both sides support
        qop = ""
        qop_bytes = b""
        if qop_raw:
            valid_qops = {"auth", "auth-int"}.intersection(
                {q.strip() for q in qop_raw.split(",") if q.strip()}
            )
            if not valid_qops:
                raise ClientError(
                    f"Digest auth error: Unsupported Quality of Protection (qop) value(s): {qop_raw}"
                )

            qop = "auth-int" if "auth-int" in valid_qops else "auth"
            qop_bytes = qop.encode("utf-8")

        if algorithm not in DigestFunctions:
            raise ClientError(
                f"Digest auth error: Unsupported hash algorithm: {algorithm}. "
                f"Supported algorithms: {', '.join(SUPPORTED_ALGORITHMS)}"
            )
        hash_fn: Final = DigestFunctions[algorithm]

        def H(x: bytes) -> bytes:
            """RFC 7616 Section 3: Hash function H(data) = hex(hash(data))."""
            return hash_fn(x).hexdigest().encode()

        def KD(s: bytes, d: bytes) -> bytes:
            """RFC 7616 Section 3: KD(secret, data) = H(concat(secret, ":", data))."""
            return H(b":".join((s, d)))

        # Calculate A1 and A2
        A1 = b":".join((self._login_bytes, realm_bytes, self._password_bytes))
        A2 = f"{method.upper()}:{path}".encode()
        if qop == "auth-int":
            if isinstance(body, Payload):  # will always be empty bytes unless Payload
                entity_bytes = await body.as_bytes()  # Get bytes from Payload
            else:
                entity_bytes = body
            entity_hash = H(entity_bytes)
            A2 = b":".join((A2, entity_hash))

        HA1 = H(A1)
        HA2 = H(A2)

        # Nonce count handling: count up while the server nonce is reused,
        # restart at 1 as soon as it changes.
        if nonce_bytes == self._last_nonce_bytes:
            self._nonce_count += 1
        else:
            self._nonce_count = 1

        self._last_nonce_bytes = nonce_bytes
        ncvalue = f"{self._nonce_count:08x}"
        ncvalue_bytes = ncvalue.encode("utf-8")

        # Generate client nonce
        cnonce = hashlib.sha1(
            b"".join(
                [
                    str(self._nonce_count).encode("utf-8"),
                    nonce_bytes,
                    time.ctime().encode("utf-8"),
                    os.urandom(8),
                ]
            )
        ).hexdigest()[:16]
        cnonce_bytes = cnonce.encode("utf-8")

        # Special handling for session-based algorithms
        # (``algorithm`` is already upper-cased above)
        if algorithm.endswith("-SESS"):
            HA1 = H(b":".join((HA1, nonce_bytes, cnonce_bytes)))

        # Calculate the response digest
        if qop:
            noncebit = b":".join(
                (nonce_bytes, ncvalue_bytes, cnonce_bytes, qop_bytes, HA2)
            )
            response_digest = KD(HA1, noncebit)
        else:
            response_digest = KD(HA1, b":".join((nonce_bytes, HA2)))

        # Define a dict mapping of header fields to their values
        # Group fields into always-present, optional, and qop-dependent
        header_fields = {
            # Always present fields
            "username": escape_quotes(self._login_str),
            "realm": escape_quotes(realm),
            "nonce": escape_quotes(nonce),
            "uri": path,
            "response": response_digest.decode(),
            "algorithm": algorithm_original,
        }

        # Optional fields
        if opaque:
            header_fields["opaque"] = escape_quotes(opaque)

        # QoP-dependent fields
        if qop:
            header_fields["qop"] = qop
            header_fields["nc"] = ncvalue
            header_fields["cnonce"] = cnonce

        # Build header using templates for each field type
        pairs: list[str] = []
        for field, value in header_fields.items():
            if field in QUOTED_AUTH_FIELDS:
                pairs.append(f'{field}="{value}"')
            else:
                pairs.append(f"{field}={value}")

        return f"Digest {', '.join(pairs)}"

    def _in_protection_space(self, url: URL) -> bool:
        """
        Check if the given URL is within the current protection space.

        According to RFC 7616, a URI is in the protection space if any URI
        in the protection space is a prefix of it (after both have been made absolute).

        The prefix must end on a path boundary: either the entry itself ends
        with "/", the URL equals the entry, or the character following the
        prefix in the URL is "/".
        """
        request_str = str(url)
        for space_str in self._protection_space:
            # An empty entry is a prefix of everything and carries no
            # information; skip it instead of indexing into it.
            if not space_str or not request_str.startswith(space_str):
                continue
            # Exact match or space ends with / (proper directory prefix)
            if len(request_str) == len(space_str) or space_str.endswith("/"):
                return True
            # Check next char is / to ensure proper path boundary
            if request_str[len(space_str)] == "/":
                return True
        return False

    def _authenticate(self, response: ClientResponse) -> bool:
        """
        Takes the given response and tries digest-auth, if needed.

        On a 401 response carrying a parseable Digest challenge, the stored
        challenge and the protection space are replaced.

        Returns true if the original request must be resent.
        """
        if response.status != 401:
            return False

        auth_header = response.headers.get("www-authenticate", "")
        if not auth_header:
            return False  # No authentication header present

        method, sep, headers = auth_header.partition(" ")
        if not sep:
            # No space found in www-authenticate header
            return False  # Malformed auth header, missing scheme separator

        if method.lower() != "digest":
            # Not a digest auth challenge (could be Basic, Bearer, etc.)
            return False

        if not headers:
            # We have a digest scheme but no parameters
            return False  # Malformed digest header, missing parameters

        # We have a digest auth header with content
        if not (header_pairs := parse_header_pairs(headers)):
            # Failed to parse any key-value pairs
            return False  # Malformed digest header, no valid parameters

        # Extract challenge parameters
        self._challenge = {}
        for field in CHALLENGE_FIELDS:
            if (value := header_pairs.get(field)) is not None:
                self._challenge[field] = value

        # Update protection space based on domain parameter or default to origin
        origin = response.url.origin()

        # Parse space-separated list of URIs
        protection_space: list[str] = []
        for uri in self._challenge.get("domain", "").split():
            # Remove quotes if present
            uri = uri.strip('"')
            if not uri:
                # Nothing left after stripping quotes; not a usable URI
                continue
            if uri.startswith("/"):
                # Path-absolute, relative to origin
                protection_space.append(str(origin.join(URL(uri))))
            else:
                # Absolute URI
                protection_space.append(str(URL(uri)))

        # No domain specified, or it named no usable URI: per RFC 7616 the
        # protection space is then the entire origin.
        self._protection_space = protection_space or [str(origin)]

        # Return True only if we found at least one challenge parameter
        return bool(self._challenge)

    async def __call__(
        self, request: ClientRequest, handler: ClientHandlerType
    ) -> ClientResponse:
        """
        Run the digest auth middleware.

        The request is sent at most twice: once as given (with a preemptive
        Authorization header when applicable) and, if that yields a Digest
        challenge, once more with a header computed from that challenge.
        """
        # Credentials are scoped to the first request's origin. Other origins
        # pass through untouched unless a challenge from the anchor origin
        # advertised them via RFC 7616 domain; mirrors aiohttp stripping
        # Authorization on cross-origin redirects.
        origin = request.url.origin()
        if self._origin is None:
            self._origin = origin
        elif origin != self._origin and not self._in_protection_space(request.url):
            return await handler(request)

        response = None
        for retry_count in range(2):
            # Apply authorization header if:
            # 1. This is a retry after 401 (retry_count > 0), OR
            # 2. Preemptive auth is enabled AND we have a challenge AND the URL is in protection space
            if retry_count > 0 or (
                self._preemptive
                and self._challenge
                and self._in_protection_space(request.url)
            ):
                request.headers[hdrs.AUTHORIZATION] = await self._encode(
                    request.method, request.url, request.body
                )

            # Send the request
            response = await handler(request)

            # Check if we need to authenticate
            if not self._authenticate(response):
                break

        # At this point, response is guaranteed to be defined
        assert response is not None
        return response