Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_middleware_digest_auth.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

167 statements  

1""" 

2Digest authentication middleware for aiohttp client. 

3 

4This middleware implements HTTP Digest Authentication according to RFC 7616, 

5providing a more secure alternative to Basic Authentication. It supports all 

6standard hash algorithms including MD5, SHA, SHA-256, SHA-512 and their session 

7variants, as well as both 'auth' and 'auth-int' quality of protection (qop) options. 

8""" 

9 

10import hashlib 

11import os 

12import re 

13import sys 

14import time 

15from collections.abc import Callable 

16from typing import Final, Literal, TypedDict 

17 

18from yarl import URL 

19 

20from . import hdrs 

21from .client_exceptions import ClientError 

22from .client_middlewares import ClientHandlerType 

23from .client_reqrep import ClientRequest, ClientResponse 

24from .payload import Payload 

25 

26 

class DigestAuthChallenge(TypedDict, total=False):
    """Parameters extracted from a Digest ``WWW-Authenticate`` challenge.

    ``total=False`` makes every key optional: only the parameters actually
    present in the server's header are stored by the middleware.
    """

    # Authentication realm; mixed into the A1 hash input.
    realm: str
    # Server nonce; the middleware rejects a challenge whose nonce is empty.
    nonce: str
    # Comma-separated quality-of-protection options offered by the server.
    qop: str
    # Hash algorithm name; treated as "MD5" when the key is absent.
    algorithm: str
    # Opaque server data, echoed back in the Authorization header when non-empty.
    opaque: str
    # Whitespace-separated URIs describing the protection space.
    domain: str
    # Captured from the challenge; not read anywhere else in this module.
    stale: str

35 

36 

# Maps every accepted ``algorithm`` name (upper-cased) to its hashlib
# constructor. Each base name is paired with a "-SESS" session variant that
# uses the same hash; the dashed and undashed SHA-2 spellings are both accepted.
DigestFunctions: dict[str, Callable[[bytes], "hashlib._Hash"]] = {
    algorithm_name: hash_constructor
    for base_name, hash_constructor in (
        ("MD5", hashlib.md5),
        ("SHA", hashlib.sha1),
        ("SHA256", hashlib.sha256),
        ("SHA-256", hashlib.sha256),
        ("SHA512", hashlib.sha512),
        ("SHA-512", hashlib.sha512),
    )
    for algorithm_name in (base_name, f"{base_name}-SESS")
}

51 

52 

# Regex matching a single ``key=value`` auth parameter; compiled once at
# import time so header parsing does not pay for it on every call.
#
# Components, left to right:
#   (?:^|\s|,\s*)          a pair must begin at the start of the string, after
#                          whitespace, or after a comma separator
#   (\w+)                  group 1: alphanumeric key (wrapped in an atomic
#                          group on Python 3.11+ to reduce backtracking)
#   \s*=\s*                "=" delimiter with optional surrounding whitespace
#   "((?:[^"\\]|\\.)*)"    group 2: quoted value - anything but '"' or '\',
#                          backslash escapes allowed, may be empty
#   ([^\s,]+)              group 3: unquoted value - at least one character,
#                          anything but a comma or whitespace
_HEADER_PAIRS_PATTERN = re.compile(
    r'(?:^|\s|,\s*)((?>\w+))\s*=\s*(?:"((?:[^"\\]|\\.)*)"|([^\s,]+))'
    if sys.version_info >= (3, 11)
    else r'(?:^|\s|,\s*)(\w+)\s*=\s*(?:"((?:[^"\\]|\\.)*)"|([^\s,]+))'
)

77 

78 

# RFC 7616: Challenge parameters to extract.
# Only these keys are copied from a parsed WWW-Authenticate header into the
# stored challenge; any other parameter the server sends is ignored.
CHALLENGE_FIELDS: Final[
    tuple[
        Literal["realm", "nonce", "qop", "algorithm", "opaque", "domain", "stale"], ...
    ]
] = (
    "realm",
    "nonce",
    "qop",
    "algorithm",
    "opaque",
    "domain",
    "stale",
)

93 

# Names of every digest algorithm this module can compute.
# Kept as a sorted tuple so documentation and error messages list them in a
# predictable order.
SUPPORTED_ALGORITHMS: Final[tuple[str, ...]] = tuple(sorted(DigestFunctions))

97 

# RFC 7616: Authorization header parameters whose values must be wrapped in
# double quotes. Everything else (algorithm, qop, nc) is emitted bare.
# The header builder consults this frozen set to decide, per field, whether
# to add the quotes.
QUOTED_AUTH_FIELDS: Final[frozenset[str]] = frozenset(
    ("username", "realm", "nonce", "uri", "response", "opaque", "cnonce")
)

106 

107 

