Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/requests_ntlm2/dance.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

71 statements  

1import base64 

2import logging 

3 

4import ntlm_auth.ntlm 

5 

6from .core import NtlmCompatibility, fix_target_info 

7 

8 

9logger = logging.getLogger(__name__) 

10 

11 

class HttpNtlmContext(ntlm_auth.ntlm.NtlmContext):
    """Thin wrapper over ntlm_auth.ntlm.NtlmContext for HTTP.

    On top of the base context this class base64-encodes/decodes the NTLM
    messages and builds/parses the ``<auth_type> <token>`` values carried
    in HTTP authentication headers.
    """

    def __init__(
        self,
        username,
        password,
        domain=None,
        workstation=None,
        cbt_data=None,
        ntlm_compatibility=NtlmCompatibility.NTLMv2_DEFAULT,
        auth_type=None,
        ntlm_strict_mode=False,
    ):
        r"""
        Initialises an NTLM context to use when authenticating using the NTLM
        protocol.
        Initialises the NTLM context to use when sending and receiving messages
        to and from the server. You should be using this object as it supports
        NTLMv2 authentication and it is easier to use than before. It also
        brings in the ability to use signing and sealing with session_security
        and generate a MIC structure.

        :param username: The username to authenticate with
        :param password: The password for the username
        :param domain: The domain part of the username (None if n/a)
        :param workstation: The local workstation (None if n/a)
        :param cbt_data: A GssChannelBindingsStruct or None to bind channel
            data with the auth process. See: https://tools.ietf.org/html/rfc5929
        :param ntlm_compatibility: (Default 3)
            The Lan Manager Compatibility Level to use with the auth message
            This is set by an Administrator in the registry key
            'HKLM\SYSTEM\CurrentControlSet\Control\Lsa\LmCompatibilityLevel'
            The values correspond to the following;
                0 : LM and NTLMv1
                1 : LM, NTLMv1 and NTLMv1 with Extended Session Security
                2 : NTLMv1 and NTLMv1 with Extended Session Security
                3-5 : NTLMv2 Only
            Note: Values 3 to 5 are no different from a client perspective
        :param auth_type: either 'NTLM' or 'Negotiate'. The default of None is
            rejected, so callers must always pass one of the two values.
        :param ntlm_strict_mode: If False, the Type 2 (ie challenge response)
            NTLM message is passed through ``fix_target_info`` before use, so
            that a message that does not conform to the NTLM spec can still be
            processed. If True, the message is used exactly as received.
        :raises ValueError: if auth_type is neither 'NTLM' nor 'Negotiate'
        """
        if auth_type not in ("NTLM", "Negotiate"):
            raise ValueError(
                f'Expected "NTLM" or "Negotiate" auth_type, got {auth_type}'
            )
        # These attributes are assigned before the base initialiser runs,
        # exactly as in the original ordering.
        self._auth_type = auth_type
        self._challenge_token = None
        self.ntlm_strict_mode = ntlm_strict_mode
        super().__init__(
            username,
            password,
            domain=domain,
            workstation=workstation,
            cbt_data=cbt_data,
            ntlm_compatibility=ntlm_compatibility,
        )

    # The four properties below give read/write access to private attributes.
    # NOTE(review): nothing in this class assigns them; they are presumably
    # initialised and updated by ntlm_auth.ntlm.NtlmContext - confirm.

    @property
    def negotiate_message(self):
        """Return the stored ``_negotiate_message`` attribute."""
        return self._negotiate_message

    @negotiate_message.setter
    def negotiate_message(self, value):
        self._negotiate_message = value

    @property
    def challenge_message(self):
        """Return the stored ``_challenge_message`` attribute."""
        return self._challenge_message

    @challenge_message.setter
    def challenge_message(self, value):
        self._challenge_message = value

    @property
    def authenticate_message(self):
        """Return the stored ``_authenticate_message`` attribute."""
        return self._authenticate_message

    @authenticate_message.setter
    def authenticate_message(self, value):
        self._authenticate_message = value

    @property
    def session_security(self):
        """Return the stored ``_session_security`` attribute."""
        return self._session_security

    @session_security.setter
    def session_security(self, value):
        self._session_security = value

    def create_negotiate_message(self):
        """Return the base64-encoded result of ``self.step()`` as bytes.

        ``step`` is inherited; called without a token it presumably yields
        the NTLM Type 1 (negotiate) message - verify against ntlm_auth.
        """
        msg = self.step()
        return base64.b64encode(msg)

    def parse_challenge_message(self, msg2):
        """Decode a base64 challenge and store it for the authenticate step.

        In strict mode the decoded bytes are stored unchanged. Otherwise they
        are first passed through ``fix_target_info``; when that changes the
        message, both versions are logged at debug level.

        :param msg2: base64-encoded challenge message
        """
        challenge_msg = base64.b64decode(msg2)

        if self.ntlm_strict_mode:
            self._challenge_token = challenge_msg
        else:
            fixed_challenge_msg = fix_target_info(challenge_msg)
            if fixed_challenge_msg != challenge_msg:
                logger.debug("original challenge: %s", base64.b64encode(challenge_msg))
                logger.debug("modified challenge: %s", base64.b64encode(fixed_challenge_msg))
            self._challenge_token = fixed_challenge_msg

    def create_authenticate_message(self):
        """Return the base64-encoded result of ``self.step(challenge)`` as bytes.

        The challenge is whatever ``parse_challenge_message`` stored last; it
        is still None if no challenge has been parsed yet.
        """
        msg = self.step(self._challenge_token)
        return base64.b64encode(msg)

    def get_negotiate_header(self):
        """Return ``"<auth_type> <base64 negotiate message>"`` as a str."""
        negotiate_message = self.create_negotiate_message().decode("ascii")
        return f"{self._auth_type} {negotiate_message}"

    def set_challenge_from_header(self, raw_header_value):
        """Find this context's challenge in a header value and store it.

        The value is split on commas and each part is stripped of surrounding
        whitespace. The first part that starts with ``"<auth_type> "``,
        optionally preceded by ``"Proxy-Authenticate: "`` or
        ``"WWW-Authenticate: "``, is handed to ``parse_challenge_message``.

        :param raw_header_value: raw authentication header value; a falsy
            value (None or empty string) is ignored
        :return: always None; the parsed challenge is stored on the instance
        """
        if not raw_header_value:
            return None

        match_strings = (
            f"{self._auth_type} ",
            f"Proxy-Authenticate: {self._auth_type} ",
            f"WWW-Authenticate: {self._auth_type} ",
        )
        for header_value in raw_header_value.split(","):
            header_value = header_value.strip()
            for auth_strip in match_strings:
                if header_value.startswith(auth_strip):
                    # Drop only the leading prefix that was just matched;
                    # str.replace would also delete any later occurrence of
                    # the same text inside the remainder of the value.
                    challenge = header_value[len(auth_strip):]
                    return self.parse_challenge_message(challenge)
        return None

    def get_authenticate_header(self):
        """Return ``"<auth_type> <base64 authenticate message>"`` as a str."""
        authenticate_message = self.create_authenticate_message().decode("ascii")
        return f"{self._auth_type} {authenticate_message}"