Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/ntlm_auth/ntlm.py: 38%
126 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 07:03 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 07:03 +0000
1# Copyright: (c) 2018, Jordan Borean (@jborean93) <jborean93@gmail.com>
2# MIT License (see LICENSE or https://opensource.org/licenses/MIT)
4import base64
5import struct
7from ntlm_auth.constants import NegotiateFlags
8from ntlm_auth.exceptions import NoAuthContextError
9from ntlm_auth.messages import AuthenticateMessage, ChallengeMessage, \
10 NegotiateMessage
11from ntlm_auth.session_security import SessionSecurity
class NtlmContext(object):
    """Client-side state for a single NTLM authentication exchange.

    Call :meth:`step` with no token to build the negotiate message, then call
    it again with the server's challenge token to build the authenticate
    message. If signing or sealing was negotiated, :meth:`sign`,
    :meth:`verify`, :meth:`wrap` and :meth:`unwrap` become usable once the
    second step has run.
    """

    def __init__(self, username, password, domain=None, workstation=None,
                 cbt_data=None, ntlm_compatibility=3):
        r"""
        Initialises a NTLM context to use when authenticating using the NTLM
        protocol.

        Initialises the NTLM context to use when sending and receiving
        messages to and from the server. You should be using this object as it
        supports NTLMv2 authenticate and it is easier to use than before. It
        also brings in the ability to use signing and sealing with
        session_security and generate a MIC structure.

        :param username: The username to authenticate with
        :param password: The password for the username
        :param domain: The domain part of the username (None if n/a)
        :param workstation: The local workstation (None if n/a)
        :param cbt_data: A GssChannelBindingsStruct or None to bind channel
            data with the auth process
        :param ntlm_compatibility: (Default 3)
            The Lan Manager Compatibility Level to use with the auth message
            This is set by an Administrator in the registry key
            'HKLM\SYSTEM\CurrentControlSet\Control\Lsa\LmCompatibilityLevel'
            The values correspond to the following;
                0 : LM and NTLMv1
                1 : LM, NTLMv1 and NTLMv1 with Extended Session Security
                2 : NTLMv1 and NTLMv1 with Extended Session Security
                3-5 : NTLMv2 Only
            Note: Values 3 to 5 are no different from a client perspective
        :raises Exception: if ntlm_compatibility is outside the range 0 to 5
        """
        self.username = username
        self.password = password
        self.domain = domain
        self.workstation = workstation
        self.cbt_data = cbt_data
        self._server_certificate_hash = None  # deprecated for backwards compat
        self.ntlm_compatibility = ntlm_compatibility
        self.complete = False

        # Setting up our flags so the challenge message returns the target
        # info block if supported
        self.negotiate_flags = NegotiateFlags.NTLMSSP_NEGOTIATE_TARGET_INFO | \
            NegotiateFlags.NTLMSSP_NEGOTIATE_128 | \
            NegotiateFlags.NTLMSSP_NEGOTIATE_56 | \
            NegotiateFlags.NTLMSSP_NEGOTIATE_UNICODE | \
            NegotiateFlags.NTLMSSP_NEGOTIATE_VERSION | \
            NegotiateFlags.NTLMSSP_NEGOTIATE_KEY_EXCH | \
            NegotiateFlags.NTLMSSP_NEGOTIATE_ALWAYS_SIGN | \
            NegotiateFlags.NTLMSSP_NEGOTIATE_SIGN | \
            NegotiateFlags.NTLMSSP_NEGOTIATE_SEAL

        # Setting the message types based on the ntlm_compatibility level
        self._set_ntlm_compatibility_flags(self.ntlm_compatibility)

        self._negotiate_message = None
        self._challenge_message = None
        self._authenticate_message = None
        self._session_security = None

    @property
    def mic_present(self):
        """Whether the authenticate message built by step() carries a MIC.

        :return: False until an authenticate message exists, otherwise the
            truthiness of that message's ``mic`` attribute
        """
        if self._authenticate_message:
            return bool(self._authenticate_message.mic)

        return False

    @property
    def session_key(self):
        """The exported session key of the authenticate message.

        :return: The ``exported_session_key`` of the authenticate message, or
            None when no authenticate message has been built yet
        """
        if self._authenticate_message:
            return self._authenticate_message.exported_session_key

        return None

    def reset_rc4_state(self, outgoing=True):
        """
        Resets the signing cipher for the incoming or outgoing cipher. For
        SPNEGO for calculating mechListMIC.

        Does nothing when no session security has been set up.

        :param outgoing: Passed through to the session security object to
            select which cipher is reset
        """
        if self._session_security:
            self._session_security.reset_rc4_state(outgoing=outgoing)

    def step(self, input_token=None):
        """Produce the next NTLM token in the exchange.

        The first call builds and returns the negotiate message; input_token
        is ignored. Any later call parses input_token as the challenge
        message, builds the authenticate message (adding the MIC), sets up
        session security when SIGN or SEAL was negotiated and marks the
        context as complete.

        :param input_token: The challenge token received from the server,
            only used once the negotiate message has been generated
        :return: The data of the negotiate or authenticate message
        """
        if self._negotiate_message is None:
            self._negotiate_message = NegotiateMessage(self.negotiate_flags,
                                                       self.domain,
                                                       self.workstation)
            return self._negotiate_message.get_data()

        self._challenge_message = ChallengeMessage(input_token)
        self._authenticate_message = AuthenticateMessage(
            self.username, self.password, self.domain, self.workstation,
            self._challenge_message, self.ntlm_compatibility,
            server_certificate_hash=self._server_certificate_hash,
            cbt_data=self.cbt_data
        )
        self._authenticate_message.add_mic(self._negotiate_message,
                                           self._challenge_message)

        # The authenticate message stores its flags as packed bytes; they are
        # unpacked here as a little-endian unsigned 32-bit integer.
        flag_bytes = self._authenticate_message.negotiate_flags
        flags = struct.unpack("<I", flag_bytes)[0]
        if flags & NegotiateFlags.NTLMSSP_NEGOTIATE_SEAL or \
                flags & NegotiateFlags.NTLMSSP_NEGOTIATE_SIGN:
            self._session_security = SessionSecurity(
                flags, self.session_key
            )

        self.complete = True
        return self._authenticate_message.get_data()

    def sign(self, data):
        """Return the signature for data using the session security.

        :param data: The data to sign
        :return: The result of the session security's get_signature()
        :raises NoAuthContextError: if no session security is available
        """
        return self._require_session_security("sign").get_signature(data)

    def verify(self, data, signature):
        """Verify signature against data using the session security.

        Nothing is returned; any failure is reported by the session
        security's verify_signature() call.

        :param data: The data that was signed
        :param signature: The signature to check
        :raises NoAuthContextError: if no session security is available
        """
        self._require_session_security("verify").verify_signature(data,
                                                                  signature)

    def wrap(self, data):
        """Wrap data and return it prefixed with its header.

        :param data: The data to wrap
        :return: The header followed by the wrapped data
        :raises NoAuthContextError: if no session security is available
        """
        security = self._require_session_security("wrap")
        wrapped, header = security.wrap(data)
        return header + wrapped

    def unwrap(self, data):
        """Unwrap a message produced by the peer's wrap().

        The first 16 items of data are passed as the header and the
        remainder as the wrapped payload.

        :param data: The header and wrapped data joined together
        :return: The result of the session security's unwrap()
        :raises NoAuthContextError: if no session security is available
        """
        security = self._require_session_security("unwrap")
        header, payload = data[0:16], data[16:]
        return security.unwrap(payload, header)

    def _require_session_security(self, action):
        """Return the session security object or fail with a clear error.

        Shared by sign/verify/wrap/unwrap so that using the context before
        session security exists always raises NoAuthContextError rather than
        an AttributeError on None.

        :param action: Verb placed in the error message, e.g. 'wrap'
        """
        if self._session_security is None:
            raise NoAuthContextError("Cannot %s data as no security context "
                                     "has been established" % action)
        return self._session_security

    def _set_ntlm_compatibility_flags(self, ntlm_compatibility):
        """Add the negotiate flags that match the compatibility level.

        Level 0 adds NTLM and LM_KEY, level 1 adds NTLM and extended session
        security, levels 2 to 5 add extended session security only.

        :param ntlm_compatibility: The Lan Manager Compatibility Level
        :raises Exception: if the level is not between 0 and 5
        """
        if not 0 <= ntlm_compatibility <= 5:
            raise Exception("Unknown ntlm_compatibility level - "
                            "expecting value between 0 and 5")

        if ntlm_compatibility == 0:
            self.negotiate_flags |= \
                NegotiateFlags.NTLMSSP_NEGOTIATE_NTLM | \
                NegotiateFlags.NTLMSSP_NEGOTIATE_LM_KEY
        elif ntlm_compatibility == 1:
            self.negotiate_flags |= \
                NegotiateFlags.NTLMSSP_NEGOTIATE_NTLM | \
                NegotiateFlags.NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY
        else:
            self.negotiate_flags |= \
                NegotiateFlags.NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY
