Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/adal/authority.py: 19%
108 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:05 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:05 +0000
1#------------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation.
4# All rights reserved.
5#
6# This code is licensed under the MIT License.
7#
8# Permission is hereby granted, free of charge, to any person obtaining a copy
9# of this software and associated documentation files(the "Software"), to deal
10# in the Software without restriction, including without limitation the rights
11# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
12# copies of the Software, and to permit persons to whom the Software is
13# furnished to do so, subject to the following conditions :
14#
15# The above copyright notice and this permission notice shall be included in
16# all copies or substantial portions of the Software.
17#
18# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
21# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24# THE SOFTWARE.
25#
26#------------------------------------------------------------------------------
28try:
29 from urllib.parse import quote, urlparse
30except ImportError:
31 from urllib import quote # pylint: disable=no-name-in-module
32 from urlparse import urlparse # pylint: disable=import-error,ungrouped-imports
34import requests
36from .constants import AADConstants
37from .adal_error import AdalError
38from . import log
39from . import util
41class Authority(object):
43 def __init__(self, authority_url, validate_authority=True):
45 self._log = None
46 self._call_context = None
47 self._url = urlparse(authority_url)
49 self._validate_authority_url()
50 self._validated = not validate_authority
52 self._host = None
53 self._tenant = None
54 self._parse_authority()
56 self._authorization_endpoint = None
57 self.token_endpoint = None
58 self.device_code_endpoint = None
59 self.is_adfs_authority = self._tenant.lower() == 'adfs'
61 @property
62 def url(self):
63 return self._url.geturl()
65 def _whitelisted(self): # testing if self._url.hostname is a dsts whitelisted domain
66 # Add dSTS domains to whitelist based on based on domain
67 # https://microsoft.sharepoint.com/teams/AzureSecurityCompliance/Security/SitePages/dSTS%20Fundamentals.aspx
68 return ".dsts." in self._url.hostname
70 def _validate_authority_url(self):
72 if self._url.scheme != 'https':
73 raise ValueError("The authority url must be an https endpoint.")
75 if self._url.query:
76 raise ValueError("The authority url must not have a query string.")
78 path_parts = [part for part in self._url.path.split('/') if part]
79 if (len(path_parts) > 1) and (not self._whitelisted()): #if dsts host, path_parts will be 2
80 raise ValueError(
81 "The path of authority_url (also known as tenant) is invalid, "
82 "it should either be a domain name (e.g. mycompany.onmicrosoft.com) "
83 "or a tenant GUID id. "
84 'Your tenant input was "%s" and your entire authority_url was "%s".'
85 % ('/'.join(path_parts), self._url.geturl()))
86 elif len(path_parts) == 1:
87 self._url = urlparse(self._url.geturl().rstrip('/'))
89 def _parse_authority(self):
90 self._host = self._url.hostname
92 path_parts = self._url.path.split('/')
93 try:
94 self._tenant = path_parts[1]
95 except IndexError:
96 raise ValueError("Could not determine tenant.")
98 def _perform_static_instance_discovery(self):
100 self._log.debug("Performing static instance discovery")
102 if self._whitelisted(): # testing if self._url.hostname is a dsts whitelisted domain
103 self._log.debug("Authority validated via static instance discovery")
104 return True
105 try:
106 AADConstants.WELL_KNOWN_AUTHORITY_HOSTS.index(self._url.hostname)
107 except ValueError:
108 return False
110 self._log.debug("Authority validated via static instance discovery")
111 return True
113 def _create_authority_url(self):
114 return "https://{}/{}{}".format(self._url.hostname,
115 self._tenant,
116 AADConstants.AUTHORIZE_ENDPOINT_PATH)
118 def _create_instance_discovery_endpoint_from_template(self, authority_host):
120 discovery_endpoint = AADConstants.INSTANCE_DISCOVERY_ENDPOINT_TEMPLATE
121 discovery_endpoint = discovery_endpoint.replace('{authorize_host}', authority_host)
122 discovery_endpoint = discovery_endpoint.replace('{authorize_endpoint}',
123 quote(self._create_authority_url(),
124 safe='~()*!.\''))
125 return urlparse(discovery_endpoint)
127 def _perform_dynamic_instance_discovery(self):
128 discovery_endpoint = self._create_instance_discovery_endpoint_from_template(
129 AADConstants.WORLD_WIDE_AUTHORITY)
130 get_options = util.create_request_options(self)
131 operation = "Instance Discovery"
132 self._log.debug("Attempting instance discover at: %(discovery_endpoint)s",
133 {"discovery_endpoint": discovery_endpoint.geturl()})
135 try:
136 resp = requests.get(discovery_endpoint.geturl(), headers=get_options['headers'],
137 verify=self._call_context.get('verify_ssl', None),
138 proxies=self._call_context.get('proxies', None))
139 util.log_return_correlation_id(self._log, operation, resp)
140 except Exception:
141 self._log.exception("%(operation)s request failed",
142 {"operation": operation})
143 raise
145 if resp.status_code == 429:
146 resp.raise_for_status() # Will raise requests.exceptions.HTTPError
147 if not util.is_http_success(resp.status_code):
148 return_error_string = u"{} request returned http error: {}".format(operation,
149 resp.status_code)
150 error_response = ""
151 if resp.text:
152 return_error_string = u"{} and server response: {}".format(return_error_string,
153 resp.text)
154 try:
155 error_response = resp.json()
156 except ValueError:
157 pass
159 raise AdalError(return_error_string, error_response)
161 else:
162 discovery_resp = resp.json()
163 if discovery_resp.get('tenant_discovery_endpoint'):
164 return discovery_resp['tenant_discovery_endpoint']
165 else:
166 raise AdalError('Failed to parse instance discovery response')
168 def _validate_via_instance_discovery(self):
169 valid = self._perform_static_instance_discovery()
170 if not valid:
171 self._perform_dynamic_instance_discovery()
173 def _get_oauth_endpoints(self):
175 if (not self.token_endpoint) or (not self.device_code_endpoint):
176 self.token_endpoint = self._url.geturl() + AADConstants.TOKEN_ENDPOINT_PATH
177 self.device_code_endpoint = self._url.geturl() + AADConstants.DEVICE_ENDPOINT_PATH
179 def validate(self, call_context):
181 self._log = log.Logger('Authority', call_context['log_context'])
182 self._call_context = call_context
184 if not self._validated:
185 self._log.debug("Performing instance discovery: %(authority)s",
186 {"authority": self._url.geturl()})
187 self._validate_via_instance_discovery()
188 self._validated = True
189 else:
190 self._log.debug(
191 "Instance discovery/validation has either already been completed or is turned off: %(authority)s",
192 {"authority": self._url.geturl()})
194 self._get_oauth_endpoints()