Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/requests_ntlm2/adapters.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

71 statements  

1import logging 

2from urllib.parse import urlparse 

3 

4from requests.adapters import HTTPAdapter 

5from requests.packages.urllib3.connection import HTTPConnection, HTTPSConnection 

6from requests.packages.urllib3.poolmanager import pool_classes_by_scheme 

7 

8from .connection import DEFAULT_HTTP_VERSION 

9from .connection import HTTPConnection as _HTTPConnection 

10from .connection import HTTPSConnection as _HTTPSConnection 

11from .core import NtlmCompatibility 

12 

13 

14logger = logging.getLogger(__name__) 

15 

16 

17class HttpProxyAdapter(HTTPAdapter): 

18 def __init__(self, user_agent=None, *args, **kwargs): 

19 self._user_agent = user_agent 

20 super().__init__(*args, **kwargs) 

21 

22 def _add_host_header(self, request): 

23 if request.headers.get("Host"): 

24 if self._is_valid_host_header(request): 

25 return 

26 else: 

27 self._remove_host_header(request) 

28 

29 parse_result = urlparse(request.url) 

30 if parse_result.scheme == "http": 

31 if parse_result.port == 80: 

32 request.headers["Host"] = parse_result.hostname 

33 else: 

34 request.headers["Host"] = parse_result.netloc 

35 

36 @staticmethod 

37 def _is_valid_host_header(request): 

38 host = request.headers.get("Host") 

39 if not host: 

40 return False 

41 parse_result = urlparse(request.url) 

42 if parse_result.scheme == "https": 

43 if host == parse_result.netloc and parse_result.port is not None: 

44 return True 

45 return False 

46 

47 @staticmethod 

48 def _remove_host_header(request): 

49 try: 

50 del request.headers["Host"] 

51 except KeyError: 

52 pass 

53 

54 def add_headers(self, request, **kwargs): 

55 super().add_headers(request, **kwargs) 

56 self._add_host_header(request) 

57 

58 def proxy_headers(self, proxy): 

59 headers = super().proxy_headers(proxy) 

60 if self._user_agent and all(k.lower() != "user-agent" for k in headers): 

61 headers["User-Agent"] = self._user_agent 

62 logger.debug("proxy headers: %s", headers) 

63 return headers 

64 

65 

66class HttpNtlmAdapter(HttpProxyAdapter): 

67 def __init__( 

68 self, 

69 ntlm_username, 

70 ntlm_password, 

71 ntlm_compatibility=NtlmCompatibility.NTLMv2_DEFAULT, 

72 ntlm_strict_mode=False, 

73 proxy_tunnelling_http_version=DEFAULT_HTTP_VERSION, 

74 *args, 

75 **kwargs 

76 ): 

77 """ 

78 Thin wrapper around requests.adapters.HTTPAdapter 

79 """ 

80 self._setup( 

81 ntlm_username, 

82 ntlm_password, 

83 ntlm_compatibility, 

84 ntlm_strict_mode, 

85 proxy_tunnelling_http_version 

86 ) 

87 super().__init__(*args, **kwargs) 

88 

89 def close(self): 

90 self._teardown() 

91 super().close() 

92 

93 @staticmethod 

94 def _setup(username, password, ntlm_compatibility, ntlm_strict_mode, http_version): 

95 pool_classes_by_scheme["http"].ConnectionCls = _HTTPConnection 

96 pool_classes_by_scheme["https"].ConnectionCls = _HTTPSConnection 

97 _HTTPSConnection.set_ntlm_auth_credentials(username, password) 

98 

99 if http_version: 

100 _HTTPSConnection.set_http_version(http_version) 

101 

102 _HTTPSConnection.ntlm_compatibility = ntlm_compatibility 

103 _HTTPConnection.ntlm_strict_mode = ntlm_strict_mode 

104 

105 @staticmethod 

106 def _teardown(): 

107 pool_classes_by_scheme["http"].ConnectionCls = HTTPConnection 

108 pool_classes_by_scheme["https"].ConnectionCls = HTTPSConnection 

109 _HTTPSConnection.clear_ntlm_auth_credentials() 

110 _HTTPSConnection.clear_http_version()