1import logging
2from urllib.parse import urlparse
3
4from requests.adapters import HTTPAdapter
5from requests.packages.urllib3.connection import HTTPConnection, HTTPSConnection
6from requests.packages.urllib3.poolmanager import pool_classes_by_scheme
7
8from .connection import DEFAULT_HTTP_VERSION
9from .connection import HTTPConnection as _HTTPConnection
10from .connection import HTTPSConnection as _HTTPSConnection
11from .core import NtlmCompatibility
12
13
14logger = logging.getLogger(__name__)
15
16
class HttpProxyAdapter(HTTPAdapter):
    """HTTPAdapter that manages the ``Host`` header and the proxy ``User-Agent``.

    ``add_headers`` is extended to normalise the ``Host`` request header from
    the request URL, and ``proxy_headers`` is extended to add a configurable
    ``User-Agent`` to the headers sent to the proxy.
    """

    def __init__(self, user_agent=None, *args, **kwargs):
        """Remember the proxy User-Agent and initialise the base adapter.

        :param user_agent: value used for the ``User-Agent`` proxy header when
            the base adapter did not already supply one; a falsy value
            disables the behaviour.

        Any further positional/keyword arguments are forwarded unchanged to
        ``HTTPAdapter.__init__``.
        """
        self._user_agent = user_agent
        super().__init__(*args, **kwargs)

    @staticmethod
    def _netloc_without_userinfo(netloc):
        """Return *netloc* with any leading ``user[:password]@`` part removed.

        ``urlparse(...).netloc`` keeps the userinfo component of the URL, which
        must never be copied into (or compared against) a ``Host`` header.
        """
        # The host part starts after the LAST "@"; without an "@" the whole
        # string is returned unchanged.
        return netloc.rpartition("@")[2]

    def _add_host_header(self, request):
        """Ensure *request* carries a ``Host`` header consistent with its URL.

        * An existing header accepted by ``_is_valid_host_header`` is kept.
        * Any other existing header is removed.
        * For ``http`` URLs a fresh header is then written: the bare host when
          the URL names port 80 explicitly, otherwise ``host[:port]`` exactly
          as written in the URL (minus userinfo).
        * For every other scheme no header is written here.

        :param request: object exposing ``url`` and a mutable ``headers``
            mapping.
        """
        if request.headers.get("Host"):
            if self._is_valid_host_header(request):
                return
            self._remove_host_header(request)

        parse_result = urlparse(request.url)
        if parse_result.scheme != "http":
            return

        if parse_result.port == 80:
            hostname = parse_result.hostname
            # ``hostname`` drops the brackets of an IPv6 literal; a Host header
            # needs them back ("[::1]", not "::1").
            if hostname and ":" in hostname:
                hostname = "[" + hostname + "]"
            request.headers["Host"] = hostname
        else:
            # Do not use the raw netloc: it may contain "user:password@",
            # which would leak credentials and produce a malformed header.
            request.headers["Host"] = self._netloc_without_userinfo(
                parse_result.netloc
            )

    @staticmethod
    def _is_valid_host_header(request):
        """Tell whether the existing ``Host`` header may be left untouched.

        The header is accepted only when the URL scheme is ``https``, the URL
        names its port explicitly, and the header equals the URL's
        ``host:port`` (userinfo ignored).

        :param request: object exposing ``url`` and a ``headers`` mapping.
        :return: ``True`` when the header should be kept, ``False`` otherwise
            (including when no ``Host`` header is set).
        """
        host = request.headers.get("Host")
        if not host:
            return False

        parse_result = urlparse(request.url)
        if parse_result.scheme != "https":
            return False

        expected = HttpProxyAdapter._netloc_without_userinfo(parse_result.netloc)
        return host == expected and parse_result.port is not None

    @staticmethod
    def _remove_host_header(request):
        """Delete the ``Host`` header from *request*; a missing header is fine."""
        try:
            del request.headers["Host"]
        except KeyError:
            pass

    def add_headers(self, request, **kwargs):
        """Run the base ``add_headers`` hook, then fix up the ``Host`` header."""
        super().add_headers(request, **kwargs)
        self._add_host_header(request)

    def proxy_headers(self, proxy):
        """Return the headers to send to *proxy*, adding the User-Agent if needed.

        The configured User-Agent is only added when the headers produced by
        the base adapter contain no ``User-Agent`` key (compared
        case-insensitively).

        :param proxy: proxy URL, forwarded to ``HTTPAdapter.proxy_headers``.
        :return: the header mapping returned by the base adapter, possibly
            extended with ``User-Agent``.
        """
        headers = super().proxy_headers(proxy)
        if self._user_agent and all(k.lower() != "user-agent" for k in headers):
            headers["User-Agent"] = self._user_agent
        # NOTE(review): the base implementation may add Proxy-Authorization for
        # proxy URLs that embed credentials - confirm, and consider redacting
        # it before logging.
        logger.debug("proxy headers: %s", headers)
        return headers
