Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_auth.py: 25%

152 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import hashlib 

2import os 

3import re 

4import time 

5import typing 

6from base64 import b64encode 

7from urllib.request import parse_http_list 

8 

9from ._exceptions import ProtocolError 

10from ._models import Request, Response 

11from ._utils import to_bytes, to_str, unquote 

12 

13if typing.TYPE_CHECKING: # pragma: no cover 

14 from hashlib import _Hash 

15 

16 

class Auth:
    """
    Common parent class of every authentication scheme.

    A custom scheme is written by subclassing `Auth` and overriding the
    `.auth_flow()` generator method.

    Schemes that perform I/O (disk access, network calls) or that rely on
    synchronization primitives such as locks should override
    `.sync_auth_flow()` and/or `.async_auth_flow()` rather than
    `.auth_flow()`; those are the specialized implementations used by
    `Client` and `AsyncClient` respectively.
    """

    # When true, the request body is read before the flow is started.
    requires_request_body = False
    # When true, each response body is read before it is handed to the flow.
    requires_response_body = False

    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
        """
        Run the authentication flow.

        A request is dispatched by yielding it:

        ```
        yield request
        ```

        The client `.send()`s the resulting response back into the
        generator, so it can be captured with:

        ```
        response = yield request
        ```

        Returning (or simply running off the end of the generator) makes the
        client return the last response it received from the server.

        Any number of requests may be dispatched this way.
        """
        yield request

    def sync_auth_flow(
        self, request: Request
    ) -> typing.Generator[Request, Response, None]:
        """
        Run the authentication flow synchronously.

        The default implementation drives `.auth_flow()`. Override it when the
        scheme performs I/O and/or uses concurrency primitives.
        """
        if self.requires_request_body:
            request.read()

        inner = self.auth_flow(request)
        outgoing = next(inner)

        while True:
            incoming = yield outgoing
            if self.requires_response_body:
                incoming.read()

            # Only the hand-off to the inner flow is guarded: its exhaustion
            # is what ends this generator.
            try:
                outgoing = inner.send(incoming)
            except StopIteration:
                return

    async def async_auth_flow(
        self, request: Request
    ) -> typing.AsyncGenerator[Request, Response]:
        """
        Run the authentication flow asynchronously.

        The default implementation drives `.auth_flow()`. Override it when the
        scheme performs I/O and/or uses concurrency primitives.
        """
        if self.requires_request_body:
            await request.aread()

        inner = self.auth_flow(request)
        outgoing = next(inner)

        while True:
            incoming = yield outgoing
            if self.requires_response_body:
                await incoming.aread()

            # Only the hand-off to the inner flow is guarded: its exhaustion
            # is what ends this generator.
            try:
                outgoing = inner.send(incoming)
            except StopIteration:
                return

106 

107 

class FunctionAuth(Auth):
    """
    Adapter that lets the 'auth' argument be a plain callable which receives
    the request and returns a new, modified request.
    """

    def __init__(self, func: typing.Callable[[Request], Request]) -> None:
        self._func = func

    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
        # The wrapped callable produces the request that is actually sent.
        modified_request = self._func(request)
        yield modified_request

119 

120 

class BasicAuth(Auth):
    """
    HTTP Basic authentication, letting the 'auth' argument be given as a
    (username, password) pair.
    """

    def __init__(
        self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
    ):
        # The header value is computed once and reused for every request.
        self._auth_header = self._build_auth_header(username, password)

    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
        request.headers["Authorization"] = self._auth_header
        yield request

    def _build_auth_header(
        self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
    ) -> str:
        """Return the `Basic <base64(username:password)>` header value."""
        credentials = to_bytes(username) + b":" + to_bytes(password)
        encoded = b64encode(credentials).decode()
        return "Basic " + encoded

142 

143 