def escape_quotes(value: str) -> str:
    """Return *value* with every double quote prefixed by a backslash."""
    # Single pass: map the '"' code point to the two-character sequence \".
    quote_escapes = str.maketrans({'"': '\\"'})
    return value.translate(quote_escapes)

111 

112 

def unescape_quotes(value: str) -> str:
    """Return *value* with every backslash-escaped double quote unescaped."""
    # Cut the text at each \" sequence and stitch the pieces back with a bare ".
    pieces = value.split('\\"')
    return '"'.join(pieces)

116 

117 

def parse_header_pairs(header: str) -> dict[str, str]:
    """
    Extract ``key=value`` parameters from a WWW-Authenticate style header.

    Values may be quoted or bare. Quoted values may contain commas and
    backslash-escaped double quotes (which are unescaped in the result);
    whitespace around ``=`` and around the comma separators is tolerated.

    Examples of supported formats:
      - key1="value1", key2=value2
      - key1 = "value1" , key2="value, with, commas"
      - key1=value1,key2="value2"
      - realm="example.com", nonce="12345", qop="auth"

    Args:
        header: The header value string to parse

    Returns:
        Dictionary mapping parameter names to their values. When a name
        appears more than once, the last occurrence wins.
    """
    parsed: dict[str, str] = {}
    for raw_name, quoted_text, bare_text in _HEADER_PAIRS_PATTERN.findall(header):
        name = raw_name.strip()
        if not name:
            continue
        # An empty quoted value leaves both groups empty, so falling through
        # to the bare group still yields "".
        if quoted_text:
            parsed[name] = unescape_quotes(quoted_text)
        else:
            parsed[name] = bare_text
    return parsed

143 

144 

