Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/msal/authority.py: 36%
80 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:13 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:13 +0000
1import json
2try:
3 from urllib.parse import urlparse
4except ImportError: # Fall back to Python 2
5 from urlparse import urlparse
6import logging
8from .exceptions import MsalServiceError
11logger = logging.getLogger(__name__)
13# Endpoints were copied from here
14# https://docs.microsoft.com/en-us/azure/active-directory/develop/authentication-national-cloud#azure-ad-authentication-endpoints
15AZURE_US_GOVERNMENT = "login.microsoftonline.us"
16AZURE_CHINA = "login.chinacloudapi.cn"
17AZURE_PUBLIC = "login.microsoftonline.com"
19WORLD_WIDE = 'login.microsoftonline.com' # There was an alias login.windows.net
20WELL_KNOWN_AUTHORITY_HOSTS = set([
21 WORLD_WIDE,
22 AZURE_CHINA,
23 'login-us.microsoftonline.com',
24 AZURE_US_GOVERNMENT,
25 ])
26WELL_KNOWN_B2C_HOSTS = [
27 "b2clogin.com",
28 "b2clogin.cn",
29 "b2clogin.us",
30 "b2clogin.de",
31 ]
34class AuthorityBuilder(object):
35 def __init__(self, instance, tenant):
36 """A helper to save caller from doing string concatenation.
38 Usage is documented in :func:`application.ClientApplication.__init__`.
39 """
40 self._instance = instance.rstrip("/")
41 self._tenant = tenant.strip("/")
43 def __str__(self):
44 return "https://{}/{}".format(self._instance, self._tenant)
47class Authority(object):
48 """This class represents an (already-validated) authority.
50 Once constructed, it contains members named "*_endpoint" for this instance.
51 TODO: It will also cache the previously-validated authority instances.
52 """
53 _domains_without_user_realm_discovery = set([])
55 @property
56 def http_client(self): # Obsolete. We will remove this eventually
57 warnings.warn(
58 "authority.http_client might be removed in MSAL Python 1.21+", DeprecationWarning)
59 return self._http_client
61 def __init__(
62 self, authority_url, http_client,
63 validate_authority=True,
64 instance_discovery=None,
65 ):
66 """Creates an authority instance, and also validates it.
68 :param validate_authority:
69 The Authority validation process actually checks two parts:
70 instance (a.k.a. host) and tenant. We always do a tenant discovery.
71 This parameter only controls whether an instance discovery will be
72 performed.
73 """
74 # :param instance_discovery:
75 # By default, the known-to-Microsoft validation will use an
76 # instance discovery endpoint located at ``login.microsoftonline.com``.
77 # You can customize the endpoint by providing a url as a string.
78 # Or you can turn this behavior off by passing in a False here.
79 self._http_client = http_client
80 if isinstance(authority_url, AuthorityBuilder):
81 authority_url = str(authority_url)
82 authority, self.instance, tenant = canonicalize(authority_url)
83 self.is_adfs = tenant.lower() == 'adfs'
84 parts = authority.path.split('/')
85 self._is_b2c = any(
86 self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS
87 ) or (len(parts) == 3 and parts[2].lower().startswith("b2c_"))
88 self._is_known_to_developer = self.is_adfs or self._is_b2c or not validate_authority
89 is_known_to_microsoft = self.instance in WELL_KNOWN_AUTHORITY_HOSTS
90 instance_discovery_endpoint = 'https://{}/common/discovery/instance'.format( # Note: This URL seemingly returns V1 endpoint only
91 WORLD_WIDE # Historically using WORLD_WIDE. Could use self.instance too
92 # See https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadInstanceDiscovery.cs#L101-L103
93 # and https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadAuthority.cs#L19-L33
94 ) if instance_discovery in (None, True) else instance_discovery
95 if instance_discovery_endpoint and not (
96 is_known_to_microsoft or self._is_known_to_developer):
97 payload = _instance_discovery(
98 "https://{}{}/oauth2/v2.0/authorize".format(
99 self.instance, authority.path),
100 self._http_client,
101 instance_discovery_endpoint)
102 if payload.get("error") == "invalid_instance":
103 raise ValueError(
104 "invalid_instance: "
105 "The authority you provided, %s, is not whitelisted. "
106 "If it is indeed your legit customized domain name, "
107 "you can turn off this check by passing in "
108 "validate_authority=False"
109 % authority_url)
110 tenant_discovery_endpoint = payload['tenant_discovery_endpoint']
111 else:
112 tenant_discovery_endpoint = (
113 'https://{}:{}{}{}/.well-known/openid-configuration'.format(
114 self.instance,
115 443 if authority.port is None else authority.port,
116 authority.path, # In B2C scenario, it is "/tenant/policy"
117 "" if tenant == "adfs" else "/v2.0" # the AAD v2 endpoint
118 ))
119 try:
120 openid_config = tenant_discovery(
121 tenant_discovery_endpoint,
122 self._http_client)
123 except ValueError:
124 raise ValueError(
125 "Unable to get authority configuration for {}. "
126 "Authority would typically be in a format of "
127 "https://login.microsoftonline.com/your_tenant "
128 "Also please double check your tenant name or GUID is correct.".format(
129 authority_url))
130 logger.debug("openid_config = %s", openid_config)
131 self.authorization_endpoint = openid_config['authorization_endpoint']
132 self.token_endpoint = openid_config['token_endpoint']
133 self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint')
134 _, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID
136 def user_realm_discovery(self, username, correlation_id=None, response=None):
137 # It will typically return a dict containing "ver", "account_type",
138 # "federation_protocol", "cloud_audience_urn",
139 # "federation_metadata_url", "federation_active_auth_url", etc.
140 if self.instance not in self.__class__._domains_without_user_realm_discovery:
141 resp = response or self._http_client.get(
142 "https://{netloc}/common/userrealm/{username}?api-version=1.0".format(
143 netloc=self.instance, username=username),
144 headers={'Accept': 'application/json',
145 'client-request-id': correlation_id},)
146 if resp.status_code != 404:
147 resp.raise_for_status()
148 return json.loads(resp.text)
149 self.__class__._domains_without_user_realm_discovery.add(self.instance)
150 return {} # This can guide the caller to fall back normal ROPC flow
153def canonicalize(authority_url):
154 # Returns (url_parsed_result, hostname_in_lowercase, tenant)
155 authority = urlparse(authority_url)
156 parts = authority.path.split("/")
157 if authority.scheme != "https" or len(parts) < 2 or not parts[1]:
158 raise ValueError(
159 "Your given address (%s) should consist of "
160 "an https url with a minimum of one segment in a path: e.g. "
161 "https://login.microsoftonline.com/<tenant> "
162 "or https://<tenant_name>.b2clogin.com/<tenant_name>.onmicrosoft.com/policy"
163 % authority_url)
164 return authority, authority.hostname, parts[1]
166def _instance_discovery(url, http_client, instance_discovery_endpoint, **kwargs):
167 resp = http_client.get(
168 instance_discovery_endpoint,
169 params={'authorization_endpoint': url, 'api-version': '1.0'},
170 **kwargs)
171 return json.loads(resp.text)
173def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs):
174 # Returns Openid Configuration
175 resp = http_client.get(tenant_discovery_endpoint, **kwargs)
176 if resp.status_code == 200:
177 payload = json.loads(resp.text) # It could raise ValueError
178 if 'authorization_endpoint' in payload and 'token_endpoint' in payload:
179 return payload # Happy path
180 raise ValueError("OIDC Discovery does not provide enough information")
181 if 400 <= resp.status_code < 500:
182 # Nonexist tenant would hit this path
183 # e.g. https://login.microsoftonline.com/nonexist_tenant/v2.0/.well-known/openid-configuration
184 raise ValueError(
185 "OIDC Discovery endpoint rejects our request. Error: {}".format(
186 resp.text # Expose it as-is b/c OIDC defines no error response format
187 ))
188 # Transient network error would hit this path
189 resp.raise_for_status()
190 raise RuntimeError( # A fallback here, in case resp.raise_for_status() is no-op
191 "Unable to complete OIDC Discovery: %d, %s" % (resp.status_code, resp.text))