Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_auth.py: 26%

166 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1import hashlib 

2import netrc 

3import os 

4import re 

5import time 

6import typing 

7from base64 import b64encode 

8from urllib.request import parse_http_list 

9 

10from ._exceptions import ProtocolError 

11from ._models import Request, Response 

12from ._utils import to_bytes, to_str, unquote 

13 

14if typing.TYPE_CHECKING: # pragma: no cover 

15 from hashlib import _Hash 

16 

17 

class Auth:
    """
    Base class for all authentication schemes.

    Custom schemes subclass `Auth` and override `.auth_flow()`.

    Schemes that perform I/O (disk access, network calls) or rely on
    synchronization primitives such as locks should instead override
    `.sync_auth_flow()` and/or `.async_auth_flow()`, which are the entry
    points used by `Client` and `AsyncClient` respectively.
    """

    # When set, the request body is read before the flow starts.
    requires_request_body = False
    # When set, each response body is read before it is sent into the flow.
    requires_response_body = False

    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
        """
        Execute the authentication flow.

        Dispatch a request by yielding it:

        ```
        yield request
        ```

        The client `.send()`s the resulting response back into the generator,
        so it can be captured like this:

        ```
        response = yield request
        ```

        Returning (or simply running off the end of the generator) makes the
        client return the last response it obtained from the server.

        Any number of requests may be dispatched.
        """
        yield request

    def sync_auth_flow(
        self, request: Request
    ) -> typing.Generator[Request, Response, None]:
        """
        Execute the authentication flow synchronously.

        The default implementation drives `.auth_flow()`. Override it when the
        scheme does I/O and/or uses concurrency primitives.
        """
        if self.requires_request_body:
            request.read()

        inner_flow = self.auth_flow(request)
        outgoing = next(inner_flow)

        while True:
            incoming = yield outgoing
            if self.requires_response_body:
                incoming.read()

            try:
                outgoing = inner_flow.send(incoming)
            except StopIteration:
                # The wrapped flow has finished, so this one finishes too.
                return

    async def async_auth_flow(
        self, request: Request
    ) -> typing.AsyncGenerator[Request, Response]:
        """
        Execute the authentication flow asynchronously.

        The default implementation drives `.auth_flow()`. Override it when the
        scheme does I/O and/or uses concurrency primitives.
        """
        if self.requires_request_body:
            await request.aread()

        inner_flow = self.auth_flow(request)
        outgoing = next(inner_flow)

        while True:
            incoming = yield outgoing
            if self.requires_response_body:
                await incoming.aread()

            try:
                outgoing = inner_flow.send(incoming)
            except StopIteration:
                # The wrapped flow has finished, so this one finishes too.
                return

107 

108 

class FunctionAuth(Auth):
    """
    Adapter that lets the 'auth' argument be a plain callable which receives
    the request and returns a new, modified request.
    """

    def __init__(self, func: typing.Callable[[Request], Request]) -> None:
        self._func = func

    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
        # Dispatch whatever request the wrapped callable hands back.
        modified_request = self._func(request)
        yield modified_request

120 

121 

class BasicAuth(Auth):
    """
    Adapter that lets the 'auth' argument be a (username, password) pair,
    applied using HTTP Basic authentication.
    """

    def __init__(
        self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
    ):
        # The header value is computed once and reused for every request.
        self._auth_header = self._build_auth_header(username, password)

    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
        request.headers["Authorization"] = self._auth_header
        yield request

    def _build_auth_header(
        self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
    ) -> str:
        """Return the `Basic <base64(username:password)>` header value."""
        credential_parts = [to_bytes(username), to_bytes(password)]
        encoded = b64encode(b":".join(credential_parts))
        return f"Basic {encoded.decode()}"

143 

144 

class NetRCAuth(Auth):
    """
    Look up basic auth credentials in a 'netrc' file, keyed on the url host.
    """

    def __init__(self, file: typing.Optional[str] = None):
        self._netrc_info = netrc.netrc(file)

    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
        credentials = self._netrc_info.authenticators(request.url.host)

        # Only attach a header when the netrc file holds an entry with a
        # non-empty password for this host; otherwise send the request as-is.
        if credentials is not None and credentials[2]:
            request.headers["Authorization"] = self._build_auth_header(
                username=credentials[0], password=credentials[2]
            )

        yield request

    def _build_auth_header(
        self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
    ) -> str:
        """Return the `Basic <base64(username:password)>` header value."""
        credential_parts = [to_bytes(username), to_bytes(password)]
        encoded = b64encode(b":".join(credential_parts))
        return f"Basic {encoded.decode()}"