class DigestAuthMiddleware:
    """
    HTTP digest authentication middleware for aiohttp client.

    This middleware intercepts 401 Unauthorized responses containing a Digest
    authentication challenge, calculates the appropriate digest credentials,
    and automatically retries the request with the proper Authorization header.

    Features:
        - Handles all aspects of Digest authentication handshake automatically
        - Supports all standard hash algorithms:
            - MD5, MD5-SESS
            - SHA, SHA-SESS
            - SHA256, SHA256-SESS, SHA-256, SHA-256-SESS
            - SHA512, SHA512-SESS, SHA-512, SHA-512-SESS
        - Supports 'auth' and 'auth-int' quality of protection modes
        - Properly handles quoted strings and parameter parsing
        - Includes replay attack protection with client nonce count tracking
        - Supports preemptive authentication per RFC 7616 Section 3.6

    Standards compliance:
        - RFC 7616: HTTP Digest Access Authentication (primary reference)
        - RFC 2617: HTTP Authentication (deprecated by RFC 7616)
        - RFC 1945: Section 11.1 (username restrictions)

    Implementation notes:
        The core digest calculation is inspired by the implementation in
        https://github.com/requests/requests/blob/v2.18.4/requests/auth.py
        with added support for modern digest auth features and error handling.
    """

    def __init__(
        self,
        login: str,
        password: str,
        preemptive: bool = True,
    ) -> None:
        """
        Store the credentials and reset all per-challenge state.

        Args:
            login: User name; must not be None and must not contain ":"
            password: Password; must not be None
            preemptive: When true, requests inside the known protection space
                carry an Authorization header on the first attempt once a
                challenge has been seen

        Raises:
            ValueError: If login or password is None, or login contains ":"
        """
        if login is None:
            raise ValueError("None is not allowed as login value")

        if password is None:
            raise ValueError("None is not allowed as password value")

        if ":" in login:
            raise ValueError('A ":" is not allowed in username (RFC 1945#section-11.1)')

        self._login_str: Final[str] = login
        self._login_bytes: Final[bytes] = login.encode("utf-8")
        self._password_bytes: Final[bytes] = password.encode("utf-8")

        # Nonce of the most recent encode and how many times it has been used
        self._last_nonce_bytes = b""
        self._nonce_count = 0
        # Most recent challenge parsed from a 401 response (empty until then)
        self._challenge: DigestAuthChallenge = {}
        self._preemptive: bool = preemptive
        # List of URL prefixes (as strings) defining the protection space
        self._protection_space: list[str] = []

    async def _encode(self, method: str, url: URL, body: Payload | Literal[b""]) -> str:
        """
        Build digest authorization header for the current challenge.

        Args:
            method: The HTTP method (GET, POST, etc.)
            url: The request URL
            body: The request body (used for qop=auth-int)

        Returns:
            A fully formatted Digest authorization header string

        Raises:
            ClientError: If the challenge is missing required parameters or
                contains unsupported values

        """
        challenge = self._challenge
        if "realm" not in challenge:
            raise ClientError(
                "Malformed Digest auth challenge: Missing 'realm' parameter"
            )

        if "nonce" not in challenge:
            raise ClientError(
                "Malformed Digest auth challenge: Missing 'nonce' parameter"
            )

        # Empty realm values are allowed per RFC 7616 (SHOULD, not MUST, contain host name)
        realm = challenge["realm"]
        nonce = challenge["nonce"]

        # Empty nonce values are not allowed as they are security-critical for replay protection
        if not nonce:
            raise ClientError(
                "Security issue: Digest auth challenge contains empty 'nonce' value"
            )

        qop_raw = challenge.get("qop", "")
        # Preserve original algorithm case for response while using uppercase for processing
        algorithm_original = challenge.get("algorithm", "MD5")
        algorithm = algorithm_original.upper()
        opaque = challenge.get("opaque", "")

        # Convert string values to bytes once
        nonce_bytes = nonce.encode("utf-8")
        realm_bytes = realm.encode("utf-8")
        # Use the encoded request-target (raw_path_qs) since that is what is
        # transmitted on the wire and what the server signs against. Using the
        # decoded form would cause digest verification to fail when the path
        # or query string contains percent-encoded reserved characters.
        path = URL(url).raw_path_qs

        # Process QoP: pick the strongest option we support from the server's list
        qop = ""
        qop_bytes = b""
        if qop_raw:
            valid_qops = {"auth", "auth-int"}.intersection(
                {q.strip() for q in qop_raw.split(",") if q.strip()}
            )
            if not valid_qops:
                raise ClientError(
                    f"Digest auth error: Unsupported Quality of Protection (qop) value(s): {qop_raw}"
                )

            qop = "auth-int" if "auth-int" in valid_qops else "auth"
            qop_bytes = qop.encode("utf-8")

        if algorithm not in DigestFunctions:
            raise ClientError(
                f"Digest auth error: Unsupported hash algorithm: {algorithm}. "
                f"Supported algorithms: {', '.join(SUPPORTED_ALGORITHMS)}"
            )
        hash_fn: Final = DigestFunctions[algorithm]

        def H(x: bytes) -> bytes:
            """RFC 7616 Section 3: Hash function H(data) = hex(hash(data))."""
            return hash_fn(x).hexdigest().encode()

        def KD(s: bytes, d: bytes) -> bytes:
            """RFC 7616 Section 3: KD(secret, data) = H(concat(secret, ":", data))."""
            return H(b":".join((s, d)))

        # Calculate A1 and A2
        A1 = b":".join((self._login_bytes, realm_bytes, self._password_bytes))
        A2 = f"{method.upper()}:{path}".encode()
        if qop == "auth-int":
            # auth-int additionally binds the hash of the request body into A2
            if isinstance(body, Payload):  # will always be empty bytes unless Payload
                entity_bytes = await body.as_bytes()  # Get bytes from Payload
            else:
                entity_bytes = body
            entity_hash = H(entity_bytes)
            A2 = b":".join((A2, entity_hash))

        HA1 = H(A1)
        HA2 = H(A2)

        # Nonce count handling: counts uses of the same server nonce, restarting at 1
        if nonce_bytes == self._last_nonce_bytes:
            self._nonce_count += 1
        else:
            self._nonce_count = 1

        self._last_nonce_bytes = nonce_bytes
        ncvalue = f"{self._nonce_count:08x}"
        ncvalue_bytes = ncvalue.encode("utf-8")

        # Generate client nonce (16 hex chars derived from count, nonce, time and random bytes)
        cnonce = hashlib.sha1(
            b"".join(
                [
                    str(self._nonce_count).encode("utf-8"),
                    nonce_bytes,
                    time.ctime().encode("utf-8"),
                    os.urandom(8),
                ]
            )
        ).hexdigest()[:16]
        cnonce_bytes = cnonce.encode("utf-8")

        # Special handling for session-based algorithms
        # (algorithm is already upper-cased above)
        if algorithm.endswith("-SESS"):
            HA1 = H(b":".join((HA1, nonce_bytes, cnonce_bytes)))

        # Calculate the response digest
        if qop:
            noncebit = b":".join(
                (nonce_bytes, ncvalue_bytes, cnonce_bytes, qop_bytes, HA2)
            )
            response_digest = KD(HA1, noncebit)
        else:
            response_digest = KD(HA1, b":".join((nonce_bytes, HA2)))

        # Define a dict mapping of header fields to their values
        # Group fields into always-present, optional, and qop-dependent
        header_fields = {
            # Always present fields
            "username": escape_quotes(self._login_str),
            "realm": escape_quotes(realm),
            "nonce": escape_quotes(nonce),
            "uri": path,
            "response": response_digest.decode(),
            "algorithm": algorithm_original,
        }

        # Optional fields
        if opaque:
            header_fields["opaque"] = escape_quotes(opaque)

        # QoP-dependent fields
        if qop:
            header_fields["qop"] = qop
            header_fields["nc"] = ncvalue
            header_fields["cnonce"] = cnonce

        # Build header using templates for each field type
        pairs: list[str] = []
        for field, value in header_fields.items():
            if field in QUOTED_AUTH_FIELDS:
                pairs.append(f'{field}="{value}"')
            else:
                pairs.append(f"{field}={value}")

        return f"Digest {', '.join(pairs)}"

    def _in_protection_space(self, url: URL) -> bool:
        """
        Check if the given URL is within the current protection space.

        According to RFC 7616, a URI is in the protection space if any URI
        in the protection space is a prefix of it (after both have been made absolute).

        Args:
            url: The request URL to test

        Returns:
            True if some protection-space entry equals the URL or is a prefix
            of it that ends on a path boundary; False otherwise
        """
        request_str = str(url)
        for space_str in self._protection_space:
            # An empty entry cannot identify a protection space. Skipping it
            # also keeps the boundary checks below from touching an empty string.
            if not space_str:
                continue
            # Check if request starts with space URL
            if not request_str.startswith(space_str):
                continue
            # Exact match or space ends with / (proper directory prefix)
            if len(request_str) == len(space_str) or space_str.endswith("/"):
                return True
            # Check next char is / to ensure proper path boundary
            if request_str[len(space_str)] == "/":
                return True
        return False

    def _authenticate(self, response: ClientResponse) -> bool:
        """
        Takes the given response and tries digest-auth, if needed.

        On a 401 response carrying a Digest challenge, the stored challenge
        and the protection space are replaced with the values from that
        response.

        Returns true if the original request must be resent.
        """
        if response.status != 401:
            return False

        auth_header = response.headers.get("www-authenticate", "")
        if not auth_header:
            return False  # No authentication header present

        method, sep, headers = auth_header.partition(" ")
        if not sep:
            # No space found in www-authenticate header
            return False  # Malformed auth header, missing scheme separator

        if method.lower() != "digest":
            # Not a digest auth challenge (could be Basic, Bearer, etc.)
            return False

        if not headers:
            # We have a digest scheme but no parameters
            return False  # Malformed digest header, missing parameters

        # We have a digest auth header with content
        if not (header_pairs := parse_header_pairs(headers)):
            # Failed to parse any key-value pairs
            return False  # Malformed digest header, no valid parameters

        # Extract challenge parameters
        self._challenge = {}
        for field in CHALLENGE_FIELDS:
            if (value := header_pairs.get(field)) is not None:
                self._challenge[field] = value

        # Update protection space based on domain parameter or default to origin
        origin = response.url.origin()

        protection_space: list[str] = []
        if domain := self._challenge.get("domain"):
            # Parse space-separated list of URIs
            for uri in domain.split():
                # Remove quotes if present
                uri = uri.strip('"')
                if not uri:
                    # Nothing left after stripping quotes; not a usable URI
                    continue
                try:
                    if uri.startswith("/"):
                        # Path-absolute, relative to origin
                        space = str(origin.join(URL(uri)))
                    else:
                        # Absolute URI
                        space = str(URL(uri))
                except ValueError:
                    # The header is server-controlled; ignore URIs that do
                    # not parse instead of failing the whole request.
                    continue
                if space:
                    protection_space.append(space)

        # No domain specified (or none of its URIs were usable):
        # protection space is entire origin
        self._protection_space = protection_space or [str(origin)]

        # Return True only if we found at least one challenge parameter
        return bool(self._challenge)

    async def __call__(
        self, request: ClientRequest, handler: ClientHandlerType
    ) -> ClientResponse:
        """
        Run the digest auth middleware.

        Sends the request at most twice: once as given (optionally with
        preemptive credentials) and once more if the first response was a
        401 carrying a Digest challenge.

        Args:
            request: The outgoing client request; its Authorization header
                is set in place when credentials are applied
            handler: The next handler in the middleware chain

        Returns:
            The response from the last attempt
        """
        response = None
        for retry_count in range(2):
            # Apply authorization header if:
            # 1. This is a retry after 401 (retry_count > 0), OR
            # 2. Preemptive auth is enabled AND we have a challenge AND the URL is in protection space
            if retry_count > 0 or (
                self._preemptive
                and self._challenge
                and self._in_protection_space(request.url)
            ):
                request.headers[hdrs.AUTHORIZATION] = await self._encode(
                    request.method, request.url, request.body
                )

            # Send the request
            # NOTE(review): the first 401 response is not explicitly released
            # before the retry - confirm the handler chain copes with that.
            response = await handler(request)

            # Check if we need to authenticate
            if not self._authenticate(response):
                break

        # At this point, response is guaranteed to be defined
        assert response is not None
        return response