159# Deprecated in favour of NtlmContext - this current class is heavily geared
160# towards a HTTP API which is not always the case with NTLM. This is currently
161# just a thin wrapper over NtlmContext and will be removed in future ntlm-auth
162# versions
class Ntlm(object):
    """Deprecated base64-oriented wrapper around NtlmContext.

    Every property reads from and writes to the wrapped context, and the
    message helpers exchange base64 encoded tokens instead of raw ones.
    """

    def __init__(self, ntlm_compatibility=3):
        """Create the wrapped context without any credentials.

        :param ntlm_compatibility: The Lan Manager Compatibility Level that
            is handed to the wrapped NtlmContext (Default 3)
        """
        # The credentials are only known when create_authenticate_message()
        # is called, so the context starts with no username or password.
        self._context = NtlmContext(None, None,
                                    ntlm_compatibility=ntlm_compatibility)
        self._challenge_token = None

    @property
    def negotiate_flags(self):
        """The negotiate flags stored on the wrapped context."""
        return self._context.negotiate_flags

    @negotiate_flags.setter
    def negotiate_flags(self, value):
        self._context.negotiate_flags = value

    @property
    def ntlm_compatibility(self):
        """The compatibility level stored on the wrapped context."""
        return self._context.ntlm_compatibility

    @ntlm_compatibility.setter
    def ntlm_compatibility(self, value):
        self._context.ntlm_compatibility = value

    @property
    def negotiate_message(self):
        """The negotiate message held by the wrapped context."""
        return self._context._negotiate_message

    @negotiate_message.setter
    def negotiate_message(self, value):
        self._context._negotiate_message = value

    @property
    def challenge_message(self):
        """The challenge message held by the wrapped context."""
        return self._context._challenge_message

    @challenge_message.setter
    def challenge_message(self, value):
        self._context._challenge_message = value

    @property
    def authenticate_message(self):
        """The authenticate message held by the wrapped context."""
        return self._context._authenticate_message

    @authenticate_message.setter
    def authenticate_message(self, value):
        self._context._authenticate_message = value

    @property
    def session_security(self):
        """The session security object held by the wrapped context."""
        return self._context._session_security

    @session_security.setter
    def session_security(self, value):
        self._context._session_security = value

    def create_negotiate_message(self, domain_name=None, workstation=None):
        """Run a context step with no input token and base64 encode it.

        :param domain_name: Stored as the context's domain
        :param workstation: Stored as the context's workstation
        :return: The base64 encoded token returned by the context step
        """
        context = self._context
        context.domain = domain_name
        context.workstation = workstation
        return base64.b64encode(context.step())

    def parse_challenge_message(self, msg2):
        """Decode the base64 challenge and keep it for the next step.

        :param msg2: The base64 encoded challenge token
        """
        self._challenge_token = base64.b64decode(msg2)

    def create_authenticate_message(self, user_name, password,
                                    domain_name=None, workstation=None,
                                    server_certificate_hash=None):
        """Run a context step with the stored challenge token.

        The arguments are copied onto the wrapped context before it is
        stepped with the token saved by parse_challenge_message().

        :param user_name: Stored as the context's username
        :param password: Stored as the context's password
        :param domain_name: Stored as the context's domain
        :param workstation: Stored as the context's workstation
        :param server_certificate_hash: Stored on the context's deprecated
            _server_certificate_hash attribute
        :return: The base64 encoded token returned by the context step
        """
        context = self._context
        context.username = user_name
        context.password = password
        context.domain = domain_name
        context.workstation = workstation
        context._server_certificate_hash = server_certificate_hash
        token = context.step(self._challenge_token)
        return base64.b64encode(token)