Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/authority.py: 41%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import json
2try:
3 from urllib.parse import urlparse
4except ImportError: # Fall back to Python 2
5 from urlparse import urlparse
6import logging
9logger = logging.getLogger(__name__)
11# Endpoints were copied from here
12# https://docs.microsoft.com/en-us/azure/active-directory/develop/authentication-national-cloud#azure-ad-authentication-endpoints
13AZURE_US_GOVERNMENT = "login.microsoftonline.us"
14AZURE_CHINA = "login.chinacloudapi.cn"
15AZURE_PUBLIC = "login.microsoftonline.com"
17WORLD_WIDE = 'login.microsoftonline.com' # There was an alias login.windows.net
18WELL_KNOWN_AUTHORITY_HOSTS = set([
19 WORLD_WIDE,
20 AZURE_CHINA,
21 'login-us.microsoftonline.com',
22 AZURE_US_GOVERNMENT,
23 ])
24WELL_KNOWN_B2C_HOSTS = [
25 "b2clogin.com",
26 "b2clogin.cn",
27 "b2clogin.us",
28 "b2clogin.de",
29 "ciamlogin.com",
30 ]
31_CIAM_DOMAIN_SUFFIX = ".ciamlogin.com"
34class AuthorityBuilder(object):
35 def __init__(self, instance, tenant):
36 """A helper to save caller from doing string concatenation.
38 Usage is documented in :func:`application.ClientApplication.__init__`.
39 """
40 self._instance = instance.rstrip("/")
41 self._tenant = tenant.strip("/")
43 def __str__(self):
44 return "https://{}/{}".format(self._instance, self._tenant)
47class Authority(object):
48 """This class represents an (already-validated) authority.
50 Once constructed, it contains members named "*_endpoint" for this instance.
51 TODO: It will also cache the previously-validated authority instances.
52 """
53 _domains_without_user_realm_discovery = set([])
55 def __init__(
56 self, authority_url, http_client,
57 validate_authority=True,
58 instance_discovery=None,
59 oidc_authority_url=None,
60 ):
61 """Creates an authority instance, and also validates it.
63 :param validate_authority:
64 The Authority validation process actually checks two parts:
65 instance (a.k.a. host) and tenant. We always do a tenant discovery.
66 This parameter only controls whether an instance discovery will be
67 performed.
68 """
69 self._http_client = http_client
70 if oidc_authority_url:
71 logger.debug("Initializing with OIDC authority: %s", oidc_authority_url)
72 tenant_discovery_endpoint = self._initialize_oidc_authority(
73 oidc_authority_url)
74 else:
75 logger.debug("Initializing with Entra authority: %s", authority_url)
76 tenant_discovery_endpoint = self._initialize_entra_authority(
77 authority_url, validate_authority, instance_discovery)
78 try:
79 openid_config = tenant_discovery(
80 tenant_discovery_endpoint,
81 self._http_client)
82 except ValueError:
83 error_message = (
84 "Unable to get OIDC authority configuration for {url} "
85 "because its OIDC Discovery endpoint is unavailable at "
86 "{url}/.well-known/openid-configuration ".format(url=oidc_authority_url)
87 if oidc_authority_url else
88 "Unable to get authority configuration for {}. "
89 "Authority would typically be in a format of "
90 "https://login.microsoftonline.com/your_tenant "
91 "or https://tenant_name.ciamlogin.com "
92 "or https://tenant_name.b2clogin.com/tenant.onmicrosoft.com/policy. "
93 .format(authority_url)
94 ) + " Also please double check your tenant name or GUID is correct."
95 raise ValueError(error_message)
96 openid_config.pop("issuer", None) # Not used in MSAL.py, so remove it therefore no need to validate it
97 logger.debug(
98 'openid_config("%s") = %s', tenant_discovery_endpoint, openid_config)
99 self.authorization_endpoint = openid_config['authorization_endpoint']
100 self.token_endpoint = openid_config['token_endpoint']
101 self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint')
102 _, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID
104 def _initialize_oidc_authority(self, oidc_authority_url):
105 authority, self.instance, tenant = canonicalize(oidc_authority_url)
106 self.is_adfs = tenant.lower() == 'adfs' # As a convention
107 self._is_b2c = True # Not exactly true, but
108 # OIDC Authority was designed for CIAM which is the next gen of B2C.
109 # Besides, application.py uses this to bypass broker.
110 self._is_known_to_developer = True # Not really relevant, but application.py uses this to bypass authority validation
111 return oidc_authority_url + "/.well-known/openid-configuration"
113 def _initialize_entra_authority(
114 self, authority_url, validate_authority, instance_discovery):
115 # :param instance_discovery:
116 # By default, the known-to-Microsoft validation will use an
117 # instance discovery endpoint located at ``login.microsoftonline.com``.
118 # You can customize the endpoint by providing a url as a string.
119 # Or you can turn this behavior off by passing in a False here.
120 if isinstance(authority_url, AuthorityBuilder):
121 authority_url = str(authority_url)
122 authority, self.instance, tenant = canonicalize(authority_url)
123 is_ciam = self.instance.endswith(_CIAM_DOMAIN_SUFFIX)
124 self.is_adfs = tenant.lower() == 'adfs' and not is_ciam
125 parts = authority.path.split('/')
126 self._is_b2c = any(
127 self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS
128 ) or (len(parts) == 3 and parts[2].lower().startswith("b2c_"))
129 self._is_known_to_developer = self.is_adfs or self._is_b2c or not validate_authority
130 is_known_to_microsoft = self.instance in WELL_KNOWN_AUTHORITY_HOSTS
131 instance_discovery_endpoint = 'https://{}/common/discovery/instance'.format( # Note: This URL seemingly returns V1 endpoint only
132 WORLD_WIDE # Historically using WORLD_WIDE. Could use self.instance too
133 # See https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadInstanceDiscovery.cs#L101-L103
134 # and https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadAuthority.cs#L19-L33
135 ) if instance_discovery in (None, True) else instance_discovery
136 if instance_discovery_endpoint and not (
137 is_known_to_microsoft or self._is_known_to_developer):
138 payload = _instance_discovery(
139 "https://{}{}/oauth2/v2.0/authorize".format(
140 self.instance, authority.path),
141 self._http_client,
142 instance_discovery_endpoint)
143 if payload.get("error") == "invalid_instance":
144 raise ValueError(
145 "invalid_instance: "
146 "The authority you provided, %s, is not whitelisted. "
147 "If it is indeed your legit customized domain name, "
148 "you can turn off this check by passing in "
149 "instance_discovery=False"
150 % authority_url)
151 tenant_discovery_endpoint = payload['tenant_discovery_endpoint']
152 else:
153 tenant_discovery_endpoint = authority._replace(
154 path="{prefix}{version}/.well-known/openid-configuration".format(
155 prefix=tenant if is_ciam and len(authority.path) <= 1 # Path-less CIAM
156 else authority.path, # In B2C, it is "/tenant/policy"
157 version="" if self.is_adfs else "/v2.0",
158 )
159 ).geturl() # Keeping original port and query. Query is useful for test.
160 return tenant_discovery_endpoint
162 def user_realm_discovery(self, username, correlation_id=None, response=None):
163 # It will typically return a dict containing "ver", "account_type",
164 # "federation_protocol", "cloud_audience_urn",
165 # "federation_metadata_url", "federation_active_auth_url", etc.
166 if self.instance not in self.__class__._domains_without_user_realm_discovery:
167 resp = response or self._http_client.get(
168 "https://{netloc}/common/userrealm/{username}?api-version=1.0".format(
169 netloc=self.instance, username=username),
170 headers={'Accept': 'application/json',
171 'client-request-id': correlation_id},)
172 if resp.status_code != 404:
173 resp.raise_for_status()
174 return json.loads(resp.text)
175 self.__class__._domains_without_user_realm_discovery.add(self.instance)
176 return {} # This can guide the caller to fall back normal ROPC flow
179def canonicalize(authority_or_auth_endpoint):
180 # Returns (url_parsed_result, hostname_in_lowercase, tenant)
181 authority = urlparse(authority_or_auth_endpoint)
182 if authority.scheme == "https" and authority.hostname:
183 parts = authority.path.split("/")
184 first_part = parts[1] if len(parts) >= 2 and parts[1] else None
185 if authority.hostname.endswith(_CIAM_DOMAIN_SUFFIX): # CIAM
186 # Use path in CIAM authority. It will be validated by OIDC Discovery soon
187 tenant = first_part if first_part else "{}.onmicrosoft.com".format(
188 # Fallback to sub domain name. This variation may not be advertised
189 authority.hostname.rsplit(_CIAM_DOMAIN_SUFFIX, 1)[0])
190 return authority, authority.hostname, tenant
191 # AAD
192 if len(parts) >= 2 and parts[1]:
193 return authority, authority.hostname, parts[1]
194 raise ValueError(
195 "Your given address (%s) should consist of "
196 "an https url with hostname and a minimum of one segment in a path: e.g. "
197 "https://login.microsoftonline.com/{tenant} "
198 "or https://{tenant_name}.ciamlogin.com/{tenant} "
199 "or https://{tenant_name}.b2clogin.com/{tenant_name}.onmicrosoft.com/policy"
200 % authority_or_auth_endpoint)
202def _instance_discovery(url, http_client, instance_discovery_endpoint, **kwargs):
203 resp = http_client.get(
204 instance_discovery_endpoint,
205 params={'authorization_endpoint': url, 'api-version': '1.0'},
206 **kwargs)
207 return json.loads(resp.text)
209def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs):
210 # Returns Openid Configuration
211 resp = http_client.get(tenant_discovery_endpoint, **kwargs)
212 if resp.status_code == 200:
213 return json.loads(resp.text) # It could raise ValueError
214 if 400 <= resp.status_code < 500:
215 # Nonexist tenant would hit this path
216 # e.g. https://login.microsoftonline.com/nonexist_tenant/v2.0/.well-known/openid-configuration
217 raise ValueError("OIDC Discovery failed on {}. HTTP status: {}, Error: {}".format(
218 tenant_discovery_endpoint,
219 resp.status_code,
220 resp.text, # Expose it as-is b/c OIDC defines no error response format
221 ))
222 # Transient network error would hit this path
223 resp.raise_for_status()
224 raise RuntimeError( # A fallback here, in case resp.raise_for_status() is no-op
225 "Unable to complete OIDC Discovery: %d, %s" % (resp.status_code, resp.text))