1import binascii
2import logging
3import struct
4import warnings
5from typing import Optional, Tuple
6
7import ntlm_auth.constants
8from cryptography import x509
9from cryptography.exceptions import UnsupportedAlgorithm
10from cryptography.hazmat.backends import default_backend
11from cryptography.hazmat.primitives import hashes
12from ntlm_auth.gss_channel_bindings import GssChannelBindingsStruct
13from ntlm_auth.messages import ChallengeMessage
14from requests.packages.urllib3.response import HTTPResponse
15
16
17logger = logging.getLogger(__name__)
18
19
20class NtlmCompatibility:
21 # see Microsoft doc on compatibility levels here: https://bit.ly/2OWZVxp
22 LM_AND_NTLMv1 = 0
23 LM_AND_NTLMv1_WITH_ESS = 1
24 NTLMv1_WITH_ESS = 2
25 NTLMv2_DEFAULT = 3
26 NTLMv2_LEVEL4 = 4
27 NTLMv2_LEVEL5 = 5
28
29
30class UnknownSignatureAlgorithmOID(Warning):
31 pass
32
33
34def get_server_cert(response):
35 """
36 Get the certificate at the request_url and return it as a hash. Will
37 get the raw socket from the original response from the server. This
38 socket is then checked if it is an SSL socket and then used to get the
39 hash of the certificate. The certificate hash is then used with NTLMv2
40 authentication for Channel Binding Tokens support. If the raw object
41 is not urllib3 HTTPResponse (default with requests) then no certificate
42 will be returned.
43
44 :param response: The original 401 response from the server
45 :return: The hash of the DER encoded certificate at the request_url or None
46 if not an HTTPS url.
47 """
48 raw_response = response.raw
49
50 if isinstance(raw_response, HTTPResponse):
51 try:
52 socket = raw_response._fp.fp.raw._sock
53 except AttributeError:
54 return None
55
56 try:
57 server_certificate = socket.getpeercert(True)
58 except AttributeError:
59 logger.debug("unable to get server certificate")
60 else:
61 return get_certificate_hash_bytes(server_certificate)
62 else:
63 logger.warning(
64 "Requests is running with a non urllib3 backend,"
65 " cannot retrieve server certificate for CBT"
66 )
67 return None
68
69
70def get_certificate_hash_bytes(certificate_der: bytes) -> Optional[bytes]:
71 # https://tools.ietf.org/html/rfc5929#section-4.1
72 cert = x509.load_der_x509_certificate(certificate_der, default_backend())
73
74 try:
75 hash_algorithm = cert.signature_hash_algorithm
76 except UnsupportedAlgorithm as ex:
77 logger.exception("e=")
78 warnings.warn(
79 "Failed to get signature algorithm from certificate, "
80 "unable to pass channel bindings: %s" % str(ex),
81 UnknownSignatureAlgorithmOID,
82 )
83 return None
84
85 # if the cert signature algorithm is either md5 or sha1 then use sha256
86 # otherwise use the signature algorithm
87 if hash_algorithm.name in ["md5", "sha1"]:
88 digest = hashes.Hash(hashes.SHA256(), default_backend())
89 else:
90 digest = hashes.Hash(hash_algorithm, default_backend())
91
92 digest.update(certificate_der)
93 certificate_hash_bytes = digest.finalize()
94 logger.debug("peer/server cert hash: %s", binascii.hexlify(certificate_hash_bytes))
95 return certificate_hash_bytes
96
97
98def get_auth_type_from_header(header: str) -> Optional[str]:
99 """
100 Given a WWW-Authenticate or Proxy-Authenticate header, returns the
101 authentication type to use. We prefer NTLM over Negotiate if the server
102 supports it.
103 """
104 if "ntlm" in header.lower():
105 return "NTLM"
106 elif "negotiate" in header.lower():
107 return "Negotiate"
108 return None
109
110
111def get_ntlm_credentials(username: str, password: str) -> Tuple[str, str, str]:
112 try:
113 domain, username = username.split("\\", 1)
114 except ValueError:
115 domain = ""
116 return username, password, domain
117
118
119def get_cbt_data(response):
120 """
121 Create Channel Binding for TLS data
122
123 See:
124 - https://tools.ietf.org/html/rfc5929
125 - https://github.com/jborean93/ntlm-auth#ntlmv2
126 - https://github.com/requests/requests-ntlm/pull/116#discussion_r325961121
127 - https://support.microsoft.com/en-za/help/976918/authentication-failure-from-non-windows-ntlm-or-kerberos-servers # noqa
128
129 :param response: HTTP Response object
130 """
131
132 cert_hash_bytes = get_server_cert(response)
133 if not cert_hash_bytes:
134 logger.debug("server cert not found, channel binding tokens (CBT) wont be used")
135 return None
136
137 channel_binding_type = b"tls-server-end-point" # https://tools.ietf.org/html/rfc5929#section-4
138 data_type = GssChannelBindingsStruct.APPLICATION_DATA
139
140 cbt_data = GssChannelBindingsStruct()
141 cbt_data[data_type] = b":".join([channel_binding_type, cert_hash_bytes])
142 logger.debug("cbt data: %s", cbt_data.get_data())
143 return cbt_data
144
145
146def is_challenge_message(msg: bytes) -> bool:
147 try:
148 message_type = struct.unpack("<I", msg[8:12])[0]
149 return message_type == ntlm_auth.constants.MessageTypes.NTLM_CHALLENGE
150 except struct.error:
151 return False
152
153
154def is_challenge_message_valid(msg: bytes) -> bool:
155 try:
156 _ = ChallengeMessage(msg)
157 return True
158 except struct.error:
159 return False
160
161
162def fix_target_info(challenge_msg: bytes) -> bytes:
163 if not is_challenge_message(challenge_msg):
164 return challenge_msg
165
166 if is_challenge_message_valid(challenge_msg):
167 return challenge_msg
168
169 signature = challenge_msg[:8]
170 if signature != ntlm_auth.constants.NTLM_SIGNATURE:
171 logger.warning("invalid signature: %r", signature)
172 return challenge_msg
173
174 negotiate_flags_raw = challenge_msg[20:24]
175 try:
176 negotiate_flags = struct.unpack("<I", negotiate_flags_raw)[0]
177 except struct.error:
178 logger.warning("Invalid Negotiate Flags: %s", negotiate_flags_raw)
179 return challenge_msg
180
181 if negotiate_flags & ntlm_auth.constants.NegotiateFlags.NTLMSSP_NEGOTIATE_TARGET_INFO:
182 try:
183 negotiate_flags &= ~ntlm_auth.constants.NegotiateFlags.NTLMSSP_NEGOTIATE_TARGET_INFO
184 return challenge_msg[:20] + struct.pack("<I", negotiate_flags) + challenge_msg[24:]
185 except struct.error:
186 return challenge_msg
187 return challenge_msg
188
189
190def noop():
191 pass