1import io
2
3from requests.auth import AuthBase
4
5from .core import NtlmCompatibility, get_auth_type_from_header, get_cbt_data, get_ntlm_credentials
6from .dance import HttpNtlmContext
7
8
9class HttpNtlmAuth(AuthBase):
10 """
11 HTTP NTLM Authentication Handler for Requests.
12 """
13
14 def __init__(
15 self, username,
16 password,
17 send_cbt=True,
18 ntlm_compatibility=NtlmCompatibility.NTLMv2_DEFAULT,
19 ntlm_strict_mode=False
20 ):
21 """Create an authentication handler for NTLM over HTTP.
22
23 :param str username: Username in 'domain\\username' format
24 :param str password: Password
25 :param bool send_cbt: Will send the channel bindings over a
26 HTTPS channel (Default: True)
27 :param ntlm_compatibility: The Lan Manager Compatibility Level to use with the auth message
28 :param ntlm_strict_mode: If False, tries to Type 2 (ie challenge response) NTLM message
29 that does not conform to the NTLM spec
30 """
31
32 self.username, self.password, self.domain = get_ntlm_credentials(username, password)
33
34 if self.domain:
35 self.domain = self.domain.upper()
36 self.password = password
37 self.send_cbt = send_cbt
38 self.ntlm_compatibility = ntlm_compatibility
39 self.ntlm_strict_mode = ntlm_strict_mode
40
41 # This exposes the encrypt/decrypt methods used to encrypt and decrypt
42 # messages sent after ntlm authentication. These methods are utilised
43 # by libraries that call requests_ntlm to encrypt and decrypt the
44 # messages sent after authentication
45 self.session_security = None
46
47 def retry_using_http_ntlm_auth(
48 self, auth_header_field, auth_header, response, auth_type, kwargs
49 ):
50 # Get the certificate of the server if using HTTPS for CBT
51 cbt_data = None
52 if self.send_cbt:
53 cbt_data = get_cbt_data(response)
54
55 # Attempt to authenticate using HTTP NTLM challenge/response
56 if auth_header in response.request.headers:
57 return response
58
59 content_length = int(
60 response.request.headers.get("Content-Length", "0"), base=10
61 )
62 if hasattr(response.request.body, "seek"):
63 if content_length > 0:
64 try:
65 response.request.body.seek(-content_length, 1)
66 except (io.UnsupportedOperation, OSError, ValueError):
67 response.request.body.seek(0, 0)
68 else:
69 response.request.body.seek(0, 0)
70
71 # Consume content and release the original connection
72 # to allow our new request to reuse the same one.
73 _ = response.content
74 response.raw.release_conn()
75 request = response.request.copy()
76
77 ntlm_context = HttpNtlmContext(
78 self.username,
79 self.password,
80 domain=self.domain,
81 auth_type=auth_type,
82 cbt_data=cbt_data,
83 ntlm_compatibility=self.ntlm_compatibility,
84 ntlm_strict_mode=self.ntlm_strict_mode
85 )
86 request.headers[auth_header] = ntlm_context.get_negotiate_header()
87
88 # A streaming response breaks authentication.
89 # This can be fixed by not streaming this request, which is safe
90 # because the returned response3 will still have stream=True set if
91 # specified in args. In addition, we expect this request to give us a
92 # challenge and not the real content, so the content will be short
93 # anyway.
94 args_nostream = dict(kwargs, stream=False)
95 response2 = response.connection.send(request, **args_nostream)
96
97 # needed to make NTLM auth compatible with requests-2.3.0
98
99 # Consume content and release the original connection
100 # to allow our new request to reuse the same one.
101 _ = response2.content
102 response2.raw.release_conn()
103 request = response2.request.copy()
104
105 # this is important for some web applications that store
106 # authentication-related info in cookies (it took a long time to
107 # figure out)
108 if response2.headers.get("set-cookie"):
109 request.headers["Cookie"] = response2.headers.get("set-cookie")
110
111 # get the challenge
112 ntlm_context.set_challenge_from_header(response2.headers[auth_header_field])
113
114 # build response
115 # Get the response based on the challenge message
116 request.headers[auth_header] = ntlm_context.get_authenticate_header()
117 response3 = response2.connection.send(request, **kwargs)
118
119 # Update the history.
120 response3.history.append(response)
121 response3.history.append(response2)
122
123 # Get the session_security object created by ntlm-auth for signing and
124 # sealing of messages
125 self.session_security = ntlm_context.session_security
126
127 return response3
128
129 def response_hook(self, r, **kwargs):
130 """The actual hook handler."""
131 if r.status_code == 401:
132 # Handle server auth.
133 www_authenticate = r.headers.get("www-authenticate", "")
134 auth_type = get_auth_type_from_header(www_authenticate)
135
136 if auth_type is not None:
137 return self.retry_using_http_ntlm_auth(
138 "www-authenticate", "Authorization", r, auth_type, kwargs
139 )
140 elif r.status_code == 407:
141 # If we didn't have server auth, do proxy auth.
142 proxy_authenticate = r.headers.get("proxy-authenticate", "")
143 auth_type = get_auth_type_from_header(proxy_authenticate)
144 if auth_type is not None:
145 return self.retry_using_http_ntlm_auth(
146 "proxy-authenticate", "Proxy-Authorization", r, auth_type, kwargs
147 )
148
149 return r
150
151 def __call__(self, r):
152 # we must keep the connection because NTLM authenticates the
153 # connection, not single requests
154 r.headers["Connection"] = "Keep-Alive"
155
156 r.register_hook("response", self.response_hook)
157 return r
158
159 def extract_username_and_password(self):
160 if self.domain:
161 return f"{self.domain}\\{self.username}", self.password
162 return self.username, self.password