Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_auth.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

168 statements  

1from __future__ import annotations 

2 

3import hashlib 

4import os 

5import re 

6import time 

7import typing 

8from base64 import b64encode 

9from urllib.request import parse_http_list 

10 

11from ._exceptions import ProtocolError 

12from ._models import Cookies, Request, Response 

13from ._utils import to_bytes, to_str, unquote 

14 

15if typing.TYPE_CHECKING: # pragma: no cover 

16 from hashlib import _Hash 

17 

18 

19class Auth: 

20 """ 

21 Base class for all authentication schemes. 

22 

23 To implement a custom authentication scheme, subclass `Auth` and override 

24 the `.auth_flow()` method. 

25 

26 If the authentication scheme does I/O such as disk access or network calls, or uses 

27 synchronization primitives such as locks, you should override `.sync_auth_flow()` 

28 and/or `.async_auth_flow()` instead of `.auth_flow()` to provide specialized 

29 implementations that will be used by `Client` and `AsyncClient` respectively. 

30 """ 

31 

32 requires_request_body = False 

33 requires_response_body = False 

34 

35 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]: 

36 """ 

37 Execute the authentication flow. 

38 

39 To dispatch a request, `yield` it: 

40 

41 ``` 

42 yield request 

43 ``` 

44 

45 The client will `.send()` the response back into the flow generator. You can 

46 access it like so: 

47 

48 ``` 

49 response = yield request 

50 ``` 

51 

52 A `return` (or reaching the end of the generator) will result in the 

53 client returning the last response obtained from the server. 

54 

55 You can dispatch as many requests as is necessary. 

56 """ 

57 yield request 

58 

59 def sync_auth_flow( 

60 self, request: Request 

61 ) -> typing.Generator[Request, Response, None]: 

62 """ 

63 Execute the authentication flow synchronously. 

64 

65 By default, this defers to `.auth_flow()`. You should override this method 

66 when the authentication scheme does I/O and/or uses concurrency primitives. 

67 """ 

68 if self.requires_request_body: 

69 request.read() 

70 

71 flow = self.auth_flow(request) 

72 request = next(flow) 

73 

74 while True: 

75 response = yield request 

76 if self.requires_response_body: 

77 response.read() 

78 

79 try: 

80 request = flow.send(response) 

81 except StopIteration: 

82 break 

83 

84 async def async_auth_flow( 

85 self, request: Request 

86 ) -> typing.AsyncGenerator[Request, Response]: 

87 """ 

88 Execute the authentication flow asynchronously. 

89 

90 By default, this defers to `.auth_flow()`. You should override this method 

91 when the authentication scheme does I/O and/or uses concurrency primitives. 

92 """ 

93 if self.requires_request_body: 

94 await request.aread() 

95 

96 flow = self.auth_flow(request) 

97 request = next(flow) 

98 

99 while True: 

100 response = yield request 

101 if self.requires_response_body: 

102 await response.aread() 

103 

104 try: 

105 request = flow.send(response) 

106 except StopIteration: 

107 break 

108 

109 

110class FunctionAuth(Auth): 

111 """ 

112 Allows the 'auth' argument to be passed as a simple callable function, 

113 that takes the request, and returns a new, modified request. 

114 """ 

115 

116 def __init__(self, func: typing.Callable[[Request], Request]) -> None: 

117 self._func = func 

118 

119 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]: 

120 yield self._func(request) 

121 

122 

123class BasicAuth(Auth): 

124 """ 

125 Allows the 'auth' argument to be passed as a (username, password) pair, 

126 and uses HTTP Basic authentication. 

127 """ 

128 

129 def __init__(self, username: str | bytes, password: str | bytes) -> None: 

130 self._auth_header = self._build_auth_header(username, password) 

131 

132 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]: 

133 request.headers["Authorization"] = self._auth_header 

134 yield request 

135 

136 def _build_auth_header(self, username: str | bytes, password: str | bytes) -> str: 

137 userpass = b":".join((to_bytes(username), to_bytes(password))) 

138 token = b64encode(userpass).decode() 

139 return f"Basic {token}" 

140 

