Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_auth.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

169 statements  

1from __future__ import annotations 

2 

3import hashlib 

4import os 

5import re 

6import time 

7import typing 

8from base64 import b64encode 

9from urllib.request import parse_http_list 

10 

11from ._exceptions import ProtocolError 

12from ._models import Cookies, Request, Response 

13from ._utils import to_bytes, to_str, unquote 

14 

15if typing.TYPE_CHECKING: # pragma: no cover 

16 from hashlib import _Hash 

17 

18 

19__all__ = ["Auth", "BasicAuth", "DigestAuth", "NetRCAuth"] 

20 

21 

class Auth:
    """
    Common base class shared by every authentication scheme.

    A custom scheme is written by subclassing `Auth` and overriding
    `.auth_flow()`.

    When the scheme performs I/O (disk access, network calls) or relies on
    synchronization primitives such as locks, override `.sync_auth_flow()`
    and/or `.async_auth_flow()` rather than `.auth_flow()`. Those specialized
    implementations are the ones used by `Client` and `AsyncClient`
    respectively.
    """

    # When true, the request body is read before the flow is started.
    requires_request_body = False
    # When true, each response body is read before it is passed to the flow.
    requires_response_body = False

    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
        """
        Run the authentication flow.

        A request is dispatched by yielding it:

        ```
        yield request
        ```

        The client then `.send()`s the resulting response back into the
        generator, where it can be picked up with:

        ```
        response = yield request
        ```

        Returning (or simply running off the end of the generator) makes the
        client hand back the last response it got from the server.

        Any number of requests may be dispatched this way.
        """
        yield request

    def sync_auth_flow(
        self, request: Request
    ) -> typing.Generator[Request, Response, None]:
        """
        Run the authentication flow synchronously.

        The default implementation drives `.auth_flow()`. Override it when the
        authentication scheme does I/O and/or uses concurrency primitives.
        """
        if self.requires_request_body:
            request.read()

        inner = self.auth_flow(request)
        outgoing = next(inner)

        while True:
            incoming = yield outgoing
            if self.requires_response_body:
                incoming.read()

            try:
                outgoing = inner.send(incoming)
            except StopIteration:
                # The wrapped flow is exhausted, so this one ends as well.
                return

    async def async_auth_flow(
        self, request: Request
    ) -> typing.AsyncGenerator[Request, Response]:
        """
        Run the authentication flow asynchronously.

        The default implementation drives `.auth_flow()`. Override it when the
        authentication scheme does I/O and/or uses concurrency primitives.
        """
        if self.requires_request_body:
            await request.aread()

        inner = self.auth_flow(request)
        outgoing = next(inner)

        while True:
            incoming = yield outgoing
            if self.requires_response_body:
                await incoming.aread()

            try:
                outgoing = inner.send(incoming)
            except StopIteration:
                # The wrapped flow is exhausted, so this one ends as well.
                return

111 

112 

class FunctionAuth(Auth):
    """
    Adapter that lets the 'auth' argument be given as a plain callable.

    The callable is handed the request and returns a new, modified request.
    """

    def __init__(self, func: typing.Callable[[Request], Request]) -> None:
        self._func = func

    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
        # Dispatch whatever request the wrapped callable produces.
        prepared = self._func(request)
        yield prepared

124 

125 

class BasicAuth(Auth):
    """
    HTTP Basic authentication, letting the 'auth' argument be given as a
    (username, password) pair.
    """

    def __init__(self, username: str | bytes, password: str | bytes) -> None:
        # The header value never changes, so it is computed once up front.
        self._auth_header = self._build_auth_header(username, password)

    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
        request.headers["Authorization"] = self._auth_header
        yield request

    def _build_auth_header(self, username: str | bytes, password: str | bytes) -> str:
        """Return the `Basic <base64(username:password)>` header value."""
        credentials = to_bytes(username) + b":" + to_bytes(password)
        encoded = b64encode(credentials).decode()
        return "Basic " + encoded

143 

144 

