Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/requests_ntlm2/requests_ntlm2.py: 15%
67 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:20 +0000
1import io
3from requests.auth import AuthBase
5from .core import NtlmCompatibility, get_auth_type_from_header, get_cbt_data, get_ntlm_credentials
6from .dance import HttpNtlmContext
class HttpNtlmAuth(AuthBase):
    """
    Requests authentication handler that performs the HTTP NTLM handshake.

    The handler registers a response hook; when a 401 or 407 response
    advertises a supported scheme, the request is replayed on the same
    connection with the negotiate and then the authenticate messages.
    """

    def __init__(
        self, username,
        password,
        send_cbt=True,
        ntlm_compatibility=NtlmCompatibility.NTLMv2_DEFAULT,
        ntlm_strict_mode=False
    ):
        """Create an authentication handler for NTLM over HTTP.

        :param str username: Username in 'domain\\username' format
        :param str password: Password
        :param bool send_cbt: Will send the channel bindings over a
            HTTPS channel (Default: True)
        :param ntlm_compatibility: The Lan Manager Compatibility Level to
            use with the auth message
        :param ntlm_strict_mode: If False, tries to parse a Type 2 (ie
            challenge response) NTLM message that does not conform to the
            NTLM spec
        """
        account, _, realm = get_ntlm_credentials(username, password)

        self.username = account
        # The domain, when one was supplied, is stored upper-cased.
        self.domain = realm.upper() if realm else realm
        # The password is always stored exactly as the caller passed it.
        self.password = password
        self.send_cbt = send_cbt
        self.ntlm_compatibility = ntlm_compatibility
        self.ntlm_strict_mode = ntlm_strict_mode

        # Filled in after a successful handshake. Libraries built on top of
        # this handler use it to encrypt and decrypt the messages exchanged
        # once authentication has completed.
        self.session_security = None

    def retry_using_http_ntlm_auth(
        self, auth_header_field, auth_header, response, auth_type, kwargs
    ):
        """Replay the request behind ``response`` using the NTLM handshake.

        :param auth_header_field: Response header that carries the challenge
        :param auth_header: Request header that carries our NTLM messages
        :param response: The 401/407 response that triggered the retry
        :param auth_type: Scheme name detected in the challenge header
        :param kwargs: Keyword arguments forwarded to ``connection.send``
        :return: ``response`` unchanged if the request already carried
            ``auth_header``, otherwise the response to the authenticate
            message with the two earlier responses appended to its history
        """
        # Channel binding data is taken from the server certificate (HTTPS).
        cbt_data = get_cbt_data(response) if self.send_cbt else None

        # The request already carried credentials: do not try again.
        if auth_header in response.request.headers:
            return response

        content_length = int(
            response.request.headers.get("Content-Length", "0"), base=10
        )
        payload = response.request.body
        if hasattr(payload, "seek"):
            # Rewind the body so that it can be transmitted again.
            if content_length <= 0:
                payload.seek(0, 0)
            else:
                try:
                    payload.seek(-content_length, 1)
                except (io.UnsupportedOperation, OSError, IOError, ValueError):
                    # Relative seek not possible: restart from the beginning.
                    payload.seek(0, 0)

        # Drain the body and hand the connection back so that the follow-up
        # request can reuse it.
        _ = response.content
        response.raw.release_conn()
        request = response.request.copy()

        ntlm_context = HttpNtlmContext(
            self.username,
            self.password,
            domain=self.domain,
            auth_type=auth_type,
            cbt_data=cbt_data,
            ntlm_compatibility=self.ntlm_compatibility,
            ntlm_strict_mode=self.ntlm_strict_mode
        )
        request.headers[auth_header] = ntlm_context.get_negotiate_header()

        # Streaming breaks authentication, so this intermediate request is
        # never streamed. The final response still honours stream=True if
        # the caller asked for it, and the challenge body is expected to be
        # short anyway.
        negotiate_kwargs = dict(kwargs)
        negotiate_kwargs["stream"] = False
        response2 = response.connection.send(request, **negotiate_kwargs)

        # needed to make NTLM auth compatible with requests-2.3.0

        # Again drain the body and release the connection for reuse.
        _ = response2.content
        response2.raw.release_conn()
        request = response2.request.copy()

        # Some web applications keep authentication-related state in
        # cookies, so whatever the challenge response set is passed on.
        cookie = response2.headers.get("set-cookie")
        if cookie:
            request.headers["Cookie"] = cookie

        # Feed the server challenge into the context ...
        ntlm_context.set_challenge_from_header(response2.headers[auth_header_field])

        # ... and answer it with the authenticate message.
        request.headers[auth_header] = ntlm_context.get_authenticate_header()
        response3 = response2.connection.send(request, **kwargs)

        # Record both intermediate responses in the history.
        response3.history.extend((response, response2))

        # Keep the session security object created during the handshake; it
        # is used for signing and sealing of messages.
        self.session_security = ntlm_context.session_security

        return response3

    def response_hook(self, r, **kwargs):
        """The actual hook handler."""
        # status code -> (challenge header, header used for our answer)
        # 401 is server authentication, 407 is proxy authentication.
        headers_by_status = {
            401: ("www-authenticate", "Authorization"),
            407: ("proxy-authenticate", "Proxy-Authorization"),
        }

        header_names = headers_by_status.get(r.status_code)
        if header_names is None:
            return r

        challenge_field, answer_field = header_names
        auth_type = get_auth_type_from_header(r.headers.get(challenge_field, ""))
        if auth_type is None:
            return r

        return self.retry_using_http_ntlm_auth(
            challenge_field, answer_field, r, auth_type, kwargs
        )

    def __call__(self, r):
        """Prepare request ``r`` for NTLM and return it."""
        # NTLM authenticates the connection rather than single requests, so
        # the connection has to be kept open.
        r.headers["Connection"] = "Keep-Alive"

        r.register_hook("response", self.response_hook)
        return r

    def extract_username_and_password(self):
        """Return ``(username, password)``, prefixing the domain if set."""
        if not self.domain:
            return self.username, self.password
        qualified_name = "{}\\{}".format(self.domain, self.username)
        return qualified_name, self.password