141 

142class NetRCAuth(Auth): 

143 """ 

144 Use a 'netrc' file to lookup basic auth credentials based on the url host. 

145 """ 

146 

147 def __init__(self, file: str | None = None) -> None: 

148 # Lazily import 'netrc'. 

149 # There's no need for us to load this module unless 'NetRCAuth' is being used. 

150 import netrc 

151 

152 self._netrc_info = netrc.netrc(file) 

153 

154 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]: 

155 auth_info = self._netrc_info.authenticators(request.url.host) 

156 if auth_info is None or not auth_info[2]: 

157 # The netrc file did not have authentication credentials for this host. 

158 yield request 

159 else: 

160 # Build a basic auth header with credentials from the netrc file. 

161 request.headers["Authorization"] = self._build_auth_header( 

162 username=auth_info[0], password=auth_info[2] 

163 ) 

164 yield request 

165 

166 def _build_auth_header(self, username: str | bytes, password: str | bytes) -> str: 

167 userpass = b":".join((to_bytes(username), to_bytes(password))) 

168 token = b64encode(userpass).decode() 

169 return f"Basic {token}" 

170 

171 

172class DigestAuth(Auth): 

173 _ALGORITHM_TO_HASH_FUNCTION: dict[str, typing.Callable[[bytes], _Hash]] = { 

174 "MD5": hashlib.md5, 

175 "MD5-SESS": hashlib.md5, 

176 "SHA": hashlib.sha1, 

177 "SHA-SESS": hashlib.sha1, 

178 "SHA-256": hashlib.sha256, 

179 "SHA-256-SESS": hashlib.sha256, 

180 "SHA-512": hashlib.sha512, 

181 "SHA-512-SESS": hashlib.sha512, 

182 } 

183 

184 def __init__(self, username: str | bytes, password: str | bytes) -> None: 

185 self._username = to_bytes(username) 

186 self._password = to_bytes(password) 

187 self._last_challenge: _DigestAuthChallenge | None = None 

188 self._nonce_count = 1 

189 

190 def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]: 

191 if self._last_challenge: 

192 request.headers["Authorization"] = self._build_auth_header( 

193 request, self._last_challenge 

194 ) 

195 

196 response = yield request 

197 

198 if response.status_code != 401 or "www-authenticate" not in response.headers: 

199 # If the response is not a 401 then we don't 

200 # need to build an authenticated request. 

201 return 

202 

203 for auth_header in response.headers.get_list("www-authenticate"): 

204 if auth_header.lower().startswith("digest "): 

205 break 

206 else: 

207 # If the response does not include a 'WWW-Authenticate: Digest ...' 

208 # header, then we don't need to build an authenticated request. 

209 return 

210 

211 self._last_challenge = self._parse_challenge(request, response, auth_header) 

212 self._nonce_count = 1 

213 

214 request.headers["Authorization"] = self._build_auth_header( 

215 request, self._last_challenge 

216 ) 

217 if response.cookies: 

218 Cookies(response.cookies).set_cookie_header(request=request) 

219 yield request 

220 

221 def _parse_challenge( 

222 self, request: Request, response: Response, auth_header: str 

223 ) -> _DigestAuthChallenge: 

224 """ 

225 Returns a challenge from a Digest WWW-Authenticate header. 

226 These take the form of: 

227 `Digest realm="realm@host.com",qop="auth,auth-int",nonce="abc",opaque="xyz"` 

228 """ 

229 scheme, _, fields = auth_header.partition(" ") 

230 

231 # This method should only ever have been called with a Digest auth header. 

232 assert scheme.lower() == "digest" 

233 

234 header_dict: dict[str, str] = {} 

235 for field in parse_http_list(fields): 

236 key, value = field.strip().split("=", 1) 

237 header_dict[key] = unquote(value) 

238 

239 try: 

240 realm = header_dict["realm"].encode() 

241 nonce = header_dict["nonce"].encode() 

242 algorithm = header_dict.get("algorithm", "MD5") 

243 opaque = header_dict["opaque"].encode() if "opaque" in header_dict else None 

