Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/requests_ntlm2/dance.py: 42%
72 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:20 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:20 +0000
1import base64
2import logging
4import ntlm_auth.messages
5import ntlm_auth.ntlm
7from .core import NtlmCompatibility, fix_target_info
10logger = logging.getLogger(__name__)
class HttpNtlmContext(ntlm_auth.ntlm.NtlmContext):
    """Thin wrapper over ntlm_auth.ntlm.NtlmContext for HTTP.

    Adds base64 encoding/decoding of the NTLM messages and helpers that build
    and parse ``<auth_type> <token>`` HTTP authentication header values.
    """

    def __init__(
        self,
        username,
        password,
        domain=None,
        workstation=None,
        cbt_data=None,
        ntlm_compatibility=NtlmCompatibility.NTLMv2_DEFAULT,
        auth_type=None,
        ntlm_strict_mode=False,
    ):
        r"""
        Initialise an NTLM context to use when authenticating using the NTLM
        protocol.

        Initialises the NTLM context to use when sending and receiving messages
        to and from the server. You should be using this object as it supports
        NTLMv2 authentication and is easier to use than before. It also brings
        in the ability to use signing and sealing with session_security and
        generate a MIC structure.

        :param username: The username to authenticate with
        :param password: The password for the username
        :param domain: The domain part of the username (None if n/a)
        :param workstation: The local workstation (None if n/a)
        :param cbt_data: A GssChannelBindingsStruct or None to bind channel
            data with the auth process. See: https://tools.ietf.org/html/rfc5929
        :param ntlm_compatibility: (Default 3)
            The Lan Manager Compatibility Level to use with the auth message.
            This is set by an Administrator in the registry key
            'HKLM\SYSTEM\CurrentControlSet\Control\Lsa\LmCompatibilityLevel'
            The values correspond to the following;
                0 : LM and NTLMv1
                1 : LM, NTLMv1 and NTLMv1 with Extended Session Security
                2 : NTLMv1 and NTLMv1 with Extended Session Security
                3-5 : NTLMv2 Only
            Note: Values 3 to 5 are no different from a client perspective
        :param auth_type: either 'NTLM' or 'Negotiate'
        :param ntlm_strict_mode: If False, the Type 2 (i.e. challenge) NTLM
            message is passed through ``fix_target_info`` before it is used, to
            try to repair a message that does not conform to the NTLM spec.
            If True, the message is used exactly as received.
        :raises ValueError: if ``auth_type`` is neither 'NTLM' nor 'Negotiate'
        """
        if auth_type not in ("NTLM", "Negotiate"):
            raise ValueError(
                'Expected "NTLM" or "Negotiate" auth_type, got {}'.format(auth_type)
            )
        self._auth_type = auth_type
        # Raw (decoded) challenge bytes; filled in by parse_challenge_message().
        self._challenge_token = None
        self.ntlm_strict_mode = ntlm_strict_mode
        super(HttpNtlmContext, self).__init__(
            username,
            password,
            domain=domain,
            workstation=workstation,
            cbt_data=cbt_data,
            ntlm_compatibility=ntlm_compatibility,
        )

    @property
    def negotiate_message(self):
        """Read/write access to the ``_negotiate_message`` attribute."""
        return self._negotiate_message

    @negotiate_message.setter
    def negotiate_message(self, value):
        self._negotiate_message = value

    @property
    def challenge_message(self):
        """Read/write access to the ``_challenge_message`` attribute."""
        return self._challenge_message

    @challenge_message.setter
    def challenge_message(self, value):
        self._challenge_message = value

    @property
    def authenticate_message(self):
        """Read/write access to the ``_authenticate_message`` attribute."""
        return self._authenticate_message

    @authenticate_message.setter
    def authenticate_message(self, value):
        self._authenticate_message = value

    @property
    def session_security(self):
        """Read/write access to the ``_session_security`` attribute."""
        return self._session_security

    @session_security.setter
    def session_security(self, value):
        self._session_security = value

    def create_negotiate_message(self):
        """Call ``step()`` without an input token and return its result base64-encoded."""
        msg = self.step()
        return base64.b64encode(msg)

    def parse_challenge_message(self, msg2):
        """Decode a base64 challenge and store it for ``create_authenticate_message()``.

        In strict mode the decoded bytes are stored untouched. Otherwise they are
        first passed through ``fix_target_info``; when that changes the message,
        both versions are logged at debug level.

        :param msg2: the base64-encoded challenge (Type 2) message
        :return: None
        """
        challenge_msg = base64.b64decode(msg2)

        if self.ntlm_strict_mode:
            self._challenge_token = challenge_msg
        else:
            fixed_challenge_msg = fix_target_info(challenge_msg)
            if fixed_challenge_msg != challenge_msg:
                logger.debug("original challenge: %s", base64.b64encode(challenge_msg))
                logger.debug("modified challenge: %s", base64.b64encode(fixed_challenge_msg))
            self._challenge_token = fixed_challenge_msg

    def create_authenticate_message(self):
        """Call ``step()`` with the stored challenge and return its result base64-encoded."""
        msg = self.step(self._challenge_token)
        return base64.b64encode(msg)

    def get_negotiate_header(self):
        """Return the ``<auth_type> <base64 negotiate message>`` header value."""
        negotiate_message = self.create_negotiate_message().decode("ascii")
        result = u"{auth_type} {negotiate_message}".format(
            auth_type=self._auth_type, negotiate_message=negotiate_message
        )
        return result

    def set_challenge_from_header(self, raw_header_value):
        """Find this context's challenge in an authentication header and store it.

        The header value is split on commas. The first part that starts with
        ``<auth_type> `` (optionally preceded by ``Proxy-Authenticate: `` or
        ``WWW-Authenticate: ``) has that prefix removed and the remainder is
        handed to ``parse_challenge_message()``.

        :param raw_header_value: the raw header value; may be empty or None
        :return: the result of ``parse_challenge_message()`` for the first
            matching part, or None when the header is empty or nothing matches
        """
        if not raw_header_value:
            return None

        match_strings = (
            "{} ".format(self._auth_type),
            "{}: {} ".format("Proxy-Authenticate", self._auth_type),
            "{}: {} ".format("WWW-Authenticate", self._auth_type),
        )
        for header_value in raw_header_value.split(","):
            header_value = header_value.strip()
            for auth_strip in match_strings:
                if header_value.startswith(auth_strip):
                    # Drop only the leading prefix that was just matched.
                    # str.replace() would also delete any later occurrence of
                    # the same text inside the value and corrupt the token.
                    challenge = header_value[len(auth_strip):]
                    return self.parse_challenge_message(challenge)
        return None

    def get_authenticate_header(self):
        """Return the ``<auth_type> <base64 authenticate message>`` header value."""
        authenticate_message = self.create_authenticate_message()
        authenticate_message = authenticate_message.decode("ascii")
        return u"{auth_type} {authenticate_message}".format(
            auth_type=self._auth_type,
            authenticate_message=authenticate_message
        )