Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/ntlm_auth/ntlm.py: 38%

126 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 07:03 +0000

1# Copyright: (c) 2018, Jordan Borean (@jborean93) <jborean93@gmail.com> 

2# MIT License (see LICENSE or https://opensource.org/licenses/MIT) 

3 

4import base64 

5import struct 

6 

7from ntlm_auth.constants import NegotiateFlags 

8from ntlm_auth.exceptions import NoAuthContextError 

9from ntlm_auth.messages import AuthenticateMessage, ChallengeMessage, \ 

10 NegotiateMessage 

11from ntlm_auth.session_security import SessionSecurity 

12 

13 

14class NtlmContext(object): 

15 

16 def __init__(self, username, password, domain=None, workstation=None, 

17 cbt_data=None, ntlm_compatibility=3): 

18 r""" 

19 Initialises a NTLM context to use when authenticating using the NTLM 

20 protocol. 

21 Initialises the NTLM context to use when sending and receiving messages 

22 to and from the server. You should be using this object as it supports 

23 NTLMv2 authenticate and it easier to use than before. It also brings in 

24 the ability to use signing and sealing with session_security and 

25 generate a MIC structure. 

26 

27 :param username: The username to authenticate with 

28 :param password: The password for the username 

29 :param domain: The domain part of the username (None if n/a) 

30 :param workstation: The localworkstation (None if n/a) 

31 :param cbt_data: A GssChannelBindingsStruct or None to bind channel 

32 data with the auth process 

33 :param ntlm_compatibility: (Default 3) 

34 The Lan Manager Compatibility Level to use with the auth message 

35 This is set by an Administrator in the registry key 

36 'HKLM\SYSTEM\CurrentControlSet\Control\Lsa\LmCompatibilityLevel' 

37 The values correspond to the following; 

38 0 : LM and NTLMv1 

39 1 : LM, NTLMv1 and NTLMv1 with Extended Session Security 

40 2 : NTLMv1 and NTLMv1 with Extended Session Security 

41 3-5 : NTLMv2 Only 

42 Note: Values 3 to 5 are no different from a client perspective 

43 """ 

44 self.username = username 

45 self.password = password 

46 self.domain = domain 

47 self.workstation = workstation 

48 self.cbt_data = cbt_data 

49 self._server_certificate_hash = None # deprecated for backwards compat 

50 self.ntlm_compatibility = ntlm_compatibility 

51 self.complete = False 

52 

53 # Setting up our flags so the challenge message returns the target info 

54 # block if supported 

55 self.negotiate_flags = NegotiateFlags.NTLMSSP_NEGOTIATE_TARGET_INFO | \ 

56 NegotiateFlags.NTLMSSP_NEGOTIATE_128 | \ 

57 NegotiateFlags.NTLMSSP_NEGOTIATE_56 | \ 

58 NegotiateFlags.NTLMSSP_NEGOTIATE_UNICODE | \ 

59 NegotiateFlags.NTLMSSP_NEGOTIATE_VERSION | \ 

60 NegotiateFlags.NTLMSSP_NEGOTIATE_KEY_EXCH | \ 

61 NegotiateFlags.NTLMSSP_NEGOTIATE_ALWAYS_SIGN | \ 

62 NegotiateFlags.NTLMSSP_NEGOTIATE_SIGN | \ 

63 NegotiateFlags.NTLMSSP_NEGOTIATE_SEAL 

64 

65 # Setting the message types based on the ntlm_compatibility level 

66 self._set_ntlm_compatibility_flags(self.ntlm_compatibility) 

67 

68 self._negotiate_message = None 

69 self._challenge_message = None 

70 self._authenticate_message = None 

71 self._session_security = None 

72 

73 @property 

74 def mic_present(self): 

75 if self._authenticate_message: 

76 return bool(self._authenticate_message.mic) 

77 

78 return False 

79 

80 @property 

81 def session_key(self): 

82 if self._authenticate_message: 

83 return self._authenticate_message.exported_session_key 

84 

85 def reset_rc4_state(self, outgoing=True): 

86 """ Resets the signing cipher for the incoming or outgoing cipher. For SPNEGO for calculating mechListMIC. """ 

87 if self._session_security: 

88 self._session_security.reset_rc4_state(outgoing=outgoing) 

89 

90 def step(self, input_token=None): 

91 if self._negotiate_message is None: 

92 self._negotiate_message = NegotiateMessage(self.negotiate_flags, 

93 self.domain, 

94 self.workstation) 

95 return self._negotiate_message.get_data() 

96 else: 

97 self._challenge_message = ChallengeMessage(input_token) 

98 self._authenticate_message = AuthenticateMessage( 

99 self.username, self.password, self.domain, self.workstation, 

100 self._challenge_message, self.ntlm_compatibility, 

101 server_certificate_hash=self._server_certificate_hash, 

102 cbt_data=self.cbt_data 

103 ) 