171 

172 

class DigestAuth(Auth):
    """
    HTTP Digest authentication.

    A request is first sent as-is (or pre-authenticated with the most recently
    seen challenge, if any). If the server answers with a 401 that carries a
    `WWW-Authenticate: Digest ...` header, the request is re-sent once with an
    `Authorization: Digest ...` header computed from that challenge.
    """

    # Maps the (upper-cased) challenge 'algorithm' value to its hash constructor.
    _ALGORITHM_TO_HASH_FUNCTION: typing.Dict[str, typing.Callable[[bytes], "_Hash"]] = {
        "MD5": hashlib.md5,
        "MD5-SESS": hashlib.md5,
        "SHA": hashlib.sha1,
        "SHA-SESS": hashlib.sha1,
        "SHA-256": hashlib.sha256,
        "SHA-256-SESS": hashlib.sha256,
        "SHA-512": hashlib.sha512,
        "SHA-512-SESS": hashlib.sha512,
    }

    def __init__(
        self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
    ) -> None:
        self._username = to_bytes(username)
        self._password = to_bytes(password)
        # Last challenge received; reused to pre-authenticate later requests.
        self._last_challenge: typing.Optional[_DigestAuthChallenge] = None
        # Value sent as 'nc'; reset to 1 whenever a new challenge is stored.
        self._nonce_count = 1

    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
        """
        Send the request, answering a Digest challenge with one retry.

        Raises `ProtocolError` if the server's Digest challenge is malformed
        or uses an unsupported algorithm or qop value.
        """
        if self._last_challenge:
            request.headers["Authorization"] = self._build_auth_header(
                request, self._last_challenge
            )

        response = yield request

        if response.status_code != 401 or "www-authenticate" not in response.headers:
            # If the response is not a 401 then we don't
            # need to build an authenticated request.
            return

        for auth_header in response.headers.get_list("www-authenticate"):
            if auth_header.lower().startswith("digest "):
                break
        else:
            # If the response does not include a 'WWW-Authenticate: Digest ...'
            # header, then we don't need to build an authenticated request.
            return

        self._last_challenge = self._parse_challenge(request, response, auth_header)
        self._nonce_count = 1

        request.headers["Authorization"] = self._build_auth_header(
            request, self._last_challenge
        )
        yield request

    def _parse_challenge(
        self, request: Request, response: Response, auth_header: str
    ) -> "_DigestAuthChallenge":
        """
        Returns a challenge from a Digest WWW-Authenticate header.
        These take the form of:
        `Digest realm="realm@host.com",qop="auth,auth-int",nonce="abc",opaque="xyz"`

        Raises `ProtocolError` when a field has no `=`, when `realm` or `nonce`
        is missing, or when the `algorithm` is not one this class supports.
        """
        scheme, _, fields = auth_header.partition(" ")

        # This method should only ever have been called with a Digest auth header.
        assert scheme.lower() == "digest"

        malformed_message = "Malformed Digest WWW-Authenticate header"

        header_dict: typing.Dict[str, str] = {}
        for field in parse_http_list(fields):
            key, separator, value = field.strip().partition("=")
            if not separator:
                # A field without '=' cannot be a key/value pair; report it as
                # a protocol error rather than leaking a bare ValueError.
                raise ProtocolError(malformed_message, request=request)
            header_dict[key] = unquote(value)

        try:
            realm = header_dict["realm"].encode()
            nonce = header_dict["nonce"].encode()
        except KeyError as exc:
            raise ProtocolError(malformed_message, request=request) from exc

        algorithm = header_dict.get("algorithm", "MD5")
        if algorithm.upper() not in self._ALGORITHM_TO_HASH_FUNCTION:
            # Reject here, before the challenge is stored, so an unusable
            # challenge is never cached and never surfaces as a bare KeyError.
            message = f'Unsupported Digest algorithm "{algorithm}"'
            raise ProtocolError(message, request=request)

        opaque = header_dict["opaque"].encode() if "opaque" in header_dict else None
        qop = header_dict["qop"].encode() if "qop" in header_dict else None
        return _DigestAuthChallenge(
            realm=realm, nonce=nonce, algorithm=algorithm, opaque=opaque, qop=qop
        )

    def _build_auth_header(
        self, request: Request, challenge: "_DigestAuthChallenge"
    ) -> str:
        """
        Return the `Digest ...` Authorization header value for `request`.

        Each call uses the current nonce count and then increments it.
        """
        hash_func = self._ALGORITHM_TO_HASH_FUNCTION[challenge.algorithm.upper()]

        def digest(data: bytes) -> bytes:
            return hash_func(data).hexdigest().encode()

        A1 = b":".join((self._username, challenge.realm, self._password))

        path = request.url.raw_path
        A2 = b":".join((request.method.encode(), path))
        # TODO: implement auth-int
        HA2 = digest(A2)

        nc_value = b"%08x" % self._nonce_count
        cnonce = self._get_client_nonce(self._nonce_count, challenge.nonce)
        self._nonce_count += 1

        HA1 = digest(A1)
        if challenge.algorithm.lower().endswith("-sess"):
            HA1 = digest(b":".join((HA1, challenge.nonce, cnonce)))

        qop = self._resolve_qop(challenge.qop, request=request)
        if qop is None:
            # No qop offered (RFC 2069 compatibility): H(HA1:nonce:HA2).
            digest_data = [HA1, challenge.nonce, HA2]
        else:
            # qop present (RFC 2617/7616): H(HA1:nonce:nc:cnonce:qop:HA2).
            digest_data = [HA1, challenge.nonce, nc_value, cnonce, qop, HA2]

        format_args = {
            "username": self._username,
            "realm": challenge.realm,
            "nonce": challenge.nonce,
            "uri": path,
            "response": digest(b":".join(digest_data)),
            "algorithm": challenge.algorithm.encode(),
        }
        if challenge.opaque:
            format_args["opaque"] = challenge.opaque
        if qop:
            format_args["qop"] = b"auth"
            format_args["nc"] = nc_value
            format_args["cnonce"] = cnonce

        return "Digest " + self._get_header_value(format_args)

    def _get_client_nonce(self, nonce_count: int, nonce: bytes) -> bytes:
        """Return a 16-hex-character client nonce with a random component."""
        s = str(nonce_count).encode()
        s += nonce
        s += time.ctime().encode()
        s += os.urandom(8)

        return hashlib.sha1(s).hexdigest()[:16].encode()

    def _get_header_value(self, header_fields: typing.Dict[str, bytes]) -> str:
        """Render the fields as `key="value"` pairs joined by `, `."""
        # These fields are emitted without surrounding quotes.
        NON_QUOTED_FIELDS = ("algorithm", "qop", "nc")
        QUOTED_TEMPLATE = '{}="{}"'
        NON_QUOTED_TEMPLATE = "{}={}"

        return ", ".join(
            (
                NON_QUOTED_TEMPLATE if field in NON_QUOTED_FIELDS else QUOTED_TEMPLATE
            ).format(field, to_str(value))
            for field, value in header_fields.items()
        )

    def _resolve_qop(
        self, qop: typing.Optional[bytes], request: Request
    ) -> typing.Optional[bytes]:
        """
        Pick the qop to use from the server's offered list.

        Returns `None` when no qop was offered and `b"auth"` when it is among
        the offered values. Raises `NotImplementedError` when only `auth-int`
        is offered, and `ProtocolError` for anything else.
        """
        if qop is None:
            return None
        qops = re.split(b", ?", qop)
        if b"auth" in qops:
            return b"auth"

        if qops == [b"auth-int"]:
            raise NotImplementedError("Digest auth-int support is not yet implemented")

        message = f'Unexpected qop value "{qop!r}" in digest auth'
        raise ProtocolError(message, request=request)

340 

341 

342class _DigestAuthChallenge(typing.NamedTuple): 

343 realm: bytes 

344 nonce: bytes 

345 algorithm: str 

346 opaque: typing.Optional[bytes] 

347 qop: typing.Optional[bytes]