Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/requests_toolbelt/auth/guess.py: 17%

94 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:53 +0000

1# -*- coding: utf-8 -*- 

2"""The module containing the code for GuessAuth.""" 

3from requests import auth 

4from requests import cookies 

5 

6from . import _digest_auth_compat as auth_compat, http_proxy_digest 

7 

8 

class GuessAuth(auth.AuthBase):
    """Choose Basic or Digest auth from the ``www-authenticate`` header.

    No credentials are attached to the first request. Instead a response
    hook is registered; once a response advertises ``basic`` or ``digest``
    the matching handler is built from ``username``/``password``, stored
    on ``self.auth``, and used to resend the request. Later calls delegate
    straight to the stored handler.
    """

    def __init__(self, username, password):
        """Store the credentials; no scheme has been selected yet."""
        self.username = username
        self.password = password
        # Concrete auth handler, set by one of the _handle_*_401 methods.
        self.auth = None
        # Offset of the request body stream, recorded in __call__ when the
        # body exposes ``tell()``; used to rewind before resending.
        self.pos = None

    def _handle_basic_auth_401(self, r, kwargs):
        """Resend ``r.request`` with HTTP Basic credentials.

        :param r: the response that carried the challenge.
        :param kwargs: keyword arguments forwarded to ``r.connection.send``.
        :returns: the response to the resent request, with ``r`` appended
            to its ``history``.
        """
        rewind_to = self.pos
        if rewind_to is not None:
            r.request.body.seek(rewind_to)

        # Consume content and release the original connection
        # to allow our new request to reuse the same one.
        r.content
        r.raw.release_conn()

        retry = r.request.copy()
        if not hasattr(retry, '_cookies'):
            retry._cookies = cookies.RequestsCookieJar()
        jar = retry._cookies
        cookies.extract_cookies_to_jar(jar, r.request, r.raw)
        retry.prepare_cookies(jar)

        basic = auth.HTTPBasicAuth(self.username, self.password)
        self.auth = basic
        retry = basic(retry)

        resent = r.connection.send(retry, **kwargs)
        resent.history.append(r)
        resent.request = retry
        return resent

    def _handle_digest_auth_401(self, r, kwargs):
        """Install a Digest handler and let it answer the challenge.

        :param r: the response that carried the challenge.
        :param kwargs: keyword arguments forwarded to the Digest handler's
            ``handle_401``.
        :returns: whatever the Digest handler's ``handle_401`` returns.
        """
        digest = auth_compat.HTTPDigestAuth(self.username, self.password)
        self.auth = digest

        try:
            digest.init_per_thread_state()
        except AttributeError:
            # If we're not on requests 2.8.0+ this method does not exist and
            # is not relevant.
            pass

        # Check that the attr exists because much older versions of requests
        # set this attribute lazily. For example:
        # https://github.com/kennethreitz/requests/blob/33735480f77891754304e7f13e3cdf83aaaa76aa/requests/auth.py#L59
        counter_unset = (
            hasattr(digest, 'num_401_calls')
            and digest.num_401_calls is None
        )
        if counter_unset:
            digest.num_401_calls = 1

        # Digest auth would resend the request by itself. We can take a
        # shortcut here.
        return digest.handle_401(r, **kwargs)

    def handle_401(self, r, **kwargs):
        """Resends a request with auth headers, if needed.

        The decision is based only on the lower-cased ``www-authenticate``
        header value: ``basic`` is tested first, then ``digest``. When
        neither substring is present nothing is resent and ``None`` is
        returned.
        """
        challenge = r.headers.get('www-authenticate', '').lower()

        # Order matters: 'basic' is checked before 'digest'.
        dispatch = (
            ('basic', self._handle_basic_auth_401),
            ('digest', self._handle_digest_auth_401),
        )
        for scheme, handler in dispatch:
            if scheme in challenge:
                return handler(r, kwargs)
        return None

    def __call__(self, request):
        """Prepare ``request`` for guessed authentication.

        If a handler has already been selected, delegate to it. Otherwise
        remember the body stream position (when the body supports
        ``tell()``) and register :meth:`handle_401` as a ``response`` hook.
        """
        chosen = self.auth
        if chosen is not None:
            return chosen(request)

        try:
            self.pos = request.body.tell()
        except AttributeError:
            # Body has no tell() (e.g. it is None); nothing to rewind.
            pass

        request.register_hook('response', self.handle_401)
        return request

