Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/requests_ntlm2/core.py: 60%

112 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:20 +0000

1import binascii 

2import logging 

3import struct 

4import sys 

5import warnings 

6 

7import ntlm_auth.constants 

8from cryptography import x509 

9from cryptography.exceptions import UnsupportedAlgorithm 

10from cryptography.hazmat.backends import default_backend 

11from cryptography.hazmat.primitives import hashes 

12from ntlm_auth.gss_channel_bindings import GssChannelBindingsStruct 

13from ntlm_auth.messages import ChallengeMessage 

14from requests.packages.urllib3.response import HTTPResponse 

15 

16 

17logger = logging.getLogger(__name__) 

18 

19 

class NtlmCompatibility(object):
    """
    Named integer constants for the NTLM compatibility levels 0 through 5.

    The names mirror the levels described in the Microsoft documentation
    on compatibility levels: https://bit.ly/2OWZVxp
    """

    # Levels 0-2: names refer to LM / NTLMv1, optionally "WITH_ESS".
    LM_AND_NTLMv1 = 0
    LM_AND_NTLMv1_WITH_ESS = 1
    NTLMv1_WITH_ESS = 2

    # Levels 3-5: names refer to NTLMv2; level 3 is labelled the default.
    NTLMv2_DEFAULT = 3
    NTLMv2_LEVEL4 = 4
    NTLMv2_LEVEL5 = 5

28 

29 

class UnknownSignatureAlgorithmOID(Warning):
    """Warning category used when a certificate's signature algorithm cannot be determined."""

32 

33 

def get_server_cert(response):
    """
    Get the certificate at the request_url and return it as a hash. Will
    get the raw socket from the original response from the server. This
    socket is then asked for its peer certificate, which is hashed with
    ``get_certificate_hash_bytes``. The certificate hash is then used with
    NTLMv2 authentication for Channel Binding Tokens support. If the raw
    object is not a urllib3 HTTPResponse (default with requests) then no
    certificate will be returned.

    :param response: The original 401 response from the server
    :return: The hash of the DER encoded certificate at the request_url, or
        None when the raw response is not a urllib3 HTTPResponse, the
        underlying socket cannot be reached, the socket does not expose
        ``getpeercert`` (e.g. not an HTTPS url), or the peer presented no
        certificate
    """
    raw_response = response.raw

    if not isinstance(raw_response, HTTPResponse):
        logger.warning(
            "Requests is running with a non urllib3 backend,"
            " cannot retrieve server certificate for CBT"
        )
        return None

    # The socket is only reachable through private attributes whose layout
    # differs between Python 2 and Python 3; any missing link means we
    # cannot get at it.
    try:
        if sys.version_info > (3, 0):
            sock = raw_response._fp.fp.raw._sock
        else:
            sock = raw_response._fp.fp._sock
    except AttributeError:
        return None

    try:
        # True -> request the binary (DER encoded) form of the certificate
        server_certificate = sock.getpeercert(True)
    except AttributeError:
        # the socket object has no getpeercert(), so there is nothing to hash
        logger.debug("unable to get server certificate")
        return None

    # getpeercert() yields None when the peer did not provide a certificate;
    # handing that to the DER loader would fail, so bail out explicitly.
    if not server_certificate:
        logger.debug("unable to get server certificate")
        return None

    return get_certificate_hash_bytes(server_certificate)

69 

70 

def get_certificate_hash_bytes(certificate_der):
    """
    Hash a DER encoded certificate for use as ``tls-server-end-point``
    channel binding data.

    The digest algorithm is the certificate's own signature hash algorithm,
    except that md5 and sha1 are replaced by sha256
    (see https://tools.ietf.org/html/rfc5929#section-4.1).

    :param certificate_der: The DER encoded certificate bytes
    :return: The certificate hash as bytes, or None (after emitting an
        ``UnknownSignatureAlgorithmOID`` warning) when the signature hash
        algorithm is unsupported or the signature does not use a separate
        hash algorithm
    """
    # https://tools.ietf.org/html/rfc5929#section-4.1
    cert = x509.load_der_x509_certificate(certificate_der, default_backend())

    try:
        hash_algorithm = cert.signature_hash_algorithm
    except UnsupportedAlgorithm as ex:
        logger.exception("e=")
        warnings.warn(
            "Failed to get signature algorithm from certificate, "
            "unable to pass channel bindings: %s" % str(ex),
            UnknownSignatureAlgorithmOID,
        )
        return None

    # signature_hash_algorithm may be None when the signature scheme does not
    # use a separate hash; without a hash algorithm there is nothing to
    # digest with, so skip channel bindings instead of failing on `.name`.
    if hash_algorithm is None:
        warnings.warn(
            "Certificate signature does not use a separate hash algorithm, "
            "unable to pass channel bindings",
            UnknownSignatureAlgorithmOID,
        )
        return None

    # if the cert signature algorithm is either md5 or sha1 then use sha256
    # otherwise use the signature algorithm
    if hash_algorithm.name in ("md5", "sha1"):
        digest = hashes.Hash(hashes.SHA256(), default_backend())
    else:
        digest = hashes.Hash(hash_algorithm, default_backend())

    digest.update(certificate_der)
    certificate_hash_bytes = digest.finalize()
    logger.debug("peer/server cert hash: %s", binascii.hexlify(certificate_hash_bytes))
    return certificate_hash_bytes

