Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/authority.py: 36%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import json
2try:
3 from urllib.parse import urlparse
4except ImportError: # Fall back to Python 2
5 from urlparse import urlparse
6import logging
8logger = logging.getLogger(__name__)
9# Endpoints were copied from here
10# https://docs.microsoft.com/en-us/azure/active-directory/develop/authentication-national-cloud#azure-ad-authentication-endpoints
11AZURE_US_GOVERNMENT = "login.microsoftonline.us"
12DEPRECATED_AZURE_CHINA = "login.chinacloudapi.cn"
13AZURE_PUBLIC = "login.microsoftonline.com"
14AZURE_GOV_FR = "login.sovcloud-identity.fr"
15AZURE_GOV_DE = "login.sovcloud-identity.de"
16AZURE_GOV_SG = "login.sovcloud-identity.sg"
18WORLD_WIDE = 'login.microsoftonline.com' # There was an alias login.windows.net
19WELL_KNOWN_AUTHORITY_HOSTS = frozenset([
20 WORLD_WIDE,
21 "login.microsoft.com",
22 "login.windows.net",
23 "sts.windows.net",
24 DEPRECATED_AZURE_CHINA,
25 "login.partner.microsoftonline.cn",
26 "login.microsoftonline.de", # deprecated
27 'login-us.microsoftonline.com',
28 AZURE_US_GOVERNMENT,
29 "login.usgovcloudapi.net",
30 AZURE_GOV_FR,
31 AZURE_GOV_DE,
32 AZURE_GOV_SG,
33 ])
35WELL_KNOWN_B2C_HOSTS = [
36 "b2clogin.com",
37 "b2clogin.cn",
38 "b2clogin.us",
39 "b2clogin.de",
40 "ciamlogin.com",
41 ]
42_CIAM_DOMAIN_SUFFIX = ".ciamlogin.com"
45def _get_instance_discovery_host(instance):
46 return instance if instance in WELL_KNOWN_AUTHORITY_HOSTS else WORLD_WIDE
49def _get_instance_discovery_endpoint(instance):
50 return 'https://{}/common/discovery/instance'.format(
51 _get_instance_discovery_host(instance))
54class AuthorityBuilder(object):
55 def __init__(self, instance, tenant):
56 """A helper to save caller from doing string concatenation.
58 Usage is documented in :func:`application.ClientApplication.__init__`.
59 """
60 self._instance = instance.rstrip("/")
61 self._tenant = tenant.strip("/")
63 def __str__(self):
64 return "https://{}/{}".format(self._instance, self._tenant)
67class Authority(object):
68 """This class represents an (already-validated) authority.
70 Once constructed, it contains members named "*_endpoint" for this instance.
71 TODO: It will also cache the previously-validated authority instances.
72 """
73 _domains_without_user_realm_discovery = set([])
75 def __init__(
76 self, authority_url, http_client,
77 validate_authority=True,
78 instance_discovery=None,
79 oidc_authority_url=None,
80 ):
81 """Creates an authority instance, and also validates it.
83 :param validate_authority:
84 The Authority validation process actually checks two parts:
85 instance (a.k.a. host) and tenant. We always do a tenant discovery.
86 This parameter only controls whether an instance discovery will be
87 performed.
88 """
89 self._http_client = http_client
90 self._oidc_authority_url = oidc_authority_url
91 if oidc_authority_url:
92 tenant_discovery_endpoint = self._initialize_oidc_authority(
93 oidc_authority_url)
94 else:
95 tenant_discovery_endpoint = self._initialize_entra_authority(
96 authority_url, validate_authority, instance_discovery)
97 try:
98 openid_config = tenant_discovery(
99 tenant_discovery_endpoint,
100 self._http_client)
101 except ValueError:
102 error_message = (
103 "Unable to get OIDC authority configuration for {url} "
104 "because its OIDC Discovery endpoint is unavailable at "
105 "{url}/.well-known/openid-configuration ".format(url=oidc_authority_url)
106 if oidc_authority_url else
107 "Unable to get authority configuration for {}. "
108 "Authority would typically be in a format of "
109 "https://login.microsoftonline.com/your_tenant "
110 "or https://tenant_name.ciamlogin.com "
111 "or https://tenant_name.b2clogin.com/tenant.onmicrosoft.com/policy. "
112 .format(authority_url)
113 ) + " Also please double check your tenant name or GUID is correct."
114 raise ValueError(error_message)
115 self._issuer = openid_config.get('issuer')
116 self.authorization_endpoint = openid_config['authorization_endpoint']
117 self.token_endpoint = openid_config['token_endpoint']
118 self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint')
119 _, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID
121 # Validate the issuer if using OIDC authority
122 if self._oidc_authority_url and not self.has_valid_issuer():
123 raise ValueError((
124 "The issuer '{iss}' does not match the authority '{auth}' or a known pattern. "
125 "When using the 'oidc_authority' parameter in ClientApplication, the authority "
126 "will be validated against the issuer from {auth}/.well-known/openid-configuration ."
127 "If using a known Entra authority (e.g. login.microsoftonline.com) the "
128 "'authority' parameter should be used instead of 'oidc_authority'. "
129 ""
130 ).format(iss=self._issuer, auth=oidc_authority_url))
131 def _initialize_oidc_authority(self, oidc_authority_url):
132 authority, self.instance, tenant = canonicalize(oidc_authority_url)
133 self.is_adfs = tenant.lower() == 'adfs' # As a convention
134 self._is_b2c = True # Not exactly true, but
135 # OIDC Authority was designed for CIAM which is the next gen of B2C.
136 # Besides, application.py uses this to bypass broker.
137 self._is_known_to_developer = True # Not really relevant, but application.py uses this to bypass authority validation
138 return oidc_authority_url + "/.well-known/openid-configuration"
140 def _initialize_entra_authority(
141 self, authority_url, validate_authority, instance_discovery):
142 # :param instance_discovery:
143 # By default, the known-to-Microsoft validation will use an
144 # instance discovery endpoint located at ``login.microsoftonline.com``.
145 # You can customize the endpoint by providing a url as a string.
146 # Or you can turn this behavior off by passing in a False here.
147 if isinstance(authority_url, AuthorityBuilder):
148 authority_url = str(authority_url)
149 authority, self.instance, tenant = canonicalize(authority_url)
150 is_ciam = self.instance.endswith(_CIAM_DOMAIN_SUFFIX)
151 self.is_adfs = tenant.lower() == 'adfs' and not is_ciam
152 parts = authority.path.split('/')
153 self._is_b2c = any(
154 self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS
155 ) or (len(parts) == 3 and parts[2].lower().startswith("b2c_"))
156 self._is_known_to_developer = self.is_adfs or self._is_b2c or not validate_authority
157 is_known_to_microsoft = self.instance in WELL_KNOWN_AUTHORITY_HOSTS
158 instance_discovery_endpoint = _get_instance_discovery_endpoint( # Note: This URL seemingly returns V1 endpoint only
159 self.instance
160 ) if instance_discovery in (None, True) else instance_discovery
161 if instance_discovery_endpoint and not (
162 is_known_to_microsoft or self._is_known_to_developer):
163 payload = _instance_discovery(
164 "https://{}{}/oauth2/v2.0/authorize".format(
165 self.instance, authority.path),
166 self._http_client,
167 instance_discovery_endpoint)
168 if payload.get("error") == "invalid_instance":
169 raise ValueError(
170 "invalid_instance: "
171 "The authority you provided, %s, is not known. "
172 "If it is a valid domain name known to you, "
173 "you can turn off this check by passing in "
174 "instance_discovery=False"
175 % authority_url)
176 tenant_discovery_endpoint = payload['tenant_discovery_endpoint']
177 else:
178 tenant_discovery_endpoint = authority._replace(
179 path="{prefix}{version}/.well-known/openid-configuration".format(
180 prefix=tenant if is_ciam and len(authority.path) <= 1 # Path-less CIAM
181 else authority.path, # In B2C, it is "/tenant/policy"
182 version="" if self.is_adfs else "/v2.0",
183 )
184 ).geturl() # Keeping original port and query. Query is useful for test.
185 return tenant_discovery_endpoint
187 def user_realm_discovery(self, username, correlation_id=None, response=None):
188 # It will typically return a dict containing "ver", "account_type",
189 # "federation_protocol", "cloud_audience_urn",
190 # "federation_metadata_url", "federation_active_auth_url", etc.
191 if self.instance not in self.__class__._domains_without_user_realm_discovery:
192 resp = response or self._http_client.get(
193 "https://{netloc}/common/userrealm/{username}?api-version=1.0".format(
194 netloc=self.instance, username=username),
195 headers={'Accept': 'application/json',
196 'client-request-id': correlation_id},)
197 if resp.status_code != 404:
198 resp.raise_for_status()
199 return json.loads(resp.text)
200 self.__class__._domains_without_user_realm_discovery.add(self.instance)
201 return {} # This can guide the caller to fall back normal ROPC flow
203 def has_valid_issuer(self):
204 """
205 Returns True if the issuer from OIDC discovery is valid for this authority.
207 An issuer is valid if one of the following is true:
208 - It exactly matches the authority URL (with/without trailing slash)
209 - It has the same scheme and host as the authority (path can be different)
210 - The issuer host is a well-known Microsoft authority host
211 - The issuer host is a regional variant of a well-known host (e.g., westus2.login.microsoft.com)
212 - For CIAM, hosts that end with well-known B2C hosts (e.g., tenant.b2clogin.com) are accepted as valid issuers
213 """
214 if not self._issuer or not self._oidc_authority_url:
215 return False
217 # Case 1: Exact match (most common case, normalized for trailing slashes)
218 if self._issuer.rstrip("/") == self._oidc_authority_url.rstrip("/"):
219 return True
221 issuer_parsed = urlparse(self._issuer)
222 authority_parsed = urlparse(self._oidc_authority_url)
223 issuer_host = issuer_parsed.hostname.lower() if issuer_parsed.hostname else None
225 if not issuer_host:
226 return False
228 # Case 2: Issuer is from a trusted Microsoft host - O(1) lookup
229 if issuer_host in WELL_KNOWN_AUTHORITY_HOSTS:
230 return True
232 # Case 3: Regional variant check - O(1) lookup
233 # e.g., westus2.login.microsoft.com -> extract "login.microsoft.com"
234 dot_index = issuer_host.find(".")
235 if dot_index > 0:
236 potential_base = issuer_host[dot_index + 1:]
237 if "." not in issuer_host[:dot_index]:
238 # 3a: Base host is a trusted Microsoft host
239 if potential_base in WELL_KNOWN_AUTHORITY_HOSTS:
240 return True
241 # 3b: Issuer has a region prefix on the authority host
242 # e.g. issuer=us.someweb.com, authority=someweb.com
243 authority_host = authority_parsed.hostname.lower() if authority_parsed.hostname else ""
244 if potential_base == authority_host:
245 return True
247 # Case 4: Same scheme and host (path can differ)
248 if (authority_parsed.scheme == issuer_parsed.scheme and
249 authority_parsed.netloc == issuer_parsed.netloc):
250 return True
252 # Case 5: Check if issuer host is a subdomain of a well-known B2C host
253 # e.g., tenant.b2clogin.com matches .b2clogin.com
254 # but fakeb2clogin.com does not
255 if any(issuer_host.endswith("." + h) for h in WELL_KNOWN_B2C_HOSTS):
256 return True
258 return False
260def canonicalize(authority_or_auth_endpoint):
261 # Returns (url_parsed_result, hostname_in_lowercase, tenant)
262 authority = urlparse(authority_or_auth_endpoint)
263 if authority.scheme == "https" and authority.hostname:
264 parts = authority.path.split("/")
265 first_part = parts[1] if len(parts) >= 2 and parts[1] else None
266 if authority.hostname.endswith(_CIAM_DOMAIN_SUFFIX): # CIAM
267 # Use path in CIAM authority. It will be validated by OIDC Discovery soon
268 tenant = first_part if first_part else "{}.onmicrosoft.com".format(
269 # Fallback to sub domain name. This variation may not be advertised
270 authority.hostname.rsplit(_CIAM_DOMAIN_SUFFIX, 1)[0])
271 return authority, authority.hostname, tenant
272 # AAD
273 if len(parts) >= 2 and parts[1]:
274 return authority, authority.hostname, parts[1]
275 raise ValueError(
276 "Your given address (%s) should consist of "
277 "an https url with hostname and a minimum of one segment in a path: e.g. "
278 "https://login.microsoftonline.com/{tenant} "
279 "or https://{tenant_name}.ciamlogin.com/{tenant} "
280 "or https://{tenant_name}.b2clogin.com/{tenant_name}.onmicrosoft.com/policy"
281 % authority_or_auth_endpoint)
283def _instance_discovery(url, http_client, instance_discovery_endpoint, **kwargs):
284 resp = http_client.get(
285 instance_discovery_endpoint,
286 params={'authorization_endpoint': url, 'api-version': '1.0'},
287 **kwargs)
288 return json.loads(resp.text)
290def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs):
291 # Returns Openid Configuration
292 resp = http_client.get(tenant_discovery_endpoint, **kwargs)
293 if resp.status_code == 200:
294 return json.loads(resp.text) # It could raise ValueError
295 if 400 <= resp.status_code < 500:
296 # Nonexist tenant would hit this path
297 # e.g. https://login.microsoftonline.com/nonexist_tenant/v2.0/.well-known/openid-configuration
298 raise ValueError("OIDC Discovery failed on {}. HTTP status: {}, Error: {}".format(
299 tenant_discovery_endpoint,
300 resp.status_code,
301 resp.text, # Expose it as-is b/c OIDC defines no error response format
302 ))
303 # Transient network error would hit this path
304 resp.raise_for_status()
305 raise RuntimeError( # A fallback here, in case resp.raise_for_status() is no-op
306 "Unable to complete OIDC Discovery: %d, %s" % (resp.status_code, resp.text))