Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/requests_ntlm2/connection.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

186 statements  

1import logging 

2import re 

3import select 

4import socket 

5from http.client import PROXY_AUTHENTICATION_REQUIRED, LineTooLong 

6 

7from requests.packages.urllib3.connection import DummyConnection 

8from requests.packages.urllib3.connection import HTTPConnection as _HTTPConnection 

9from requests.packages.urllib3.connection import HTTPSConnection as _HTTPSConnection 

10from requests.packages.urllib3.connection import VerifiedHTTPSConnection as _VerifiedHTTPSConnection 

11 

12from .core import NtlmCompatibility, get_ntlm_credentials, noop 

13from .dance import HttpNtlmContext 

14 

15 

# Timeout handed to select.select() when polling the response for more data.
IO_WAIT_TIMEOUT = 0.1

logger = logging.getLogger(__name__)

# maximal line length when calling readline().
_MAXLINE = 65536

# Status tuples that are treated as an implicit HTTP/0.9 response and trigger
# a second attempt at locating a real status line.
_ASSUMED_HTTP09_STATUS_LINES = (("HTTP/0.9", 200, ""), ("HTTP/0.9", 200, "OK"))

# Lower-case HTTP header names.
_TRACKED_HEADERS = (
    "proxy-authenticate", "proxy-support", "cache-control",
    "date", "server", "proxy-connection",
    "connection", "content-length", "content-type",
)

# HTTP versions that may be used on the CONNECT request line.
HTTP_VERSION_11 = "HTTP/1.1"
HTTP_VERSION_10 = "HTTP/1.0"
DEFAULT_HTTP_VERSION = HTTP_VERSION_10

44 

45 

class HTTPConnection(_HTTPConnection):
    """Plain subclass of urllib3's ``HTTPConnection``; adds no behaviour."""

48 

49 

class HTTPSConnection(_HTTPSConnection):
    """Plain subclass of urllib3's ``HTTPSConnection``; adds no behaviour.

    The module-level name ``HTTPSConnection`` is rebound at the bottom of the
    module; when ``ssl`` is importable this class stays reachable as
    ``UnverifiedHTTPSConnection``.
    """

52 

53 