80 

81 

class GuessProxyAuth(GuessAuth):
    """Guess the auth type from WWW-Authenticate and Proxy-Authenticate.

    Builds on :class:`GuessAuth` by also reacting to a
    ``Proxy-Authenticate`` challenge, answering it with a separate pair of
    proxy credentials. The selected proxy handler is kept on
    ``self.proxy_auth`` and applied to subsequent requests.
    """

    def __init__(self, username=None, password=None,
                 proxy_username=None, proxy_password=None):
        """Store origin-server and proxy credentials.

        :param username: user name passed to ``GuessAuth.__init__``.
        :param password: password passed to ``GuessAuth.__init__``.
        :param proxy_username: user name used to answer proxy challenges.
        :param proxy_password: password used to answer proxy challenges.
        """
        super(GuessProxyAuth, self).__init__(username, password)
        self.proxy_username = proxy_username
        self.proxy_password = proxy_password
        # Concrete proxy auth handler, set by one of the _handle_*_407
        # methods below.
        self.proxy_auth = None

    def _handle_basic_auth_407(self, r, kwargs):
        """Resend ``r.request`` with Basic proxy credentials.

        :param r: the response that carried the proxy challenge.
        :param kwargs: keyword arguments forwarded to ``r.connection.send``.
        :returns: the response to the resent request, with ``r`` appended
            to its ``history``.
        """
        if self.pos is not None:
            # Rewind the body stream to where it was before the first send.
            r.request.body.seek(self.pos)

        # Consume content and release the original connection so the
        # resent request can reuse it.
        r.content
        r.raw.release_conn()
        prep = r.request.copy()
        if not hasattr(prep, '_cookies'):
            prep._cookies = cookies.RequestsCookieJar()
        cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
        prep.prepare_cookies(prep._cookies)

        self.proxy_auth = auth.HTTPProxyAuth(self.proxy_username,
                                             self.proxy_password)
        prep = self.proxy_auth(prep)
        _r = r.connection.send(prep, **kwargs)
        _r.history.append(r)
        _r.request = prep

        return _r

    def _handle_digest_auth_407(self, r, kwargs):
        """Install a Digest proxy handler and let it answer the challenge.

        :param r: the response that carried the proxy challenge.
        :param kwargs: keyword arguments forwarded to the handler's
            ``handle_407``.
        :returns: whatever the proxy handler's ``handle_407`` returns.
        """
        self.proxy_auth = http_proxy_digest.HTTPProxyDigestAuth(
            username=self.proxy_username,
            password=self.proxy_password)

        try:
            # Initialise the *proxy* handler that was just created. The
            # previous code called this on ``self.auth`` (the origin-server
            # handler, frequently still None), so the proxy handler was
            # never the object being initialised here.
            self.proxy_auth.init_per_thread_state()
        except AttributeError:
            # The method is absent on handlers built against older
            # versions of requests; nothing to initialise in that case.
            pass

        return self.proxy_auth.handle_407(r, **kwargs)

    def handle_407(self, r, **kwargs):
        """Resend a request with proxy auth headers, if needed.

        The decision is based only on the lower-cased
        ``Proxy-Authenticate`` header value: ``basic`` is tested first,
        then ``digest``. When neither substring is present nothing is
        resent and ``None`` is returned.
        """
        proxy_authenticate = r.headers.get('Proxy-Authenticate', '').lower()

        if 'basic' in proxy_authenticate:
            return self._handle_basic_auth_407(r, kwargs)

        if 'digest' in proxy_authenticate:
            return self._handle_digest_auth_407(r, kwargs)

        return None

    def __call__(self, request):
        """Prepare ``request`` for guessed proxy and origin-server auth.

        Applies the already-selected proxy handler (if any), records the
        body stream position when the body supports ``tell()``, registers
        :meth:`handle_407` as a ``response`` hook, and then defers to
        ``GuessAuth.__call__`` for the origin-server side.
        """
        if self.proxy_auth is not None:
            request = self.proxy_auth(request)

        try:
            self.pos = request.body.tell()
        except AttributeError:
            # Body has no tell() (e.g. it is None); nothing to rewind.
            pass

        request.register_hook('response', self.handle_407)
        return super(GuessProxyAuth, self).__call__(request)