Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/requests_toolbelt/auth/http_proxy_digest.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

58 statements  

1# -*- coding: utf-8 -*- 

2"""The module containing HTTPProxyDigestAuth.""" 

3import re 

4 

5from requests import cookies, utils 

6 

7from . import _digest_auth_compat as auth 

8 

9 

class HTTPProxyDigestAuth(auth.HTTPDigestAuth):
    """HTTP digest authentication against a proxy.

    Registers a response hook that answers ``407`` responses carrying a
    ``Proxy-Authenticate: Digest ...`` challenge by re-sending the request
    with a ``Proxy-Authorization`` header.

    ``stale_rejects`` (int) counts how many times the proxy rejected a
    request that already carried ``Proxy-Authorization`` with a challenge
    marked ``stale=true``.  Such a rejection means the client may simply
    retry with a freshly built digest header, without asking for a new
    username and password.  Retrying stops once the counter reaches 2.
    """

    # Matches the leading "Digest " scheme token, case-insensitively.
    _pat = re.compile(r'digest ', flags=re.IGNORECASE)

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base class and reset the counter."""
        super(HTTPProxyDigestAuth, self).__init__(*args, **kwargs)
        self.stale_rejects = 0

        self.init_per_thread_state()

    @property
    def stale_rejects(self):
        """Number of ``stale=true`` rejections seen so far.

        The value lives on ``self._thread_local`` when the base class
        provides that attribute, otherwise on the instance itself.
        """
        thread_local = getattr(self, '_thread_local', None)
        if thread_local is None:
            return self._stale_rejects
        # ``__init__`` only stores the counter once.  Presumably
        # ``_thread_local`` is per-thread storage (confirm against the base
        # class); in that case a thread other than the constructing one has
        # no ``stale_rejects`` attribute yet, so default to 0 instead of
        # raising AttributeError.
        return getattr(thread_local, 'stale_rejects', 0)

    @stale_rejects.setter
    def stale_rejects(self, value):
        thread_local = getattr(self, '_thread_local', None)
        if thread_local is None:
            self._stale_rejects = value
        else:
            thread_local.stale_rejects = value

    def init_per_thread_state(self):
        """Call the base class' per-thread initialisation when it exists."""
        try:
            super(HTTPProxyDigestAuth, self).init_per_thread_state()
        except AttributeError:
            # If we're not on requests 2.8.0+ this method does not exist
            pass

    def handle_407(self, r, **kwargs):
        """Answer a 407 digest challenge by re-sending the request.

        The request is retried only while fewer than two ``stale=true``
        rejections have been recorded; otherwise the response is returned
        unchanged.

        :param r: current response
        :param kwargs: passed through to ``r.connection.send``
        :returns: the response to the re-sent request, with ``r`` appended
            to its history, or ``r`` itself when no retry is attempted
        :raises IOError: if the 407 response has no ``proxy-authenticate``
            header, or if the proxy answers ``stale=false`` to a request
            that already carried ``Proxy-Authorization``
        """
        if r.status_code != 407 or self.stale_rejects >= 2:
            # Not a proxy challenge, or too many retries: give up
            return r

        s_auth = r.headers.get("proxy-authenticate")
        if s_auth is None:
            raise IOError(
                "proxy server violated RFC 7235:"
                "407 response MUST contain header proxy-authenticate")
        if not self._pat.match(s_auth):
            # Challenge uses a scheme other than Digest; not ours to answer
            return r

        # Strip the scheme token and parse the remaining key=value pairs
        self.chal = utils.parse_dict_header(
            self._pat.sub('', s_auth, count=1))

        # if we present the user/passwd and still get rejected
        # https://tools.ietf.org/html/rfc2617#section-3.2.1
        if ('Proxy-Authorization' in r.request.headers and
                'stale' in self.chal):
            stale = self.chal['stale'].lower()
            if stale == 'true':  # try again
                self.stale_rejects += 1
            elif stale == 'false':  # wrong user/passwd
                raise IOError("User or password is invalid")

        # Consume content and release the original connection
        # to allow our new request to reuse the same one.
        r.content
        r.close()
        prep = r.request.copy()
        cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
        prep.prepare_cookies(prep._cookies)

        prep.headers['Proxy-Authorization'] = self.build_digest_header(
            prep.method, prep.url)
        _r = r.connection.send(prep, **kwargs)
        _r.history.append(r)
        _r.request = prep

        return _r

    def __call__(self, r):
        """Prepare request ``r`` for proxy digest authentication.

        :param r: the request being prepared
        :returns: the same request, with the 407 response hook registered
        """
        self.init_per_thread_state()
        # if we have nonce, then just use it, otherwise server will tell us
        if self.last_nonce:
            r.headers['Proxy-Authorization'] = self.build_digest_header(
                r.method, r.url
            )
        r.register_hook('response', self.handle_407)
        return r