97 

98 

def get_auth_type_from_header(header):
    """
    Pick the authentication scheme advertised by a WWW-Authenticate or
    Proxy-Authenticate header value.

    The match is a case-insensitive substring search. NTLM is checked before
    Negotiate, so NTLM wins when the header mentions both.

    :param header: The header value to inspect
    :return: "NTLM", "Negotiate", or None when neither is mentioned
    """
    lowered = header.lower()
    # order matters: the first scheme found is the one returned
    for scheme in ("NTLM", "Negotiate"):
        if scheme.lower() in lowered:
            return scheme
    return None

110 

111 

def get_ntlm_credentials(username, password):
    """
    Split a ``DOMAIN\\user`` style username into its parts.

    Only the first backslash is treated as the separator. When there is no
    backslash the username is returned untouched with an empty domain.

    :param username: The username, optionally prefixed with ``DOMAIN\\``
    :param password: The password, passed through unchanged
    :return: A ``(username, password, domain)`` tuple
    """
    domain_part, separator, user_part = username.partition("\\")
    if not separator:
        # no backslash present: whole string is the user, domain is empty
        return username, password, ""
    return user_part, password, domain_part

118 

119 

def get_cbt_data(response):
    """
    Build the Channel Binding Token (CBT) structure for a TLS connection.

    The application data is the channel binding type
    ``tls-server-end-point`` joined by ``:`` to the server certificate hash
    obtained through ``get_server_cert``.

    See:
     - https://tools.ietf.org/html/rfc5929
     - https://github.com/jborean93/ntlm-auth#ntlmv2
     - https://github.com/requests/requests-ntlm/pull/116#discussion_r325961121
     - https://support.microsoft.com/en-za/help/976918/authentication-failure-from-non-windows-ntlm-or-kerberos-servers  # noqa

    :param response: HTTP Response object
    :return: A populated ``GssChannelBindingsStruct``, or None when no
        server certificate hash is available
    """
    server_cert_hash = get_server_cert(response)

    if server_cert_hash:
        # https://tools.ietf.org/html/rfc5929#section-4
        binding_type = b"tls-server-end-point"
        field = GssChannelBindingsStruct.APPLICATION_DATA

        bindings = GssChannelBindingsStruct()
        bindings[field] = b":".join((binding_type, server_cert_hash))
        logger.debug("cbt data: %s", bindings.get_data())
        return bindings

    logger.debug("server cert not found, channel binding tokens (CBT) wont be used")
    return None

145 

146 

def is_challenge_message(msg):
    """
    Tell whether ``msg`` carries the NTLM challenge message type.

    Bytes 8 to 12 of the message are read as a little-endian unsigned
    32-bit integer and compared with ``MessageTypes.NTLM_CHALLENGE``.

    :param msg: The raw message bytes
    :return: True when the type field equals NTLM_CHALLENGE, False when it
        differs or the field cannot be unpacked (e.g. message too short)
    """
    type_field = msg[8:12]
    try:
        (message_type,) = struct.unpack("<I", type_field)
    except struct.error:
        return False
    return message_type == ntlm_auth.constants.MessageTypes.NTLM_CHALLENGE

153 

154 

def is_challenge_message_valid(msg):
    """
    Tell whether ``msg`` can be parsed as an NTLM ``ChallengeMessage``.

    :param msg: The raw message bytes
    :return: True when ``ChallengeMessage(msg)`` is constructed without a
        ``struct.error``, False when that error is raised
    """
    try:
        # only the side effect of parsing matters; the object is discarded
        ChallengeMessage(msg)
    except struct.error:
        return False
    else:
        return True

161 

162 

def fix_target_info(challenge_msg):
    """
    Clear the NTLMSSP_NEGOTIATE_TARGET_INFO flag of a challenge message that
    fails to parse.

    Messages that are not challenge messages, that already parse fine, that
    have an unexpected signature, or whose negotiate flags cannot be read
    are returned unchanged.

    :param challenge_msg: The raw message bytes
    :return: The message with the target info flag cleared in bytes 20 to
        24, or the original message when nothing was changed
    """
    # nothing to repair unless this is a challenge message that does not parse
    if not is_challenge_message(challenge_msg) or is_challenge_message_valid(challenge_msg):
        return challenge_msg

    header = challenge_msg[:8]
    if header != ntlm_auth.constants.NTLM_SIGNATURE:
        logger.warning("invalid signature: %r", header)
        return challenge_msg

    # negotiate flags: little-endian unsigned 32-bit integer at bytes 20-24
    raw_flags = challenge_msg[20:24]
    try:
        (flags,) = struct.unpack("<I", raw_flags)
    except struct.error:
        logger.warning("Invalid Negotiate Flags: %s", raw_flags)
        return challenge_msg

    target_info_bit = ntlm_auth.constants.NegotiateFlags.NTLMSSP_NEGOTIATE_TARGET_INFO
    if not flags & target_info_bit:
        return challenge_msg

    try:
        patched_flags = struct.pack("<I", flags & ~target_info_bit)
    except struct.error:
        return challenge_msg

    # splice the rewritten flags back between the untouched head and tail
    return challenge_msg[:20] + patched_flags + challenge_msg[24:]

189 

190 

def noop():
    """Do nothing and return None."""
    return None