Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_middleware_digest_auth.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

139 statements  

1""" 

2Digest authentication middleware for aiohttp client. 

3 

4This middleware implements HTTP Digest Authentication according to RFC 7616, 

5providing a more secure alternative to Basic Authentication. It supports all 

6standard hash algorithms including MD5, SHA, SHA-256, SHA-512 and their session 

7variants, as well as both 'auth' and 'auth-int' quality of protection (qop) options. 

8""" 

9 

10import hashlib 

11import os 

12import re 

13import time 

14from typing import ( 

15 Callable, 

16 Dict, 

17 Final, 

18 FrozenSet, 

19 List, 

20 Literal, 

21 Tuple, 

22 TypedDict, 

23 Union, 

24) 

25 

26from yarl import URL 

27 

28from . import hdrs 

29from .client_exceptions import ClientError 

30from .client_middlewares import ClientHandlerType 

31from .client_reqrep import ClientRequest, ClientResponse 

32 

33 

34class DigestAuthChallenge(TypedDict, total=False): 

35 realm: str 

36 nonce: str 

37 qop: str 

38 algorithm: str 

39 opaque: str 

40 

41 

42DigestFunctions: Dict[str, Callable[[bytes], "hashlib._Hash"]] = { 

43 "MD5": hashlib.md5, 

44 "MD5-SESS": hashlib.md5, 

45 "SHA": hashlib.sha1, 

46 "SHA-SESS": hashlib.sha1, 

47 "SHA256": hashlib.sha256, 

48 "SHA256-SESS": hashlib.sha256, 

49 "SHA-256": hashlib.sha256, 

50 "SHA-256-SESS": hashlib.sha256, 

51 "SHA512": hashlib.sha512, 

52 "SHA512-SESS": hashlib.sha512, 

53 "SHA-512": hashlib.sha512, 

54 "SHA-512-SESS": hashlib.sha512, 

55} 

56 

57 

58# Compile the regex pattern once at module level for performance 

59_HEADER_PAIRS_PATTERN = re.compile( 

60 r'(\w+)\s*=\s*(?:"((?:[^"\\]|\\.)*)"|([^\s,]+))' 

61 # | | | | | | | | | || | 

62 # +----|--|-|-|--|----|------|----|--||-----|--> alphanumeric key 

63 # +--|-|-|--|----|------|----|--||-----|--> maybe whitespace 

64 # | | | | | | | || | 

65 # +-|-|--|----|------|----|--||-----|--> = (delimiter) 

66 # +-|--|----|------|----|--||-----|--> maybe whitespace 

67 # | | | | | || | 

68 # +--|----|------|----|--||-----|--> group quoted or unquoted 

69 # | | | | || | 

70 # +----|------|----|--||-----|--> if quoted... 

71 # +------|----|--||-----|--> anything but " or \ 

72 # +----|--||-----|--> escaped characters allowed 

73 # +--||-----|--> or can be empty string 

74 # || | 

75 # +|-----|--> if unquoted... 

76 # +-----|--> anything but , or <space> 

77 # +--> at least one char req'd 

78) 

79 

80 

81# RFC 7616: Challenge parameters to extract 

82CHALLENGE_FIELDS: Final[ 

83 Tuple[Literal["realm", "nonce", "qop", "algorithm", "opaque"], ...] 

84] = ( 

85 "realm", 

86 "nonce", 

87 "qop", 

88 "algorithm", 

89 "opaque", 

90) 

91 

92# Supported digest authentication algorithms 

93# Use a tuple of sorted keys for predictable documentation and error messages 

94SUPPORTED_ALGORITHMS: Final[Tuple[str, ...]] = tuple(sorted(DigestFunctions.keys())) 

95 

96# RFC 7616: Fields that require quoting in the Digest auth header 

97# These fields must be enclosed in double quotes in the Authorization header. 

98# Algorithm, qop, and nc are never quoted per RFC specifications. 

99# This frozen set is used by the template-based header construction to 