244 qop = header_dict["qop"].encode() if "qop" in header_dict else None 

245 return _DigestAuthChallenge( 

246 realm=realm, nonce=nonce, algorithm=algorithm, opaque=opaque, qop=qop 

247 ) 

248 except KeyError as exc: 

249 message = "Malformed Digest WWW-Authenticate header" 

250 raise ProtocolError(message, request=request) from exc 

251 

252 def _build_auth_header( 

253 self, request: Request, challenge: _DigestAuthChallenge 

254 ) -> str: 

255 hash_func = self._ALGORITHM_TO_HASH_FUNCTION[challenge.algorithm.upper()] 

256 

257 def digest(data: bytes) -> bytes: 

258 return hash_func(data).hexdigest().encode() 

259 

260 A1 = b":".join((self._username, challenge.realm, self._password)) 

261 

262 path = request.url.raw_path 

263 A2 = b":".join((request.method.encode(), path)) 

264 # TODO: implement auth-int 

265 HA2 = digest(A2) 

266 

267 nc_value = b"%08x" % self._nonce_count 

268 cnonce = self._get_client_nonce(self._nonce_count, challenge.nonce) 

269 self._nonce_count += 1 

270 

271 HA1 = digest(A1) 

272 if challenge.algorithm.lower().endswith("-sess"): 

273 HA1 = digest(b":".join((HA1, challenge.nonce, cnonce))) 

274 

275 qop = self._resolve_qop(challenge.qop, request=request) 

276 if qop is None: 

277 # Following RFC 2069 

278 digest_data = [HA1, challenge.nonce, HA2] 

279 else: 

280 # Following RFC 2617/7616 

281 digest_data = [HA1, challenge.nonce, nc_value, cnonce, qop, HA2] 

282 

283 format_args = { 

284 "username": self._username, 

285 "realm": challenge.realm, 

286 "nonce": challenge.nonce, 

287 "uri": path, 

288 "response": digest(b":".join(digest_data)), 

289 "algorithm": challenge.algorithm.encode(), 

290 } 

291 if challenge.opaque: 

292 format_args["opaque"] = challenge.opaque 

293 if qop: 

294 format_args["qop"] = b"auth" 

295 format_args["nc"] = nc_value 

296 format_args["cnonce"] = cnonce 

297 

298 return "Digest " + self._get_header_value(format_args) 

299 

300 def _get_client_nonce(self, nonce_count: int, nonce: bytes) -> bytes: 

301 s = str(nonce_count).encode() 

302 s += nonce 

303 s += time.ctime().encode() 

304 s += os.urandom(8) 

305 

306 return hashlib.sha1(s).hexdigest()[:16].encode() 

307 

308 def _get_header_value(self, header_fields: dict[str, bytes]) -> str: 

309 NON_QUOTED_FIELDS = ("algorithm", "qop", "nc") 

310 QUOTED_TEMPLATE = '{}="{}"' 

311 NON_QUOTED_TEMPLATE = "{}={}" 

312 

313 header_value = "" 

314 for i, (field, value) in enumerate(header_fields.items()): 

315 if i > 0: 

316 header_value += ", " 

317 template = ( 

318 QUOTED_TEMPLATE 

319 if field not in NON_QUOTED_FIELDS 

320 else NON_QUOTED_TEMPLATE 

321 ) 

322 header_value += template.format(field, to_str(value)) 

323 

324 return header_value 

325 

326 def _resolve_qop(self, qop: bytes | None, request: Request) -> bytes | None: 

327 if qop is None: 

328 return None 

329 qops = re.split(b", ?", qop) 

330 if b"auth" in qops: 

331 return b"auth" 

332 

333 if qops == [b"auth-int"]: 

334 raise NotImplementedError("Digest auth-int support is not yet implemented") 

335 

336 message = f'Unexpected qop value "{qop!r}" in digest auth' 

337 raise ProtocolError(message, request=request) 

338 

339 

340class _DigestAuthChallenge(typing.NamedTuple): 

341 realm: bytes 

342 nonce: bytes 

343 algorithm: str 

344 opaque: bytes | None 

345 qop: bytes | None