Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/requests_ntlm2/dance.py: 42%

72 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:20 +0000

1import base64 

2import logging 

3 

4import ntlm_auth.messages 

5import ntlm_auth.ntlm 

6 

7from .core import NtlmCompatibility, fix_target_info 

8 

9 

10logger = logging.getLogger(__name__) 

11 

12 

class HttpNtlmContext(ntlm_auth.ntlm.NtlmContext):
    """Thin wrapper over ntlm_auth.ntlm.NtlmContext for HTTP.

    Adds helpers that turn the NTLM messages produced by the base context
    into ``Authorization``-style header values (``"<auth_type> <base64>"``)
    and that extract the server challenge from an authenticate header.
    """

    def __init__(
        self,
        username,
        password,
        domain=None,
        workstation=None,
        cbt_data=None,
        ntlm_compatibility=NtlmCompatibility.NTLMv2_DEFAULT,
        auth_type=None,
        ntlm_strict_mode=False,
    ):
        r"""
        Initialises a NTLM context to use when authenticating using the NTLM
        protocol.
        Initialises the NTLM context to use when sending and receiving messages
        to and from the server. You should be using this object as it supports
        NTLMv2 authenticate and it is easier to use than before. It also brings
        in the ability to use signing and sealing with session_security and
        generate a MIC structure.

        :param username: The username to authenticate with
        :param password: The password for the username
        :param domain: The domain part of the username (None if n/a)
        :param workstation: The local workstation (None if n/a)
        :param cbt_data: A GssChannelBindingsStruct or None to bind channel
            data with the auth process. See: https://tools.ietf.org/html/rfc5929
        :param ntlm_compatibility: (Default 3)
            The Lan Manager Compatibility Level to use with the auth message
            This is set by an Administrator in the registry key
            'HKLM\SYSTEM\CurrentControlSet\Control\Lsa\LmCompatibilityLevel'
            The values correspond to the following;
                0 : LM and NTLMv1
                1 : LM, NTLMv1 and NTLMv1 with Extended Session Security
                2 : NTLMv1 and NTLMv1 with Extended Session Security
                3-5 : NTLMv2 Only
            Note: Values 3 to 5 are no different from a client perspective
        :param auth_type: either 'NTLM' or 'Negotiate'; any other value
            (including the default ``None``) raises ``ValueError``
        :param ntlm_strict_mode: If False, the Type 2 (ie challenge response)
            NTLM message is passed through ``fix_target_info`` to try to fix
            a message that does not conform to the NTLM spec. If True, the
            message is used exactly as received.
        :raises ValueError: if ``auth_type`` is not 'NTLM' or 'Negotiate'
        """
        if auth_type not in ("NTLM", "Negotiate"):
            raise ValueError(
                'Expected "NTLM" or "Negotiate" auth_type, got {}'.format(auth_type)
            )
        self._auth_type = auth_type
        # Raw (decoded) challenge bytes; filled in by parse_challenge_message().
        self._challenge_token = None
        self.ntlm_strict_mode = ntlm_strict_mode
        super(HttpNtlmContext, self).__init__(
            username,
            password,
            domain=domain,
            workstation=workstation,
            cbt_data=cbt_data,
            ntlm_compatibility=ntlm_compatibility,
        )

    # The properties below expose the underscore-prefixed message/session
    # attributes under public, writable names.

    @property
    def negotiate_message(self):
        """The value stored in ``_negotiate_message``."""
        return self._negotiate_message

    @negotiate_message.setter
    def negotiate_message(self, value):
        self._negotiate_message = value

    @property
    def challenge_message(self):
        """The value stored in ``_challenge_message``."""
        return self._challenge_message

    @challenge_message.setter
    def challenge_message(self, value):
        self._challenge_message = value

    @property
    def authenticate_message(self):
        """The value stored in ``_authenticate_message``."""
        return self._authenticate_message

    @authenticate_message.setter
    def authenticate_message(self, value):
        self._authenticate_message = value

    @property
    def session_security(self):
        """The value stored in ``_session_security``."""
        return self._session_security

    @session_security.setter
    def session_security(self, value):
        self._session_security = value

    def create_negotiate_message(self):
        """Return the base64-encoded result of ``self.step()`` (no input token)."""
        msg = self.step()
        return base64.b64encode(msg)

    def parse_challenge_message(self, msg2):
        """Decode a base64 challenge and store it for the authenticate step.

        :param msg2: the base64-encoded challenge taken from the server header

        In strict mode the decoded bytes are stored untouched. Otherwise they
        are passed through ``fix_target_info`` first, and both versions are
        logged at debug level when that call changed the message.
        """
        challenge_msg = base64.b64decode(msg2)

        if self.ntlm_strict_mode:
            self._challenge_token = challenge_msg
        else:
            fixed_challenge_msg = fix_target_info(challenge_msg)
            if fixed_challenge_msg != challenge_msg:
                logger.debug("original challenge: %s", base64.b64encode(challenge_msg))
                logger.debug("modified challenge: %s", base64.b64encode(fixed_challenge_msg))
            self._challenge_token = fixed_challenge_msg

    def create_authenticate_message(self):
        """Return the base64-encoded result of ``self.step()`` fed with the
        stored challenge token."""
        msg = self.step(self._challenge_token)
        return base64.b64encode(msg)

    def get_negotiate_header(self):
        """Return ``"<auth_type> <base64 negotiate message>"`` as text."""
        negotiate_message = self.create_negotiate_message().decode("ascii")
        result = u"{auth_type} {negotiate_message}".format(
            auth_type=self._auth_type, negotiate_message=negotiate_message
        )
        return result

    def set_challenge_from_header(self, raw_header_value):
        """Find this context's challenge in an authenticate header value.

        :param raw_header_value: the header value; it is split on ``,`` and
            each part is checked for a ``"<auth_type> "`` prefix, optionally
            preceded by ``"Proxy-Authenticate: "`` or ``"WWW-Authenticate: "``
        :return: always ``None``; on a match the challenge is stored via
            ``parse_challenge_message``
        """
        if not raw_header_value:
            return None

        match_strings = (
            "{} ".format(self._auth_type),
            "{}: {} ".format("Proxy-Authenticate", self._auth_type),
            "{}: {} ".format("WWW-Authenticate", self._auth_type),
        )
        for header_value in raw_header_value.split(","):
            header_value = header_value.strip()
            for auth_strip in match_strings:
                if header_value.startswith(auth_strip):
                    # Remove only the leading prefix that was just matched;
                    # str.replace() would also delete any later occurrence
                    # of the same text inside the value.
                    challenge = header_value[len(auth_strip):]
                    return self.parse_challenge_message(challenge)
        return None

    def get_authenticate_header(self):
        """Return ``"<auth_type> <base64 authenticate message>"`` as text."""
        authenticate_message = self.create_authenticate_message()
        authenticate_message = authenticate_message.decode("ascii")
        return u"{auth_type} {authenticate_message}".format(
            auth_type=self._auth_type,
            authenticate_message=authenticate_message
        )