Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/requests_ntlm2/core.py: 60%
112 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:20 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:20 +0000
1import binascii
2import logging
3import struct
4import sys
5import warnings
7import ntlm_auth.constants
8from cryptography import x509
9from cryptography.exceptions import UnsupportedAlgorithm
10from cryptography.hazmat.backends import default_backend
11from cryptography.hazmat.primitives import hashes
12from ntlm_auth.gss_channel_bindings import GssChannelBindingsStruct
13from ntlm_auth.messages import ChallengeMessage
14from requests.packages.urllib3.response import HTTPResponse
17logger = logging.getLogger(__name__)
20class NtlmCompatibility(object):
21 # see Microsoft doc on compatibility levels here: https://bit.ly/2OWZVxp
22 LM_AND_NTLMv1 = 0
23 LM_AND_NTLMv1_WITH_ESS = 1
24 NTLMv1_WITH_ESS = 2
25 NTLMv2_DEFAULT = 3
26 NTLMv2_LEVEL4 = 4
27 NTLMv2_LEVEL5 = 5
30class UnknownSignatureAlgorithmOID(Warning):
31 pass
34def get_server_cert(response):
35 """
36 Get the certificate at the request_url and return it as a hash. Will
37 get the raw socket from the original response from the server. This
38 socket is then checked if it is an SSL socket and then used to get the
39 hash of the certificate. The certificate hash is then used with NTLMv2
40 authentication for Channel Binding Tokens support. If the raw object
41 is not a urllib3 HTTPReponse (default with requests) then no
42 certificate will be returned.
44 :param response: The original 401 response from the server
45 :return: The hash of the DER encoded certificate at the request_url or None if not a HTTPS url
46 """
47 raw_response = response.raw
49 if isinstance(raw_response, HTTPResponse):
50 try:
51 if sys.version_info > (3, 0):
52 socket = raw_response._fp.fp.raw._sock
53 else:
54 socket = raw_response._fp.fp._sock
55 except AttributeError:
56 return None
58 try:
59 server_certificate = socket.getpeercert(True)
60 except AttributeError:
61 logger.debug("unable to get server certificate")
62 else:
63 return get_certificate_hash_bytes(server_certificate)
64 else:
65 logger.warning(
66 "Requests is running with a non urllib3 backend,"
67 " cannot retrieve server certificate for CBT"
68 )
71def get_certificate_hash_bytes(certificate_der):
72 # https://tools.ietf.org/html/rfc5929#section-4.1
73 cert = x509.load_der_x509_certificate(certificate_der, default_backend())
75 try:
76 hash_algorithm = cert.signature_hash_algorithm
77 except UnsupportedAlgorithm as ex:
78 logger.exception("e=")
79 warnings.warn(
80 "Failed to get signature algorithm from certificate, "
81 "unable to pass channel bindings: %s" % str(ex),
82 UnknownSignatureAlgorithmOID,
83 )
84 return None
86 # if the cert signature algorithm is either md5 or sha1 then use sha256
87 # otherwise use the signature algorithm
88 if hash_algorithm.name in ["md5", "sha1"]:
89 digest = hashes.Hash(hashes.SHA256(), default_backend())
90 else:
91 digest = hashes.Hash(hash_algorithm, default_backend())
93 digest.update(certificate_der)
94 certificate_hash_bytes = digest.finalize()
95 logger.debug("peer/server cert hash: %s", binascii.hexlify(certificate_hash_bytes))
96 return certificate_hash_bytes
99def get_auth_type_from_header(header):
100 """
101 Given a WWW-Authenticate or Proxy-Authenticate header, returns the
102 authentication type to use. We prefer NTLM over Negotiate if the server
103 supports it.
104 """
105 if "ntlm" in header.lower():
106 return "NTLM"
107 elif "negotiate" in header.lower():
108 return "Negotiate"
109 return None
112def get_ntlm_credentials(username, password):
113 try:
114 domain, username = username.split("\\", 1)
115 except ValueError:
116 domain = ""
117 return username, password, domain
120def get_cbt_data(response):
121 """
122 Create Channel Binding for TLS data
124 See:
125 - https://tools.ietf.org/html/rfc5929
126 - https://github.com/jborean93/ntlm-auth#ntlmv2
127 - https://github.com/requests/requests-ntlm/pull/116#discussion_r325961121
128 - https://support.microsoft.com/en-za/help/976918/authentication-failure-from-non-windows-ntlm-or-kerberos-servers # noqa
130 :param response: HTTP Response object
131 """
133 cert_hash_bytes = get_server_cert(response)
134 if not cert_hash_bytes:
135 logger.debug("server cert not found, channel binding tokens (CBT) wont be used")
136 return None
138 channel_binding_type = b"tls-server-end-point" # https://tools.ietf.org/html/rfc5929#section-4
139 data_type = GssChannelBindingsStruct.APPLICATION_DATA
141 cbt_data = GssChannelBindingsStruct()
142 cbt_data[data_type] = b":".join([channel_binding_type, cert_hash_bytes])
143 logger.debug("cbt data: %s", cbt_data.get_data())
144 return cbt_data
147def is_challenge_message(msg):
148 try:
149 message_type = struct.unpack("<I", msg[8:12])[0]
150 return message_type == ntlm_auth.constants.MessageTypes.NTLM_CHALLENGE
151 except struct.error:
152 return False
155def is_challenge_message_valid(msg):
156 try:
157 _ = ChallengeMessage(msg)
158 return True
159 except struct.error:
160 return False
163def fix_target_info(challenge_msg):
164 if not is_challenge_message(challenge_msg):
165 return challenge_msg
167 if is_challenge_message_valid(challenge_msg):
168 return challenge_msg
170 signature = challenge_msg[:8]
171 if signature != ntlm_auth.constants.NTLM_SIGNATURE:
172 logger.warning("invalid signature: %r", signature)
173 return challenge_msg
175 negotiate_flags_raw = challenge_msg[20:24]
176 try:
177 negotiate_flags = struct.unpack("<I", negotiate_flags_raw)[0]
178 except struct.error:
179 logger.warning("Invalid Negotiate Flags: %s", negotiate_flags_raw)
180 return challenge_msg
182 if negotiate_flags & ntlm_auth.constants.NegotiateFlags.NTLMSSP_NEGOTIATE_TARGET_INFO:
183 try:
184 negotiate_flags &= ~ntlm_auth.constants.NegotiateFlags.NTLMSSP_NEGOTIATE_TARGET_INFO
185 return challenge_msg[:20] + struct.pack("<I", negotiate_flags) + challenge_msg[24:]
186 except struct.error:
187 return challenge_msg
188 return challenge_msg
191def noop():
192 pass