100# automatically determine which fields need quotes. 

101QUOTED_AUTH_FIELDS: Final[FrozenSet[str]] = frozenset( 

102 {"username", "realm", "nonce", "uri", "response", "opaque", "cnonce"} 

103) 

104 

105 

106def escape_quotes(value: str) -> str: 

107 """Escape double quotes for HTTP header values.""" 

108 return value.replace('"', '\\"') 

109 

110 

111def unescape_quotes(value: str) -> str: 

112 """Unescape double quotes in HTTP header values.""" 

113 return value.replace('\\"', '"') 

114 

115 

116def parse_header_pairs(header: str) -> Dict[str, str]: 

117 """ 

118 Parse key-value pairs from WWW-Authenticate or similar HTTP headers. 

119 

120 This function handles the complex format of WWW-Authenticate header values, 

121 supporting both quoted and unquoted values, proper handling of commas in 

122 quoted values, and whitespace variations per RFC 7616. 

123 

124 Examples of supported formats: 

125 - key1="value1", key2=value2 

126 - key1 = "value1" , key2="value, with, commas" 

127 - key1=value1,key2="value2" 

128 - realm="example.com", nonce="12345", qop="auth" 

129 

130 Args: 

131 header: The header value string to parse 

132 

133 Returns: 

134 Dictionary mapping parameter names to their values 

135 """ 

136 return { 

137 stripped_key: unescape_quotes(quoted_val) if quoted_val else unquoted_val 

138 for key, quoted_val, unquoted_val in _HEADER_PAIRS_PATTERN.findall(header) 

139 if (stripped_key := key.strip()) 

140 } 

141 

142 

143class DigestAuthMiddleware: 

144 """ 

145 HTTP digest authentication middleware for aiohttp client. 

146 

147 This middleware intercepts 401 Unauthorized responses containing a Digest 

148 authentication challenge, calculates the appropriate digest credentials, 

149 and automatically retries the request with the proper Authorization header. 

150 

151 Features: 

152 - Handles all aspects of Digest authentication handshake automatically 

153 - Supports all standard hash algorithms: 

154 - MD5, MD5-SESS 

155 - SHA, SHA-SESS 

156 - SHA256, SHA256-SESS, SHA-256, SHA-256-SESS 

157 - SHA512, SHA512-SESS, SHA-512, SHA-512-SESS 

158 - Supports 'auth' and 'auth-int' quality of protection modes 

159 - Properly handles quoted strings and parameter parsing 

160 - Includes replay attack protection with client nonce count tracking 

161 

162 Standards compliance: 

163 - RFC 7616: HTTP Digest Access Authentication (primary reference) 

164 - RFC 2617: HTTP Authentication (deprecated by RFC 7616) 

165 - RFC 1945: Section 11.1 (username restrictions) 

166 

167 Implementation notes: 

168 The core digest calculation is inspired by the implementation in 

169 https://github.com/requests/requests/blob/v2.18.4/requests/auth.py 

170 with added support for modern digest auth features and error handling. 

171 """ 

172 

173 def __init__( 

174 self, 

175 login: str, 

176 password: str, 

177 ) -> None: 

178 if login is None: 

179 raise ValueError("None is not allowed as login value") 

180 

181 if password is None: 

182 raise ValueError("None is not allowed as password value") 

183 

184 if ":" in login: 

185 raise ValueError('A ":" is not allowed in username (RFC 1945#section-11.1)') 

186 

187 self._login_str: Final[str] = login 

188 self._login_bytes: Final[bytes] = login.encode("utf-8") 

189 self._password_bytes: Final[bytes] = password.encode("utf-8") 

190 

191 self._last_nonce_bytes = b"" 

192 self._nonce_count = 0 

193 self._challenge: DigestAuthChallenge = {} 

194 

195 def _encode(self, method: str, url: URL, body: Union[bytes, str]) -> str: 

