Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/requests_ntlm2/adapters.py: 37%
71 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:20 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:20 +0000
1import logging
3from requests.adapters import HTTPAdapter
4from requests.packages.urllib3.connection import HTTPConnection, HTTPSConnection
5from requests.packages.urllib3.poolmanager import pool_classes_by_scheme
6from six.moves.urllib.parse import urlparse
8from .connection import DEFAULT_HTTP_VERSION
9from .connection import HTTPConnection as _HTTPConnection
10from .connection import HTTPSConnection as _HTTPSConnection
11from .core import NtlmCompatibility
14logger = logging.getLogger(__name__)
17class HttpProxyAdapter(HTTPAdapter):
18 def __init__(self, user_agent=None, *args, **kwargs):
19 self._user_agent = user_agent
20 super(HttpProxyAdapter, self).__init__(*args, **kwargs)
22 def _add_host_header(self, request):
23 if request.headers.get("Host"):
24 if self._is_valid_host_header(request):
25 return
26 else:
27 self._remove_host_header(request)
29 parse_result = urlparse(request.url)
30 if parse_result.scheme == "http":
31 if parse_result.port == 80:
32 request.headers["Host"] = parse_result.hostname
33 else:
34 request.headers["Host"] = parse_result.netloc
36 @staticmethod
37 def _is_valid_host_header(request):
38 host = request.headers.get("Host")
39 if not host:
40 return False
41 parse_result = urlparse(request.url)
42 if parse_result.scheme == "https":
43 if host == parse_result.netloc and parse_result.port is not None:
44 return True
45 return False
47 @staticmethod
48 def _remove_host_header(request):
49 try:
50 del request.headers["Host"]
51 except KeyError:
52 pass
54 def add_headers(self, request, **kwargs):
55 super(HttpProxyAdapter, self).add_headers(request, **kwargs)
56 self._add_host_header(request)
58 def proxy_headers(self, proxy):
59 headers = super(HttpProxyAdapter, self).proxy_headers(proxy)
60 if self._user_agent and all(k.lower() != "user-agent" for k in headers):
61 headers["User-Agent"] = self._user_agent
62 logger.debug("proxy headers: %s", headers)
63 return headers
66class HttpNtlmAdapter(HttpProxyAdapter):
67 def __init__(
68 self,
69 ntlm_username,
70 ntlm_password,
71 ntlm_compatibility=NtlmCompatibility.NTLMv2_DEFAULT,
72 ntlm_strict_mode=False,
73 proxy_tunnelling_http_version=DEFAULT_HTTP_VERSION,
74 *args,
75 **kwargs
76 ):
77 """
78 Thin wrapper around requests.adapters.HTTPAdapter
79 """
80 self._setup(
81 ntlm_username,
82 ntlm_password,
83 ntlm_compatibility,
84 ntlm_strict_mode,
85 proxy_tunnelling_http_version
86 )
87 super(HttpNtlmAdapter, self).__init__(*args, **kwargs)
89 def close(self):
90 self._teardown()
91 super(HttpNtlmAdapter, self).close()
93 @staticmethod
94 def _setup(username, password, ntlm_compatibility, ntlm_strict_mode, http_version):
95 pool_classes_by_scheme["http"].ConnectionCls = _HTTPConnection
96 pool_classes_by_scheme["https"].ConnectionCls = _HTTPSConnection
97 _HTTPSConnection.set_ntlm_auth_credentials(username, password)
99 if http_version:
100 _HTTPSConnection.set_http_version(http_version)
102 _HTTPSConnection.ntlm_compatibility = ntlm_compatibility
103 _HTTPConnection.ntlm_strict_mode = ntlm_strict_mode
105 @staticmethod
106 def _teardown():
107 pool_classes_by_scheme["http"].ConnectionCls = HTTPConnection
108 pool_classes_by_scheme["https"].ConnectionCls = HTTPSConnection
109 _HTTPSConnection.clear_ntlm_auth_credentials()
110 _HTTPSConnection.clear_http_version()