class NetRCAuth(Auth):
    """
    Look up basic auth credentials in a 'netrc' file, keyed on the url host.
    """

    def __init__(self, file: str | None = None) -> None:
        # 'netrc' is imported lazily: the module is only needed once a
        # 'NetRCAuth' instance is actually created.
        import netrc

        self._netrc_info = netrc.netrc(file)

    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
        entry = self._netrc_info.authenticators(request.url.host)
        if entry is not None and entry[2]:
            # The netrc file has credentials for this host: send them as a
            # basic auth header.
            request.headers["Authorization"] = self._build_auth_header(
                username=entry[0], password=entry[2]
            )
        # Without usable credentials the request goes out unmodified.
        yield request

    def _build_auth_header(self, username: str | bytes, password: str | bytes) -> str:
        """Return the `Basic <base64(username:password)>` header value."""
        credentials = to_bytes(username) + b":" + to_bytes(password)
        encoded = b64encode(credentials).decode()
        return "Basic " + encoded

173 

174 

class DigestAuth(Auth):
    """
    HTTP Digest authentication for a fixed username and password.

    The most recently received challenge is remembered on the instance so that
    later requests can be sent with an `Authorization` header straight away.
    """

    # Maps the (upper-cased) challenge algorithm name to its hash constructor.
    # The "-SESS" variants use the same hash; the session handling itself is
    # done in `_build_auth_header()`.
    _ALGORITHM_TO_HASH_FUNCTION: dict[str, typing.Callable[[bytes], _Hash]] = {
        "MD5": hashlib.md5,
        "MD5-SESS": hashlib.md5,
        "SHA": hashlib.sha1,
        "SHA-SESS": hashlib.sha1,
        "SHA-256": hashlib.sha256,
        "SHA-256-SESS": hashlib.sha256,
        "SHA-512": hashlib.sha512,
        "SHA-512-SESS": hashlib.sha512,
    }

    def __init__(self, username: str | bytes, password: str | bytes) -> None:
        self._username = to_bytes(username)
        self._password = to_bytes(password)
        self._last_challenge: _DigestAuthChallenge | None = None
        self._nonce_count = 1

    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
        """
        Send the request, answering a Digest challenge if the server issues one.

        If a challenge was stored by an earlier flow, the first request already
        carries an `Authorization` header built from it. A 401 response with a
        `WWW-Authenticate: Digest ...` header replaces the stored challenge,
        resets the nonce count and triggers one authenticated retry.
        """
        if self._last_challenge:
            request.headers["Authorization"] = self._build_auth_header(
                request, self._last_challenge
            )

        response = yield request

        if response.status_code != 401 or "www-authenticate" not in response.headers:
            # If the response is not a 401 then we don't
            # need to build an authenticated request.
            return

        for auth_header in response.headers.get_list("www-authenticate"):
            if auth_header.lower().startswith("digest "):
                break
        else:
            # If the response does not include a 'WWW-Authenticate: Digest ...'
            # header, then we don't need to build an authenticated request.
            return

        self._last_challenge = self._parse_challenge(request, response, auth_header)
        self._nonce_count = 1

        request.headers["Authorization"] = self._build_auth_header(
            request, self._last_challenge
        )
        if response.cookies:
            Cookies(response.cookies).set_cookie_header(request=request)
        yield request

    def _parse_challenge(
        self, request: Request, response: Response, auth_header: str
    ) -> _DigestAuthChallenge:
        """
        Returns a challenge from a Digest WWW-Authenticate header.
        These take the form of:
        `Digest realm="realm@host.com",qop="auth,auth-int",nonce="abc",opaque="xyz"`

        Raises `ProtocolError` when a field is not of the form `key=value`, or
        when the mandatory `realm` or `nonce` fields are missing.
        """
        scheme, _, fields = auth_header.partition(" ")

        # This method should only ever have been called with a Digest auth header.
        assert scheme.lower() == "digest"

        malformed_message = "Malformed Digest WWW-Authenticate header"

        header_dict: dict[str, str] = {}
        for field in parse_http_list(fields):
            key, separator, value = field.strip().partition("=")
            if not separator:
                # A field without '=' comes from the server, so report it the
                # same way as any other malformed challenge rather than
                # letting an unpacking error escape.
                raise ProtocolError(malformed_message, request=request)
            header_dict[key] = unquote(value)

        try:
            realm = header_dict["realm"].encode()
            nonce = header_dict["nonce"].encode()
            algorithm = header_dict.get("algorithm", "MD5")
            opaque = header_dict["opaque"].encode() if "opaque" in header_dict else None
            qop = header_dict["qop"].encode() if "qop" in header_dict else None
            return _DigestAuthChallenge(
                realm=realm, nonce=nonce, algorithm=algorithm, opaque=opaque, qop=qop
            )
        except KeyError as exc:
            raise ProtocolError(malformed_message, request=request) from exc

    def _build_auth_header(
        self, request: Request, challenge: _DigestAuthChallenge
    ) -> str:
        """
        Build the `Digest ...` Authorization header value for `request`.

        Each call consumes one nonce count: the current value is used for the
        `nc` field and the client nonce, then the counter is incremented.

        Raises `KeyError` if the challenge algorithm is not one of the keys of
        `_ALGORITHM_TO_HASH_FUNCTION`, plus whatever `_resolve_qop()` raises
        for the challenge's qop value.
        """
        hash_func = self._ALGORITHM_TO_HASH_FUNCTION[challenge.algorithm.upper()]

        def digest(data: bytes) -> bytes:
            return hash_func(data).hexdigest().encode()

        A1 = b":".join((self._username, challenge.realm, self._password))

        path = request.url.raw_path
        A2 = b":".join((request.method.encode(), path))
        # TODO: implement auth-int
        HA2 = digest(A2)

        nc_value = b"%08x" % self._nonce_count
        cnonce = self._get_client_nonce(self._nonce_count, challenge.nonce)
        self._nonce_count += 1

        HA1 = digest(A1)
        if challenge.algorithm.lower().endswith("-sess"):
            # Session variants hash HA1 again together with both nonces.
            HA1 = digest(b":".join((HA1, challenge.nonce, cnonce)))

        qop = self._resolve_qop(challenge.qop, request=request)
        if qop is None:
            # Following RFC 2069
            digest_data = [HA1, challenge.nonce, HA2]
        else:
            # Following RFC 2617/7616
            digest_data = [HA1, challenge.nonce, nc_value, cnonce, qop, HA2]

        format_args = {
            "username": self._username,
            "realm": challenge.realm,
            "nonce": challenge.nonce,
            "uri": path,
            "response": digest(b":".join(digest_data)),
            "algorithm": challenge.algorithm.encode(),
        }
        if challenge.opaque:
            format_args["opaque"] = challenge.opaque
        if qop:
            format_args["qop"] = b"auth"
            format_args["nc"] = nc_value
            format_args["cnonce"] = cnonce

        return "Digest " + self._get_header_value(format_args)

    def _get_client_nonce(self, nonce_count: int, nonce: bytes) -> bytes:
        """
        Return a 16 character hex client nonce.

        It is the truncated SHA-1 of the nonce count, the server nonce, the
        current time string and 8 random bytes.
        """
        s = str(nonce_count).encode()
        s += nonce
        s += time.ctime().encode()
        s += os.urandom(8)

        return hashlib.sha1(s).hexdigest()[:16].encode()

    def _get_header_value(self, header_fields: dict[str, bytes]) -> str:
        """
        Render the fields as a comma separated `key=value` list, in dict order.

        Values are wrapped in double quotes, except for the `algorithm`, `qop`
        and `nc` fields which are emitted bare.
        """
        NON_QUOTED_FIELDS = ("algorithm", "qop", "nc")
        QUOTED_TEMPLATE = '{}="{}"'
        NON_QUOTED_TEMPLATE = "{}={}"

        rendered_fields = []
        for field, value in header_fields.items():
            template = (
                QUOTED_TEMPLATE
                if field not in NON_QUOTED_FIELDS
                else NON_QUOTED_TEMPLATE
            )
            rendered_fields.append(template.format(field, to_str(value)))

        return ", ".join(rendered_fields)

    def _resolve_qop(self, qop: bytes | None, request: Request) -> bytes | None:
        """
        Pick the quality-of-protection value to use for this challenge.

        Returns `None` when the challenge carried no qop, and `b"auth"` when
        "auth" is among the offered values. Raises `NotImplementedError` when
        only "auth-int" is offered, and `ProtocolError` for anything else.
        """
        if qop is None:
            return None
        qops = re.split(b", ?", qop)
        if b"auth" in qops:
            return b"auth"

        if qops == [b"auth-int"]:
            raise NotImplementedError("Digest auth-int support is not yet implemented")

        message = f'Unexpected qop value "{qop!r}" in digest auth'
        raise ProtocolError(message, request=request)

341 

342 

class _DigestAuthChallenge(typing.NamedTuple):
    """
    Fields taken from a parsed `WWW-Authenticate: Digest ...` challenge.
    """

    # Mandatory fields of the challenge.
    realm: bytes
    nonce: bytes
    # Hash algorithm name as given by the server.
    algorithm: str
    # Optional fields; `None` when the challenge did not include them.
    opaque: bytes | None
    qop: bytes | None