1import base64
2import logging
3
4import ntlm_auth.ntlm
5
6from .core import NtlmCompatibility, fix_target_info
7
8
9logger = logging.getLogger(__name__)
10
11
12class HttpNtlmContext(ntlm_auth.ntlm.NtlmContext):
13 """Thin wrapper over ntlm_auth.ntlm.NtlmContext for HTTP"""
14
15 def __init__(
16 self,
17 username,
18 password,
19 domain=None,
20 workstation=None,
21 cbt_data=None,
22 ntlm_compatibility=NtlmCompatibility.NTLMv2_DEFAULT,
23 auth_type=None,
24 ntlm_strict_mode=False,
25 ):
26 r"""
27 Initialises a NTLM context to use when authenticating using the NTLM
28 protocol.
29 Initialises the NTLM context to use when sending and receiving messages
30 to and from the server. You should be using this object as it supports
31 NTLMv2 authenticate and it easier to use than before. It also brings in
32 the ability to use signing and sealing with session_security and
33 generate a MIC structure.
34
35 :param username: The username to authenticate with
36 :param password: The password for the username
37 :param domain: The domain part of the username (None if n/a)
38 :param workstation: The localworkstation (None if n/a)
39 :param cbt_data: A GssChannelBindingsStruct or None to bind channel
40 data with the auth process. See: https://tools.ietf.org/html/rfc5929
41 :param ntlm_compatibility: (Default 3)
42 The Lan Manager Compatibility Level to use with the auth message
43 This is set by an Administrator in the registry key
44 'HKLM\SYSTEM\CurrentControlSet\Control\Lsa\LmCompatibilityLevel'
45 The values correspond to the following;
46 0 : LM and NTLMv1
47 1 : LM, NTLMv1 and NTLMv1 with Extended Session Security
48 2 : NTLMv1 and NTLMv1 with Extended Session Security
49 3-5 : NTLMv2 Only
50 Note: Values 3 to 5 are no different from a client perspective
51 :param auth_type: either 'NTLM' or 'Negotiate'
52 :param ntlm_strict_mode: If False, tries to Type 2 (ie challenge response) NTLM message
53 that does not conform to the NTLM spec
54 """
55 if auth_type not in ("NTLM", "Negotiate"):
56 raise ValueError(
57 f'Expected "NTLM" or "Negotiate" auth_type, got {auth_type}'
58 )
59 self._auth_type = auth_type
60 self._challenge_token = None
61 self.ntlm_strict_mode = ntlm_strict_mode
62 super().__init__(
63 username,
64 password,
65 domain=domain,
66 workstation=workstation,
67 cbt_data=cbt_data,
68 ntlm_compatibility=ntlm_compatibility,
69 )
70
71 @property
72 def negotiate_message(self):
73 return self._negotiate_message
74
75 @negotiate_message.setter
76 def negotiate_message(self, value):
77 self._negotiate_message = value
78
79 @property
80 def challenge_message(self):
81 return self._challenge_message
82
83 @challenge_message.setter
84 def challenge_message(self, value):
85 self._challenge_message = value
86
87 @property
88 def authenticate_message(self):
89 return self._authenticate_message
90
91 @authenticate_message.setter
92 def authenticate_message(self, value):
93 self._authenticate_message = value
94
95 @property
96 def session_security(self):
97 return self._session_security
98
99 @session_security.setter
100 def session_security(self, value):
101 self._session_security = value
102
103 def create_negotiate_message(self):
104 msg = self.step()
105 return base64.b64encode(msg)
106
107 def parse_challenge_message(self, msg2):
108 challenge_msg = base64.b64decode(msg2)
109
110 if self.ntlm_strict_mode:
111 self._challenge_token = challenge_msg
112 else:
113 fixed_challenge_msg = fix_target_info(challenge_msg)
114 if fixed_challenge_msg != challenge_msg:
115 logger.debug("original challenge: %s", base64.b64encode(challenge_msg))
116 logger.debug("modified challenge: %s", base64.b64encode(fixed_challenge_msg))
117 self._challenge_token = fixed_challenge_msg
118
119 def create_authenticate_message(self):
120 msg = self.step(self._challenge_token)
121 return base64.b64encode(msg)
122
123 def get_negotiate_header(self):
124 negotiate_message = self.create_negotiate_message().decode("ascii")
125 result = "{auth_type} {negotiate_message}".format(
126 auth_type=self._auth_type, negotiate_message=negotiate_message
127 )
128 return result
129
130 def set_challenge_from_header(self, raw_header_value):
131 if not raw_header_value:
132 return None
133
134 match_strings = (
135 f"{self._auth_type} ",
136 "{}: {} ".format("Proxy-Authenticate", self._auth_type),
137 "{}: {} ".format("WWW-Authenticate", self._auth_type),
138 )
139 for header_value in raw_header_value.split(","):
140 header_value = header_value.strip()
141 for auth_strip in match_strings:
142 if header_value.startswith(auth_strip):
143 challenge = header_value.replace(auth_strip, "")
144 return self.parse_challenge_message(challenge)
145 return None
146
147 def get_authenticate_header(self):
148 authenticate_message = self.create_authenticate_message()
149 authenticate_message = authenticate_message.decode("ascii")
150 return "{auth_type} {authenticate_message}".format(
151 auth_type=self._auth_type,
152 authenticate_message=authenticate_message
153 )