Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/adal/authority.py: 19%

108 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:05 +0000

1#------------------------------------------------------------------------------ 

2# 

3# Copyright (c) Microsoft Corporation. 

4# All rights reserved. 

5# 

6# This code is licensed under the MIT License. 

7# 

8# Permission is hereby granted, free of charge, to any person obtaining a copy 

9# of this software and associated documentation files(the "Software"), to deal 

10# in the Software without restriction, including without limitation the rights 

11# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell 

12# copies of the Software, and to permit persons to whom the Software is 

13# furnished to do so, subject to the following conditions : 

14# 

15# The above copyright notice and this permission notice shall be included in 

16# all copies or substantial portions of the Software. 

17# 

18# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

19# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

20# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE 

21# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

22# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

23# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 

24# THE SOFTWARE. 

25# 

26#------------------------------------------------------------------------------ 

27 

28try: 

29 from urllib.parse import quote, urlparse 

30except ImportError: 

31 from urllib import quote # pylint: disable=no-name-in-module 

32 from urlparse import urlparse # pylint: disable=import-error,ungrouped-imports 

33 

34import requests 

35 

36from .constants import AADConstants 

37from .adal_error import AdalError 

38from . import log 

39from . import util 

40 

41class Authority(object): 

42 

43 def __init__(self, authority_url, validate_authority=True): 

44 

45 self._log = None 

46 self._call_context = None 

47 self._url = urlparse(authority_url) 

48 

49 self._validate_authority_url() 

50 self._validated = not validate_authority 

51 

52 self._host = None 

53 self._tenant = None 

54 self._parse_authority() 

55 

56 self._authorization_endpoint = None 

57 self.token_endpoint = None 

58 self.device_code_endpoint = None 

59 self.is_adfs_authority = self._tenant.lower() == 'adfs' 

60 

61 @property 

62 def url(self): 

63 return self._url.geturl() 

64 

65 def _whitelisted(self): # testing if self._url.hostname is a dsts whitelisted domain 

66 # Add dSTS domains to whitelist based on based on domain 

67 # https://microsoft.sharepoint.com/teams/AzureSecurityCompliance/Security/SitePages/dSTS%20Fundamentals.aspx 

68 return ".dsts." in self._url.hostname 

69 

70 def _validate_authority_url(self): 

71 

72 if self._url.scheme != 'https': 

73 raise ValueError("The authority url must be an https endpoint.") 

74 

75 if self._url.query: 

76 raise ValueError("The authority url must not have a query string.") 

77 

78 path_parts = [part for part in self._url.path.split('/') if part] 

79 if (len(path_parts) > 1) and (not self._whitelisted()): #if dsts host, path_parts will be 2 

80 raise ValueError( 

81 "The path of authority_url (also known as tenant) is invalid, " 

82 "it should either be a domain name (e.g. mycompany.onmicrosoft.com) " 

83 "or a tenant GUID id. " 

84 'Your tenant input was "%s" and your entire authority_url was "%s".' 

85 % ('/'.join(path_parts), self._url.geturl())) 

86 elif len(path_parts) == 1: 

87 self._url = urlparse(self._url.geturl().rstrip('/')) 

88 

89 def _parse_authority(self): 

90 self._host = self._url.hostname 

91 

92 path_parts = self._url.path.split('/') 

93 try: 

94 self._tenant = path_parts[1] 

95 except IndexError: 

96 raise ValueError("Could not determine tenant.") 

97 

98 def _perform_static_instance_discovery(self): 

99 

100 self._log.debug("Performing static instance discovery") 

101 

102 if self._whitelisted(): # testing if self._url.hostname is a dsts whitelisted domain 

103 self._log.debug("Authority validated via static instance discovery") 

104 return True 

105 try: 

106 AADConstants.WELL_KNOWN_AUTHORITY_HOSTS.index(self._url.hostname) 

107 except ValueError: 

108 return False 

109 

110 self._log.debug("Authority validated via static instance discovery") 

111 return True 

112 

113 def _create_authority_url(self): 

114 return "https://{}/{}{}".format(self._url.hostname, 

115 self._tenant, 

116 AADConstants.AUTHORIZE_ENDPOINT_PATH) 

117 

118 def _create_instance_discovery_endpoint_from_template(self, authority_host): 

119 

120 discovery_endpoint = AADConstants.INSTANCE_DISCOVERY_ENDPOINT_TEMPLATE 

121 discovery_endpoint = discovery_endpoint.replace('{authorize_host}', authority_host) 

122 discovery_endpoint = discovery_endpoint.replace('{authorize_endpoint}', 

123 quote(self._create_authority_url(), 

124 safe='~()*!.\'')) 

125 return urlparse(discovery_endpoint) 

126 

127 def _perform_dynamic_instance_discovery(self): 

128 discovery_endpoint = self._create_instance_discovery_endpoint_from_template( 

129 AADConstants.WORLD_WIDE_AUTHORITY) 

130 get_options = util.create_request_options(self) 

131 operation = "Instance Discovery" 

132 self._log.debug("Attempting instance discover at: %(discovery_endpoint)s", 

133 {"discovery_endpoint": discovery_endpoint.geturl()}) 

134 

135 try: 

136 resp = requests.get(discovery_endpoint.geturl(), headers=get_options['headers'], 

137 verify=self._call_context.get('verify_ssl', None), 

138 proxies=self._call_context.get('proxies', None)) 

139 util.log_return_correlation_id(self._log, operation, resp) 

140 except Exception: 

141 self._log.exception("%(operation)s request failed", 

142 {"operation": operation}) 

143 raise 

144 

145 if resp.status_code == 429: 

146 resp.raise_for_status() # Will raise requests.exceptions.HTTPError 

147 if not util.is_http_success(resp.status_code): 

148 return_error_string = u"{} request returned http error: {}".format(operation, 

149 resp.status_code) 

150 error_response = "" 

151 if resp.text: 

152 return_error_string = u"{} and server response: {}".format(return_error_string, 

153 resp.text) 

154 try: 

155 error_response = resp.json() 

156 except ValueError: 

157 pass 

158 

159 raise AdalError(return_error_string, error_response) 

160 

161 else: 

162 discovery_resp = resp.json() 

163 if discovery_resp.get('tenant_discovery_endpoint'): 

164 return discovery_resp['tenant_discovery_endpoint'] 

165 else: 

166 raise AdalError('Failed to parse instance discovery response') 

167 

168 def _validate_via_instance_discovery(self): 

169 valid = self._perform_static_instance_discovery() 

170 if not valid: 

171 self._perform_dynamic_instance_discovery() 

172 

173 def _get_oauth_endpoints(self): 

174 

175 if (not self.token_endpoint) or (not self.device_code_endpoint): 

176 self.token_endpoint = self._url.geturl() + AADConstants.TOKEN_ENDPOINT_PATH 

177 self.device_code_endpoint = self._url.geturl() + AADConstants.DEVICE_ENDPOINT_PATH 

178 

179 def validate(self, call_context): 

180 

181 self._log = log.Logger('Authority', call_context['log_context']) 

182 self._call_context = call_context 

183 

184 if not self._validated: 

185 self._log.debug("Performing instance discovery: %(authority)s", 

186 {"authority": self._url.geturl()}) 

187 self._validate_via_instance_discovery() 

188 self._validated = True 

189 else: 

190 self._log.debug( 

191 "Instance discovery/validation has either already been completed or is turned off: %(authority)s", 

192 {"authority": self._url.geturl()}) 

193 

194 self._get_oauth_endpoints()