104 self._authenticate_message.add_mic(self._negotiate_message, 

105 self._challenge_message) 

106 

107 flag_bytes = self._authenticate_message.negotiate_flags 

108 flags = struct.unpack("<I", flag_bytes)[0] 

109 if flags & NegotiateFlags.NTLMSSP_NEGOTIATE_SEAL or \ 

110 flags & NegotiateFlags.NTLMSSP_NEGOTIATE_SIGN: 

111 self._session_security = SessionSecurity( 

112 flags, self.session_key 

113 ) 

114 

115 self.complete = True 

116 return self._authenticate_message.get_data() 

117 

118 def sign(self, data): 

119 return self._session_security.get_signature(data) 

120 

121 def verify(self, data, signature): 

122 self._session_security.verify_signature(data, signature) 

123 

124 def wrap(self, data): 

125 if self._session_security is None: 

126 raise NoAuthContextError("Cannot wrap data as no security context " 

127 "has been established") 

128 

129 data, header = self._session_security.wrap(data) 

130 return header + data 

131 

132 def unwrap(self, data): 

133 if self._session_security is None: 

134 raise NoAuthContextError("Cannot unwrap data as no security " 

135 "context has been established") 

136 header = data[0:16] 

137 data = data[16:] 

138 message = self._session_security.unwrap(data, header) 

139 return message 

140 

141 def _set_ntlm_compatibility_flags(self, ntlm_compatibility): 

142 if (ntlm_compatibility >= 0) and (ntlm_compatibility <= 5): 

143 if ntlm_compatibility == 0: 

144 self.negotiate_flags |= \ 

145 NegotiateFlags.NTLMSSP_NEGOTIATE_NTLM | \ 

146 NegotiateFlags.NTLMSSP_NEGOTIATE_LM_KEY 

147 elif ntlm_compatibility == 1: 

148 self.negotiate_flags |= \ 

149 NegotiateFlags.NTLMSSP_NEGOTIATE_NTLM | \ 

150 NegotiateFlags.NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY 

151 else: 

152 self.negotiate_flags |= \ 

153 NegotiateFlags.NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY 

154 else: 

155 raise Exception("Unknown ntlm_compatibility level - " 

156 "expecting value between 0 and 5") 

157 

158 

159# Deprecated in favour of NtlmContext - this current class is heavily geared 

160# towards a HTTP API which is not always the case with NTLM. This is currently 

161# just a thin wrapper over NtlmContext and will be removed in future ntlm-auth 

162# versions 

163class Ntlm(object): 

164 

165 def __init__(self, ntlm_compatibility=3): 

166 self._context = NtlmContext(None, None, 

167 ntlm_compatibility=ntlm_compatibility) 

168 self._challenge_token = None 

169 

170 @property 

171 def negotiate_flags(self): 

172 return self._context.negotiate_flags 

173 

174 @negotiate_flags.setter 

175 def negotiate_flags(self, value): 

176 self._context.negotiate_flags = value 

177 

178 @property 

179 def ntlm_compatibility(self): 

180 return self._context.ntlm_compatibility 

181 

182 @ntlm_compatibility.setter 

183 def ntlm_compatibility(self, value): 

184 self._context.ntlm_compatibility = value 

185 

186 @property 

187 def negotiate_message(self): 

188 return self._context._negotiate_message 

189 

190 @negotiate_message.setter 

191 def negotiate_message(self, value): 

192 self._context._negotiate_message = value 

193 

194 @property 

195 def challenge_message(self): 

196 return self._context._challenge_message 

197 

198 @challenge_message.setter 

199 def challenge_message(self, value): 

200 self._context._challenge_message = value 

201 

202 @property 

203 def authenticate_message(self): 

204 return self._context._authenticate_message 

205 

206 @authenticate_message.setter 

207 def authenticate_message(self, value): 

208 self._context._authenticate_message = value 

209 

210 @property 

211 def session_security(self): 

212 return self._context._session_security 

213 

214 @session_security.setter 

215 def session_security(self, value): 

216 self._context._session_security = value 

217 

218 def create_negotiate_message(self, domain_name=None, workstation=None): 

219 self._context.domain = domain_name 

220 self._context.workstation = workstation 

221 msg = self._context.step() 

222 return base64.b64encode(msg) 

223 

224 def parse_challenge_message(self, msg2): 

225 self._challenge_token = base64.b64decode(msg2) 

226 

227 def create_authenticate_message(self, user_name, password, 

228 domain_name=None, workstation=None, 

229 server_certificate_hash=None): 

230 self._context.username = user_name 

231 self._context.password = password 

232 self._context.domain = domain_name 

233 self._context.workstation = workstation 

234 self._context._server_certificate_hash = server_certificate_hash 

235 msg = self._context.step(self._challenge_token) 

236 return base64.b64encode(msg)