class VerifiedHTTPSConnection(_VerifiedHTTPSConnection):
    """HTTPS connection that opens its proxy tunnel with an NTLM handshake.

    ``_tunnel`` is overridden so that the HTTP ``CONNECT`` request carries the
    NTLM negotiate message and, after a 407 challenge, the NTLM authenticate
    message.  The credentials and the HTTP version used on the ``CONNECT``
    line are stored on the class through the ``set_*`` class methods.
    """

    ntlm_compatibility = NtlmCompatibility.NTLMv2_DEFAULT
    ntlm_strict_mode = False

    # Compiled once; locates a real status line inside the data that follows a
    # response which was first taken to be HTTP/0.9.
    _STATUS_LINE_REGEX = re.compile(
        br"(?P<version>HTTP/\d\.\d)\s+(?P<status>\d+)\s+(?P<message>.+)",
        re.DOTALL,
    )

    def __init__(self, *args, **kwargs):
        """Initialise the base connection and the header-reading flag."""
        super().__init__(*args, **kwargs)
        # Cleared by handle_http09_response() when it reaches EOF, which makes
        # _tunnel() skip its final header-draining loop.
        self._continue_reading_headers = True
        if self.ntlm_compatibility is None:
            self.ntlm_compatibility = NtlmCompatibility.NTLMv2_DEFAULT

    @classmethod
    def set_ntlm_auth_credentials(cls, username, password):
        """Store the NTLM credentials on the class.

        The value is whatever ``get_ntlm_credentials`` returns; ``_tunnel``
        unpacks it as ``(username, password, domain)``.
        """
        cls._ntlm_credentials = get_ntlm_credentials(username, password)

    @classmethod
    def set_http_version(cls, http_version):
        """Select the HTTP version written on the ``CONNECT`` request line.

        Anything other than ``HTTP_VERSION_10`` or ``HTTP_VERSION_11`` is
        replaced by ``DEFAULT_HTTP_VERSION`` (a debug message is logged).
        """
        if http_version in (HTTP_VERSION_10, HTTP_VERSION_11):
            cls._http_version = http_version
        else:
            logger.debug(
                "unsupported http-version %r, setting the default %r",
                http_version,
                DEFAULT_HTTP_VERSION,
            )
            cls._http_version = DEFAULT_HTTP_VERSION

    @classmethod
    def clear_http_version(cls):
        """Remove the class-level HTTP version override."""
        # Assign first so the attribute is guaranteed to exist on ``cls``;
        # a bare ``del`` raises AttributeError when nothing was ever set.
        cls._http_version = None
        del cls._http_version

    @classmethod
    def clear_ntlm_auth_credentials(cls):
        """Remove the class-level NTLM credentials."""
        # Same assign-then-delete approach as clear_http_version().
        cls._ntlm_credentials = None
        del cls._ntlm_credentials

    @staticmethod
    def _is_line_blank(line):
        """Return True when *line* is ``None``, empty, or only whitespace."""
        # for sites which EOF without sending a trailer
        return not line or not line.strip()

    @staticmethod
    def _read_response_line_if_ready(response):
        """Read one line from ``response.fp`` if it is readable in time.

        Waits at most ``IO_WAIT_TIMEOUT`` in ``select.select``.  Returns the
        line read, or ``None`` when nothing became readable.
        """
        # NOTE(review): select() only reports on the underlying descriptor; if
        # ``response.fp`` keeps its own read buffer, data already buffered
        # there would not be signalled - confirm this is acceptable.
        (ready, _, _) = select.select([response.fp], (), (), IO_WAIT_TIMEOUT)
        if ready:
            return response.fp.readline()
        return None

    def _flush_response_buffer(self, response):
        """Read and discard response lines until nothing more is available.

        Stops at the first read that yields ``None`` (nothing readable within
        the timeout) or an empty line (EOF).
        """
        line_count = 0
        while True:
            line_count += 1
            logger.debug("* reading line from response buffer")
            line = self._read_response_line_if_ready(response)
            logger.debug("< line %s: %r", line_count, line)
            if line is None or line in ("", b""):
                logger.debug("* finished draining the socket")
                break

    def handle_http09_response(self, response):
        """Look for a real status line after an assumed HTTP/0.9 response.

        Reads lines from ``response.fp`` until one matches
        ``HTTP/<d>.<d> <status> <message>``.

        Returns:
            ``(version, status, message)`` where ``version`` and ``message``
            are ``bytes`` exactly as matched (``message`` keeps the rest of
            the line, including its line ending) and ``status`` is an ``int``;
            ``None`` if EOF is reached first, in which case
            ``_continue_reading_headers`` is set to ``False``.
        """
        while True:
            line = response.fp.readline()
            if not line:
                self._continue_reading_headers = False
                break
            match = self._STATUS_LINE_REGEX.search(line)
            if match:
                status_line = match.groupdict()
                logger.debug("< %r", "{version} {status} {message}".format(**status_line))
                return status_line["version"], int(status_line["status"]), status_line["message"]
        return None

    def _get_response(self):
        """Read the status line of the proxy's reply.

        Returns:
            ``(version, code, message, response)``.  When the status line
            parsed by ``response._read_status()`` is one of
            ``_ASSUMED_HTTP09_STATUS_LINES``, the values found by
            ``handle_http09_response`` are used instead, if any.
        """
        response = self.response_class(self.sock, method=self._method)
        version, code, message = response._read_status()

        if (version, code, message) in _ASSUMED_HTTP09_STATUS_LINES:
            logger.warning("server response used outdated HTTP version: HTTP/0.9")
            status_line = self.handle_http09_response(response)
            if status_line:
                old_status_line = version, code, message
                version, code, message = status_line
                logger.info("changed status line from %s, to %s", old_status_line, status_line)
            else:
                logger.warning("could not handle HTTP/0.9 server response")
            logger.debug("HTTP/0.9: version=%s", version)
            logger.debug("HTTP/0.9: code=%s", code)
            logger.debug("HTTP/0.9: message=%s", message)
        else:
            logger.debug("< %r", f"{version} {code} {message}")
        return version, code, message, response

    def _get_http_version(self):
        """Return the configured HTTP version, or ``DEFAULT_HTTP_VERSION``."""
        return getattr(self, "_http_version", None) or DEFAULT_HTTP_VERSION

    def _get_header_bytes(self, proxy_auth_header=None):
        """Build the raw ``CONNECT`` request for the tunnel.

        Args:
            proxy_auth_header: value for ``Proxy-Authorization``.  A falsy
                value leaves any ``Proxy-Authorization`` entry already stored
                in ``self._tunnel_headers`` untouched.

        Returns:
            The request line, all tunnel headers in sorted order and the
            terminating blank line, encoded as latin-1 bytes.

        Side effects:
            Updates ``self._tunnel_headers`` (``Proxy-Authorization`` when
            given, plus ``Proxy-Connection`` and ``Host``).
        """
        host, port = self._get_hostport(self._tunnel_host, self._tunnel_port)
        http_connect_string = "CONNECT {host}:{port} {http_version}\r\n".format(
            host=host,
            port=port,
            http_version=self._get_http_version(),
        )
        logger.debug("> %r", http_connect_string)
        if proxy_auth_header:
            self._tunnel_headers["Proxy-Authorization"] = proxy_auth_header
        self._tunnel_headers["Proxy-Connection"] = "Keep-Alive"
        self._tunnel_headers["Host"] = f"{host}:{port}"

        request_lines = [http_connect_string]
        for header in sorted(self._tunnel_headers):
            header_line = "{}: {}\r\n".format(header, self._tunnel_headers[header])
            logger.debug("> %r", header_line)
            request_lines.append(header_line)
        request_lines.append("\r\n")
        return "".join(request_lines).encode("latin1")

    def _tunnel(self):
        """Open the proxy tunnel, performing the NTLM handshake if challenged.

        Sends ``CONNECT`` with the NTLM negotiate message.  On a 407 reply the
        response is scanned for the ``Proxy-Authenticate: NTLM`` challenge and
        ``CONNECT`` is sent again with the authenticate message.  The
        credentials must have been stored with ``set_ntlm_auth_credentials``.

        Raises:
            OSError: the final reply status is not 200 (the connection is
                closed first).
            LineTooLong: a header line is longer than ``_MAXLINE``.
        """
        username, password, domain = self._ntlm_credentials
        logger.debug("* attempting to open tunnel using HTTP CONNECT")
        logger.debug("* username=%r, domain=%r", username, domain)

        try:
            workstation = socket.gethostname().upper()
        except (AttributeError, OSError, TypeError, ValueError):
            # The workstation name is best effort; carry on without it.
            workstation = None

        logger.debug("* workstation=%r", workstation)

        ntlm_context = HttpNtlmContext(
            username,
            password,
            domain=domain,
            workstation=workstation,
            auth_type="NTLM",
            ntlm_compatibility=self.ntlm_compatibility,
            ntlm_strict_mode=self.ntlm_strict_mode,
        )

        negotiate_header = ntlm_context.get_negotiate_header()
        header_bytes = self._get_header_bytes(proxy_auth_header=negotiate_header)
        logger.debug("* sending the NEGOTIATE message")
        self.send(header_bytes)
        version, code, message, response = self._get_response()

        if code == PROXY_AUTHENTICATION_REQUIRED:
            authenticate_hdr = None
            # Compared against the raw bytes so that unrelated header lines
            # never have to be decoded (they may not be valid UTF-8).
            match_prefix = b"Proxy-Authenticate: NTLM "
            last_line_is_blank = False
            line_count = 1
            while True:
                line_count += 1
                logger.debug("* reading line number %s", line_count)
                # After a blank line more data may or may not follow, so only
                # read when the socket is ready instead of blocking.
                if last_line_is_blank:
                    line = self._read_response_line_if_ready(response)
                else:
                    line = response.fp.readline()
                logger.debug("* read line: %r", line)
                this_line_is_blank = self._is_line_blank(line)
                if last_line_is_blank and this_line_is_blank:
                    # Two blank reads in a row: nothing useful is left.
                    self._flush_response_buffer(response)
                    break

                if this_line_is_blank:
                    last_line_is_blank = True
                    continue
                last_line_is_blank = False

                # From here on ``line`` is a non-empty bytes object.
                if line.startswith(match_prefix):
                    logger.debug("< %r", line)
                    # ISO-8859-1 maps every byte, so decoding cannot fail.
                    ntlm_context.set_challenge_from_header(line.decode("iso-8859-1"))
                    authenticate_hdr = ntlm_context.get_authenticate_header()
                    self._flush_response_buffer(response)
                    break

                if len(line) > _MAXLINE:
                    raise LineTooLong("header line")

                logger.debug("< %r", line)

            if not authenticate_hdr:
                logger.warning("* the NTLM challenge header was not found!")
            header_bytes = self._get_header_bytes(proxy_auth_header=authenticate_hdr)
            logger.debug("* sending the NTLM authenticate message")
            self.send(header_bytes)
            version, code, message, response = self._get_response()

        if code != 200:
            logger.error("* HTTP %s: failed to establish NTLM proxy tunnel", code)
            self.close()
            # The HTTP/0.9 recovery path yields a bytes reason phrase; decode
            # it so the error text does not contain a b'...' repr.
            if isinstance(message, bytes):
                message = message.decode("iso-8859-1")
            raise OSError(
                "Tunnel connection failed: %d %s" % (code, message.strip())
            )

        # Consume the remaining headers of the successful CONNECT reply.
        while self._continue_reading_headers:
            line = response.fp.readline()
            if line is not None and len(line) > _MAXLINE:
                raise LineTooLong("header line")
            if self._is_line_blank(line):
                break

        logger.debug("* successfully established proxy tunnel!")

265 

266 

# Choose the class exported as ``HTTPSConnection`` depending on whether the
# ``ssl`` module can be imported.
try:
    noop()  # for testing purposes
    import ssl  # noqa
except ImportError:
    HTTPSConnection = DummyConnection
else:
    # Make a copy for testing.
    UnverifiedHTTPSConnection = HTTPSConnection
    HTTPSConnection = VerifiedHTTPSConnection