196 """ 

197 Build digest authorization header for the current challenge. 

198 

199 Args: 

200 method: The HTTP method (GET, POST, etc.) 

201 url: The request URL 

202 body: The request body (used for qop=auth-int) 

203 

204 Returns: 

205 A fully formatted Digest authorization header string 

206 

207 Raises: 

208 ClientError: If the challenge is missing required parameters or 

209 contains unsupported values 

210 """ 

211 challenge = self._challenge 

212 if "realm" not in challenge: 

213 raise ClientError( 

214 "Malformed Digest auth challenge: Missing 'realm' parameter" 

215 ) 

216 

217 if "nonce" not in challenge: 

218 raise ClientError( 

219 "Malformed Digest auth challenge: Missing 'nonce' parameter" 

220 ) 

221 

222 # Empty realm values are allowed per RFC 7616 (SHOULD, not MUST, contain host name) 

223 realm = challenge["realm"] 

224 nonce = challenge["nonce"] 

225 

226 # Empty nonce values are not allowed as they are security-critical for replay protection 

227 if not nonce: 

228 raise ClientError( 

229 "Security issue: Digest auth challenge contains empty 'nonce' value" 

230 ) 

231 

232 qop_raw = challenge.get("qop", "") 

233 algorithm = challenge.get("algorithm", "MD5").upper() 

234 opaque = challenge.get("opaque", "") 

235 

236 # Convert string values to bytes once 

237 nonce_bytes = nonce.encode("utf-8") 

238 realm_bytes = realm.encode("utf-8") 

239 path = URL(url).path_qs 

240 

241 # Process QoP 

242 qop = "" 

243 qop_bytes = b"" 

244 if qop_raw: 

245 valid_qops = {"auth", "auth-int"}.intersection( 

246 {q.strip() for q in qop_raw.split(",") if q.strip()} 

247 ) 

248 if not valid_qops: 

249 raise ClientError( 

250 f"Digest auth error: Unsupported Quality of Protection (qop) value(s): {qop_raw}" 

251 ) 

252 

253 qop = "auth-int" if "auth-int" in valid_qops else "auth" 

254 qop_bytes = qop.encode("utf-8") 

255 

256 if algorithm not in DigestFunctions: 

257 raise ClientError( 

258 f"Digest auth error: Unsupported hash algorithm: {algorithm}. " 

259 f"Supported algorithms: {', '.join(SUPPORTED_ALGORITHMS)}" 

260 ) 

261 hash_fn: Final = DigestFunctions[algorithm] 

262 

263 def H(x: bytes) -> bytes: 

264 """RFC 7616 Section 3: Hash function H(data) = hex(hash(data)).""" 

265 return hash_fn(x).hexdigest().encode() 

266 

267 def KD(s: bytes, d: bytes) -> bytes: 

268 """RFC 7616 Section 3: KD(secret, data) = H(concat(secret, ":", data)).""" 

269 return H(b":".join((s, d))) 

270 

271 # Calculate A1 and A2 

272 A1 = b":".join((self._login_bytes, realm_bytes, self._password_bytes)) 

273 A2 = f"{method.upper()}:{path}".encode() 

274 if qop == "auth-int": 

275 if isinstance(body, str): 

276 entity_str = body.encode("utf-8", errors="replace") 

277 else: 

278 entity_str = body 

279 entity_hash = H(entity_str) 

280 A2 = b":".join((A2, entity_hash)) 

281 

282 HA1 = H(A1) 

283 HA2 = H(A2) 

284 

285 # Nonce count handling 

286 if nonce_bytes == self._last_nonce_bytes: 

287 self._nonce_count += 1 

288 else: 

289 self._nonce_count = 1 

290 

291 self._last_nonce_bytes = nonce_bytes 

292 ncvalue = f"{self._nonce_count:08x}" 

293 ncvalue_bytes = ncvalue.encode("utf-8") 

294 

295 # Generate client nonce 

296 cnonce = hashlib.sha1( 

297 b"".join( 

298 [ 

299 str(self._nonce_count).encode("utf-8"), 

300 nonce_bytes, 

301 time.ctime().encode("utf-8"), 

302 os.urandom(8), 

303 ] 

304 ) 

305 ).hexdigest()[:16] 