class DigestAuth(Auth):
    """
    HTTP Digest authentication.

    A request is first sent as-is (or with credentials derived from the last
    challenge seen, if any). When the server answers 401 with a
    `WWW-Authenticate: Digest ...` header, the challenge is parsed, stored,
    and the request is re-sent with a matching `Authorization` header.
    """

    # Maps the (upper-cased) challenge algorithm name to its hash constructor.
    _ALGORITHM_TO_HASH_FUNCTION: typing.Dict[str, typing.Callable[[bytes], "_Hash"]] = {
        "MD5": hashlib.md5,
        "MD5-SESS": hashlib.md5,
        "SHA": hashlib.sha1,
        "SHA-SESS": hashlib.sha1,
        "SHA-256": hashlib.sha256,
        "SHA-256-SESS": hashlib.sha256,
        "SHA-512": hashlib.sha512,
        "SHA-512-SESS": hashlib.sha512,
    }

    def __init__(
        self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
    ) -> None:
        self._username = to_bytes(username)
        self._password = to_bytes(password)
        # Most recent challenge received; reused to pre-authenticate requests.
        self._last_challenge: typing.Optional[_DigestAuthChallenge] = None
        # Number of requests sent with the current nonce (the `nc` directive).
        self._nonce_count = 1

    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
        """
        Send the request, answering a Digest challenge if the server issues one.
        """
        if self._last_challenge:
            request.headers["Authorization"] = self._build_auth_header(
                request, self._last_challenge
            )

        response = yield request

        if response.status_code != 401 or "www-authenticate" not in response.headers:
            # If the response is not a 401 then we don't
            # need to build an authenticated request.
            return

        for auth_header in response.headers.get_list("www-authenticate"):
            if auth_header.lower().startswith("digest "):
                break
        else:
            # If the response does not include a 'WWW-Authenticate: Digest ...'
            # header, then we don't need to build an authenticated request.
            return

        self._last_challenge = self._parse_challenge(request, response, auth_header)
        # A fresh challenge means a fresh nonce, so the counter restarts.
        self._nonce_count = 1

        request.headers["Authorization"] = self._build_auth_header(
            request, self._last_challenge
        )
        yield request

    def _parse_challenge(
        self, request: Request, response: Response, auth_header: str
    ) -> "_DigestAuthChallenge":
        """
        Returns a challenge from a Digest WWW-Authenticate header.
        These take the form of:
        `Digest realm="realm@host.com",qop="auth,auth-int",nonce="abc",opaque="xyz"`

        Raises `ProtocolError` when a directive is not of the form
        `key=value`, or when the mandatory `realm` or `nonce` is missing.
        """
        scheme, _, fields = auth_header.partition(" ")

        # This method should only ever have been called with a Digest auth header.
        assert scheme.lower() == "digest"

        message = "Malformed Digest WWW-Authenticate header"

        header_dict: typing.Dict[str, str] = {}
        for field in parse_http_list(fields):
            key, separator, value = field.strip().partition("=")
            if not separator:
                # A directive without '=' cannot be interpreted; report it the
                # same way as any other malformed challenge instead of letting
                # an unpacking ValueError escape.
                raise ProtocolError(message, request=request)
            header_dict[key] = unquote(value)

        try:
            realm = header_dict["realm"].encode()
            nonce = header_dict["nonce"].encode()
        except KeyError as exc:
            raise ProtocolError(message, request=request) from exc

        algorithm = header_dict.get("algorithm", "MD5")
        opaque = header_dict["opaque"].encode() if "opaque" in header_dict else None
        qop = header_dict["qop"].encode() if "qop" in header_dict else None
        return _DigestAuthChallenge(
            realm=realm, nonce=nonce, algorithm=algorithm, opaque=opaque, qop=qop
        )

    def _build_auth_header(
        self, request: Request, challenge: "_DigestAuthChallenge"
    ) -> str:
        """
        Build the `Authorization: Digest ...` header value for `request`.

        Increments the nonce counter as a side effect.

        Raises `KeyError` if the challenge names an algorithm that is not in
        `_ALGORITHM_TO_HASH_FUNCTION`, and propagates whatever
        `_resolve_qop()` raises for unsupported `qop` values.
        """
        hash_func = self._ALGORITHM_TO_HASH_FUNCTION[challenge.algorithm.upper()]

        def digest(data: bytes) -> bytes:
            return hash_func(data).hexdigest().encode()

        A1 = b":".join((self._username, challenge.realm, self._password))

        path = request.url.raw_path
        A2 = b":".join((request.method.encode(), path))
        # TODO: implement auth-int
        HA2 = digest(A2)

        nc_value = b"%08x" % self._nonce_count
        cnonce = self._get_client_nonce(self._nonce_count, challenge.nonce)
        self._nonce_count += 1

        HA1 = digest(A1)
        if challenge.algorithm.lower().endswith("-sess"):
            HA1 = digest(b":".join((HA1, challenge.nonce, cnonce)))

        qop = self._resolve_qop(challenge.qop, request=request)
        if qop is None:
            # Legacy form (RFC 2069): response = H(HA1:nonce:HA2).
            # HA1 must appear exactly once in the hashed data.
            digest_data = [HA1, challenge.nonce, HA2]
        else:
            # qop form (RFC 2617/7616): response = H(HA1:nonce:nc:cnonce:qop:HA2).
            digest_data = [HA1, challenge.nonce, nc_value, cnonce, qop, HA2]

        format_args = {
            "username": self._username,
            "realm": challenge.realm,
            "nonce": challenge.nonce,
            "uri": path,
            "response": digest(b":".join(digest_data)),
            "algorithm": challenge.algorithm.encode(),
        }
        if challenge.opaque:
            format_args["opaque"] = challenge.opaque
        if qop:
            format_args["qop"] = b"auth"
            format_args["nc"] = nc_value
            format_args["cnonce"] = cnonce

        return "Digest " + self._get_header_value(format_args)

    def _get_client_nonce(self, nonce_count: int, nonce: bytes) -> bytes:
        """
        Return a 16-hex-character client nonce derived from the nonce count,
        the server nonce, the current time and 8 random bytes.
        """
        s = str(nonce_count).encode()
        s += nonce
        s += time.ctime().encode()
        s += os.urandom(8)

        return hashlib.sha1(s).hexdigest()[:16].encode()

    def _get_header_value(self, header_fields: typing.Dict[str, bytes]) -> str:
        """
        Serialize the directives as `key="value"` pairs joined by ", ".

        The `algorithm`, `qop` and `nc` directives are emitted unquoted.
        """
        NON_QUOTED_FIELDS = ("algorithm", "qop", "nc")
        QUOTED_TEMPLATE = '{}="{}"'
        NON_QUOTED_TEMPLATE = "{}={}"

        parts = []
        for field, value in header_fields.items():
            template = (
                NON_QUOTED_TEMPLATE if field in NON_QUOTED_FIELDS else QUOTED_TEMPLATE
            )
            parts.append(template.format(field, to_str(value)))

        return ", ".join(parts)

    def _resolve_qop(
        self, qop: typing.Optional[bytes], request: Request
    ) -> typing.Optional[bytes]:
        """
        Pick the quality-of-protection value to use for the response.

        Returns `None` when the challenge carried no `qop`, and `b"auth"` when
        `auth` is among the offered values. Raises `NotImplementedError` when
        only `auth-int` is offered, and `ProtocolError` for anything else.
        """
        if qop is None:
            return None
        qops = re.split(b", ?", qop)
        if b"auth" in qops:
            return b"auth"

        if qops == [b"auth-int"]:
            raise NotImplementedError("Digest auth-int support is not yet implemented")

        message = f'Unexpected qop value "{qop!r}" in digest auth'
        raise ProtocolError(message, request=request)

311 

312 

313class _DigestAuthChallenge(typing.NamedTuple): 

314 realm: bytes 

315 nonce: bytes 

316 algorithm: str 

317 opaque: typing.Optional[bytes] 

318 qop: typing.Optional[bytes]