Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/requests_ntlm2/requests_ntlm2.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

67 statements  

1import io 

2 

3from requests.auth import AuthBase 

4 

5from .core import NtlmCompatibility, get_auth_type_from_header, get_cbt_data, get_ntlm_credentials 

6from .dance import HttpNtlmContext 

7 

8 

9class HttpNtlmAuth(AuthBase): 

10 """ 

11 HTTP NTLM Authentication Handler for Requests. 

12 """ 

13 

14 def __init__( 

15 self, username, 

16 password, 

17 send_cbt=True, 

18 ntlm_compatibility=NtlmCompatibility.NTLMv2_DEFAULT, 

19 ntlm_strict_mode=False 

20 ): 

21 """Create an authentication handler for NTLM over HTTP. 

22 

23 :param str username: Username in 'domain\\username' format 

24 :param str password: Password 

25 :param bool send_cbt: Will send the channel bindings over a 

26 HTTPS channel (Default: True) 

27 :param ntlm_compatibility: The Lan Manager Compatibility Level to use with the auth message 

28 :param ntlm_strict_mode: If False, tries to Type 2 (ie challenge response) NTLM message 

29 that does not conform to the NTLM spec 

30 """ 

31 

32 self.username, self.password, self.domain = get_ntlm_credentials(username, password) 

33 

34 if self.domain: 

35 self.domain = self.domain.upper() 

36 self.password = password 

37 self.send_cbt = send_cbt 

38 self.ntlm_compatibility = ntlm_compatibility 

39 self.ntlm_strict_mode = ntlm_strict_mode 

40 

41 # This exposes the encrypt/decrypt methods used to encrypt and decrypt 

42 # messages sent after ntlm authentication. These methods are utilised 

43 # by libraries that call requests_ntlm to encrypt and decrypt the 

44 # messages sent after authentication 

45 self.session_security = None 

46 

47 def retry_using_http_ntlm_auth( 

48 self, auth_header_field, auth_header, response, auth_type, kwargs 

49 ): 

50 # Get the certificate of the server if using HTTPS for CBT 

51 cbt_data = None 

52 if self.send_cbt: 

53 cbt_data = get_cbt_data(response) 

54 

55 # Attempt to authenticate using HTTP NTLM challenge/response 

56 if auth_header in response.request.headers: 

57 return response 

58 

59 content_length = int( 

60 response.request.headers.get("Content-Length", "0"), base=10 

61 ) 

62 if hasattr(response.request.body, "seek"): 

63 if content_length > 0: 

64 try: 

65 response.request.body.seek(-content_length, 1) 

66 except (io.UnsupportedOperation, OSError, ValueError): 

67 response.request.body.seek(0, 0) 

68 else: 

69 response.request.body.seek(0, 0) 

70 

71 # Consume content and release the original connection 

72 # to allow our new request to reuse the same one. 

73 _ = response.content 

74 response.raw.release_conn() 

75 request = response.request.copy() 

76 

77 ntlm_context = HttpNtlmContext( 

78 self.username, 

79 self.password, 

80 domain=self.domain, 

81 auth_type=auth_type, 

82 cbt_data=cbt_data, 

83 ntlm_compatibility=self.ntlm_compatibility, 

84 ntlm_strict_mode=self.ntlm_strict_mode 

85 ) 

86 request.headers[auth_header] = ntlm_context.get_negotiate_header() 

87 

88 # A streaming response breaks authentication. 

89 # This can be fixed by not streaming this request, which is safe 

90 # because the returned response3 will still have stream=True set if 

91 # specified in args. In addition, we expect this request to give us a 

92 # challenge and not the real content, so the content will be short 

93 # anyway. 

94 args_nostream = dict(kwargs, stream=False) 

95 response2 = response.connection.send(request, **args_nostream) 

96 

97 # needed to make NTLM auth compatible with requests-2.3.0 

98 

99 # Consume content and release the original connection 

100 # to allow our new request to reuse the same one. 

101 _ = response2.content 

102 response2.raw.release_conn() 

103 request = response2.request.copy() 

104 

105 # this is important for some web applications that store 

106 # authentication-related info in cookies (it took a long time to 

107 # figure out) 

108 if response2.headers.get("set-cookie"): 

109 request.headers["Cookie"] = response2.headers.get("set-cookie") 

110 

111 # get the challenge 

112 ntlm_context.set_challenge_from_header(response2.headers[auth_header_field]) 

113 

114 # build response 

115 # Get the response based on the challenge message 

116 request.headers[auth_header] = ntlm_context.get_authenticate_header() 

117 response3 = response2.connection.send(request, **kwargs) 

118 

119 # Update the history. 

120 response3.history.append(response) 

121 response3.history.append(response2) 

122 

123 # Get the session_security object created by ntlm-auth for signing and 

124 # sealing of messages 

125 self.session_security = ntlm_context.session_security 

126 

127 return response3 

128 

129 def response_hook(self, r, **kwargs): 

130 """The actual hook handler.""" 

131 if r.status_code == 401: 

132 # Handle server auth. 

133 www_authenticate = r.headers.get("www-authenticate", "") 

134 auth_type = get_auth_type_from_header(www_authenticate) 

135 

136 if auth_type is not None: 

137 return self.retry_using_http_ntlm_auth( 

138 "www-authenticate", "Authorization", r, auth_type, kwargs 

139 ) 

140 elif r.status_code == 407: 

141 # If we didn't have server auth, do proxy auth. 

142 proxy_authenticate = r.headers.get("proxy-authenticate", "") 

143 auth_type = get_auth_type_from_header(proxy_authenticate) 

144 if auth_type is not None: 

145 return self.retry_using_http_ntlm_auth( 

146 "proxy-authenticate", "Proxy-Authorization", r, auth_type, kwargs 

147 ) 

148 

149 return r 

150 

151 def __call__(self, r): 

152 # we must keep the connection because NTLM authenticates the 

153 # connection, not single requests 

154 r.headers["Connection"] = "Keep-Alive" 

155 

156 r.register_hook("response", self.response_hook) 

157 return r 

158 

159 def extract_username_and_password(self): 

160 if self.domain: 

161 return f"{self.domain}\\{self.username}", self.password 

162 return self.username, self.password