Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/requests_ntlm2/core.py: 60%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

111 statements  

1import binascii 

2import logging 

3import struct 

4import warnings 

5from typing import Optional, Tuple 

6 

7import ntlm_auth.constants 

8from cryptography import x509 

9from cryptography.exceptions import UnsupportedAlgorithm 

10from cryptography.hazmat.backends import default_backend 

11from cryptography.hazmat.primitives import hashes 

12from ntlm_auth.gss_channel_bindings import GssChannelBindingsStruct 

13from ntlm_auth.messages import ChallengeMessage 

14from requests.packages.urllib3.response import HTTPResponse 

15 

16 

17logger = logging.getLogger(__name__) 

18 

19 

20class NtlmCompatibility: 

21 # see Microsoft doc on compatibility levels here: https://bit.ly/2OWZVxp 

22 LM_AND_NTLMv1 = 0 

23 LM_AND_NTLMv1_WITH_ESS = 1 

24 NTLMv1_WITH_ESS = 2 

25 NTLMv2_DEFAULT = 3 

26 NTLMv2_LEVEL4 = 4 

27 NTLMv2_LEVEL5 = 5 

28 

29 

30class UnknownSignatureAlgorithmOID(Warning): 

31 pass 

32 

33 

34def get_server_cert(response): 

35 """ 

36 Get the certificate at the request_url and return it as a hash. Will 

37 get the raw socket from the original response from the server. This 

38 socket is then checked if it is an SSL socket and then used to get the 

39 hash of the certificate. The certificate hash is then used with NTLMv2 

40 authentication for Channel Binding Tokens support. If the raw object 

41 is not urllib3 HTTPResponse (default with requests) then no certificate 

42 will be returned. 

43 

44 :param response: The original 401 response from the server 

45 :return: The hash of the DER encoded certificate at the request_url or None 

46 if not an HTTPS url. 

47 """ 

48 raw_response = response.raw 

49 

50 if isinstance(raw_response, HTTPResponse): 

51 try: 

52 socket = raw_response._fp.fp.raw._sock 

53 except AttributeError: 

54 return None 

55 

56 try: 

57 server_certificate = socket.getpeercert(True) 

58 except AttributeError: 

59 logger.debug("unable to get server certificate") 

60 else: 

61 return get_certificate_hash_bytes(server_certificate) 

62 else: 

63 logger.warning( 

64 "Requests is running with a non urllib3 backend," 

65 " cannot retrieve server certificate for CBT" 

66 ) 

67 return None 

68 

69 

70def get_certificate_hash_bytes(certificate_der: bytes) -> Optional[bytes]: 

71 # https://tools.ietf.org/html/rfc5929#section-4.1 

72 cert = x509.load_der_x509_certificate(certificate_der, default_backend()) 

73 

74 try: 

75 hash_algorithm = cert.signature_hash_algorithm 

76 except UnsupportedAlgorithm as ex: 

77 logger.exception("e=") 

78 warnings.warn( 

79 "Failed to get signature algorithm from certificate, " 

80 "unable to pass channel bindings: %s" % str(ex), 

81 UnknownSignatureAlgorithmOID, 

82 ) 

83 return None 

84 

85 # if the cert signature algorithm is either md5 or sha1 then use sha256 

86 # otherwise use the signature algorithm 

87 if hash_algorithm.name in ["md5", "sha1"]: 

88 digest = hashes.Hash(hashes.SHA256(), default_backend()) 

89 else: 

90 digest = hashes.Hash(hash_algorithm, default_backend()) 

91 

92 digest.update(certificate_der) 

93 certificate_hash_bytes = digest.finalize() 

94 logger.debug("peer/server cert hash: %s", binascii.hexlify(certificate_hash_bytes)) 

95 return certificate_hash_bytes 

96 

97 

98def get_auth_type_from_header(header: str) -> Optional[str]: 

99 """ 

100 Given a WWW-Authenticate or Proxy-Authenticate header, returns the 

101 authentication type to use. We prefer NTLM over Negotiate if the server 

102 supports it. 

103 """ 

104 if "ntlm" in header.lower(): 

105 return "NTLM" 

106 elif "negotiate" in header.lower(): 

107 return "Negotiate" 

108 return None 

109 

110 

111def get_ntlm_credentials(username: str, password: str) -> Tuple[str, str, str]: 

112 try: 

113 domain, username = username.split("\\", 1) 

114 except ValueError: 

115 domain = "" 

116 return username, password, domain 

117 

118 

119def get_cbt_data(response): 

120 """ 

121 Create Channel Binding for TLS data 

122 

123 See: 

124 - https://tools.ietf.org/html/rfc5929 

125 - https://github.com/jborean93/ntlm-auth#ntlmv2 

126 - https://github.com/requests/requests-ntlm/pull/116#discussion_r325961121 

127 - https://support.microsoft.com/en-za/help/976918/authentication-failure-from-non-windows-ntlm-or-kerberos-servers # noqa 

128 

129 :param response: HTTP Response object 

130 """ 

131 

132 cert_hash_bytes = get_server_cert(response) 

133 if not cert_hash_bytes: 

134 logger.debug("server cert not found, channel binding tokens (CBT) wont be used") 

135 return None 

136 

137 channel_binding_type = b"tls-server-end-point" # https://tools.ietf.org/html/rfc5929#section-4 

138 data_type = GssChannelBindingsStruct.APPLICATION_DATA 

139 

140 cbt_data = GssChannelBindingsStruct() 

141 cbt_data[data_type] = b":".join([channel_binding_type, cert_hash_bytes]) 

142 logger.debug("cbt data: %s", cbt_data.get_data()) 

143 return cbt_data 

144 

145 

146def is_challenge_message(msg: bytes) -> bool: 

147 try: 

148 message_type = struct.unpack("<I", msg[8:12])[0] 

149 return message_type == ntlm_auth.constants.MessageTypes.NTLM_CHALLENGE 

150 except struct.error: 

151 return False 

152 

153 

154def is_challenge_message_valid(msg: bytes) -> bool: 

155 try: 

156 _ = ChallengeMessage(msg) 

157 return True 

158 except struct.error: 

159 return False 

160 

161 

162def fix_target_info(challenge_msg: bytes) -> bytes: 

163 if not is_challenge_message(challenge_msg): 

164 return challenge_msg 

165 

166 if is_challenge_message_valid(challenge_msg): 

167 return challenge_msg 

168 

169 signature = challenge_msg[:8] 

170 if signature != ntlm_auth.constants.NTLM_SIGNATURE: 

171 logger.warning("invalid signature: %r", signature) 

172 return challenge_msg 

173 

174 negotiate_flags_raw = challenge_msg[20:24] 

175 try: 

176 negotiate_flags = struct.unpack("<I", negotiate_flags_raw)[0] 

177 except struct.error: 

178 logger.warning("Invalid Negotiate Flags: %s", negotiate_flags_raw) 

179 return challenge_msg 

180 

181 if negotiate_flags & ntlm_auth.constants.NegotiateFlags.NTLMSSP_NEGOTIATE_TARGET_INFO: 

182 try: 

183 negotiate_flags &= ~ntlm_auth.constants.NegotiateFlags.NTLMSSP_NEGOTIATE_TARGET_INFO 

184 return challenge_msg[:20] + struct.pack("<I", negotiate_flags) + challenge_msg[24:] 

185 except struct.error: 

186 return challenge_msg 

187 return challenge_msg 

188 

189 

190def noop(): 

191 pass