64
65
class HttpNtlmAdapter(HttpProxyAdapter):
    """Proxy adapter that swaps urllib3's connection classes for NTLM-aware ones.

    All NTLM configuration is written to class attributes (see ``_setup``), so
    it is shared by every adapter rather than being tied to one instance;
    ``close`` restores the stock urllib3 connection classes.
    """

    def __init__(
        self,
        ntlm_username,
        ntlm_password,
        ntlm_compatibility=NtlmCompatibility.NTLMv2_DEFAULT,
        ntlm_strict_mode=False,
        proxy_tunnelling_http_version=DEFAULT_HTTP_VERSION,
        *args,
        **kwargs
    ):
        """
        Thin wrapper around requests.adapters.HTTPAdapter

        :param ntlm_username: username handed to the NTLM-aware connection class.
        :param ntlm_password: password handed to the NTLM-aware connection class.
        :param ntlm_compatibility: NTLM compatibility level to configure.
        :param ntlm_strict_mode: strict-mode flag to configure.
        :param proxy_tunnelling_http_version: HTTP version configured on the
            connection class; a falsy value leaves the current setting alone.

        Remaining positional/keyword arguments are forwarded to
        ``HttpProxyAdapter.__init__``.
        """
        # Install the NTLM configuration before the base adapter is initialised.
        self._setup(
            ntlm_username,
            ntlm_password,
            ntlm_compatibility,
            ntlm_strict_mode,
            proxy_tunnelling_http_version
        )
        super().__init__(*args, **kwargs)

    def close(self):
        """Undo the connection-class patching, then close the base adapter."""
        self._teardown()
        super().close()

    @staticmethod
    def _setup(username, password, ntlm_compatibility, ntlm_strict_mode, http_version):
        """Install the NTLM-aware connection classes and configure them.

        Replaces ``ConnectionCls`` on urllib3's ``http`` and ``https`` pool
        classes and stores the NTLM settings on the HTTPS connection class.
        """
        pool_classes_by_scheme["http"].ConnectionCls = _HTTPConnection
        pool_classes_by_scheme["https"].ConnectionCls = _HTTPSConnection
        _HTTPSConnection.set_ntlm_auth_credentials(username, password)

        if http_version:
            _HTTPSConnection.set_http_version(http_version)

        # Keep every NTLM setting on the same class: credentials, HTTP version
        # and compatibility level all live on _HTTPSConnection, so the strict
        # mode flag belongs there too (it was previously written to
        # _HTTPConnection).
        _HTTPSConnection.ntlm_compatibility = ntlm_compatibility
        _HTTPSConnection.ntlm_strict_mode = ntlm_strict_mode

    @staticmethod
    def _teardown():
        """Restore urllib3's stock connection classes and clear stored settings.

        Only the credentials and HTTP version are cleared; the compatibility
        level and strict-mode flag are left as last configured.
        """
        pool_classes_by_scheme["http"].ConnectionCls = HTTPConnection
        pool_classes_by_scheme["https"].ConnectionCls = HTTPSConnection
        _HTTPSConnection.clear_ntlm_auth_credentials()
        _HTTPSConnection.clear_http_version()