306 cnonce_bytes = cnonce.encode("utf-8") 

307 

308 # Special handling for session-based algorithms 

309 if algorithm.upper().endswith("-SESS"): 

310 HA1 = H(b":".join((HA1, nonce_bytes, cnonce_bytes))) 

311 

312 # Calculate the response digest 

313 if qop: 

314 noncebit = b":".join( 

315 (nonce_bytes, ncvalue_bytes, cnonce_bytes, qop_bytes, HA2) 

316 ) 

317 response_digest = KD(HA1, noncebit) 

318 else: 

319 response_digest = KD(HA1, b":".join((nonce_bytes, HA2))) 

320 

321 # Define a dict mapping of header fields to their values 

322 # Group fields into always-present, optional, and qop-dependent 

323 header_fields = { 

324 # Always present fields 

325 "username": escape_quotes(self._login_str), 

326 "realm": escape_quotes(realm), 

327 "nonce": escape_quotes(nonce), 

328 "uri": path, 

329 "response": response_digest.decode(), 

330 "algorithm": algorithm, 

331 } 

332 

333 # Optional fields 

334 if opaque: 

335 header_fields["opaque"] = escape_quotes(opaque) 

336 

337 # QoP-dependent fields 

338 if qop: 

339 header_fields["qop"] = qop 

340 header_fields["nc"] = ncvalue 

341 header_fields["cnonce"] = cnonce 

342 

343 # Build header using templates for each field type 

344 pairs: List[str] = [] 

345 for field, value in header_fields.items(): 

346 if field in QUOTED_AUTH_FIELDS: 

347 pairs.append(f'{field}="{value}"') 

348 else: 

349 pairs.append(f"{field}={value}") 

350 

351 return f"Digest {', '.join(pairs)}" 

352 

353 def _authenticate(self, response: ClientResponse) -> bool: 

354 """ 

355 Takes the given response and tries digest-auth, if needed. 

356 

357 Returns true if the original request must be resent. 

358 """ 

359 if response.status != 401: 

360 return False 

361 

362 auth_header = response.headers.get("www-authenticate", "") 

363 if not auth_header: 

364 return False # No authentication header present 

365 

366 method, sep, headers = auth_header.partition(" ") 

367 if not sep: 

368 # No space found in www-authenticate header 

369 return False # Malformed auth header, missing scheme separator 

370 

371 if method.lower() != "digest": 

372 # Not a digest auth challenge (could be Basic, Bearer, etc.) 

373 return False 

374 

375 if not headers: 

376 # We have a digest scheme but no parameters 

377 return False # Malformed digest header, missing parameters 

378 

379 # We have a digest auth header with content 

380 if not (header_pairs := parse_header_pairs(headers)): 

381 # Failed to parse any key-value pairs 

382 return False # Malformed digest header, no valid parameters 

383 

384 # Extract challenge parameters 

385 self._challenge = {} 

386 for field in CHALLENGE_FIELDS: 

387 if value := header_pairs.get(field): 

388 self._challenge[field] = value 

389 

390 # Return True only if we found at least one challenge parameter 

391 return bool(self._challenge) 

392 

393 async def __call__( 

394 self, request: ClientRequest, handler: ClientHandlerType 

395 ) -> ClientResponse: 

396 """Run the digest auth middleware.""" 

397 response = None 

398 for retry_count in range(2): 

399 # Apply authorization header if we have a challenge (on second attempt) 

400 if retry_count > 0: 

401 request.headers[hdrs.AUTHORIZATION] = self._encode( 

402 request.method, request.url, request.body 

403 ) 

404 

405 # Send the request 

406 response = await handler(request) 

407 

408 # Check if we need to authenticate 

409 if not self._authenticate(response): 

410 break 

411 

412 # At this point, response is guaranteed to be defined 

413 assert response is not None 

414 return response