Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/authority.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

127 statements  

1import json 

2try: 

3 from urllib.parse import urlparse 

4except ImportError: # Fall back to Python 2 

5 from urlparse import urlparse 

6import logging 

7 

8logger = logging.getLogger(__name__) 

# Host names of the national/sovereign clouds. They were copied from
# https://docs.microsoft.com/en-us/azure/active-directory/develop/authentication-national-cloud#azure-ad-authentication-endpoints
AZURE_US_GOVERNMENT = "login.microsoftonline.us"
DEPRECATED_AZURE_CHINA = "login.chinacloudapi.cn"
AZURE_PUBLIC = "login.microsoftonline.com"
AZURE_GOV_FR = "login.sovcloud-identity.fr"
AZURE_GOV_DE = "login.sovcloud-identity.de"
AZURE_GOV_SG = "login.sovcloud-identity.sg"

# Default host for instance discovery. There was an alias login.windows.net
WORLD_WIDE = "login.microsoftonline.com"

# Hosts that are accepted as authorities without an instance discovery call
WELL_KNOWN_AUTHORITY_HOSTS = frozenset((
    WORLD_WIDE,
    "login.microsoft.com",
    "login.windows.net",
    "sts.windows.net",
    DEPRECATED_AZURE_CHINA,
    "login.partner.microsoftonline.cn",
    "login.microsoftonline.de",  # deprecated
    "login-us.microsoftonline.com",
    AZURE_US_GOVERNMENT,
    "login.usgovcloudapi.net",
    AZURE_GOV_FR,
    AZURE_GOV_DE,
    AZURE_GOV_SG,
))

# Parent domains; a host counts as B2C when it ends with "." + one of these
WELL_KNOWN_B2C_HOSTS = [
    "b2clogin.com",
    "b2clogin.cn",
    "b2clogin.us",
    "b2clogin.de",
    "ciamlogin.com",
]
_CIAM_DOMAIN_SUFFIX = ".ciamlogin.com"

43 

44 

def _get_instance_discovery_host(instance):
    """Pick the host whose instance discovery endpoint should be queried.

    A host listed in ``WELL_KNOWN_AUTHORITY_HOSTS`` is returned unchanged;
    any other host falls back to ``WORLD_WIDE``.
    """
    if instance in WELL_KNOWN_AUTHORITY_HOSTS:
        return instance
    return WORLD_WIDE

47 

48 

def _get_instance_discovery_endpoint(instance):
    """Return the instance discovery URL on the host selected for ``instance``."""
    discovery_host = _get_instance_discovery_host(instance)
    return 'https://{}/common/discovery/instance'.format(discovery_host)

52 

53 

class AuthorityBuilder(object):
    """Combine an instance (host) and a tenant into an authority URL string."""

    def __init__(self, instance, tenant):
        """A helper to save caller from doing string concatenation.

        Usage is documented in :func:`application.ClientApplication.__init__`.

        :param instance: Host name; trailing slashes are removed.
        :param tenant: Tenant segment; leading and trailing slashes are removed.
        """
        self._tenant = tenant.strip("/")
        self._instance = instance.rstrip("/")

    def __str__(self):
        """Render the authority as ``https://<instance>/<tenant>``."""
        return "https://{host}/{tenant}".format(
            host=self._instance, tenant=self._tenant)

65 

66 

class Authority(object):
    """This class represents an (already-validated) authority.

    Once constructed, it contains members named "*_endpoint" for this instance.
    TODO: It will also cache the previously-validated authority instances.
    """
    # Hosts whose user realm endpoint answered 404. This set lives on the
    # class, so it is shared by every Authority instance.
    _domains_without_user_realm_discovery = set()

    def __init__(
            self, authority_url, http_client,
            validate_authority=True,
            instance_discovery=None,
            oidc_authority_url=None,
            ):
        """Creates an authority instance, and also validates it.

        :param authority_url:
            The authority, as a string or an :class:`AuthorityBuilder`.
            It is only used when ``oidc_authority_url`` is not provided.
        :param http_client:
            An object whose ``get()`` method performs the HTTP requests.
        :param validate_authority:
            The Authority validation process actually checks two parts:
            instance (a.k.a. host) and tenant. We always do a tenant discovery.
            This parameter only controls whether an instance discovery will be
            performed.
        :param instance_discovery:
            ``None`` or ``True`` selects the default instance discovery
            endpoint, a string provides a custom endpoint,
            and ``False`` turns instance discovery off.
        :param oidc_authority_url:
            When provided, the OpenID configuration is read from
            ``<oidc_authority_url>/.well-known/openid-configuration``
            and its issuer is validated by :meth:`has_valid_issuer`.
        :raises ValueError:
            If the authority is malformed, unknown, its OpenID configuration
            cannot be obtained, or its issuer is not acceptable.
        """
        self._http_client = http_client
        self._oidc_authority_url = oidc_authority_url
        if oidc_authority_url:
            tenant_discovery_endpoint = self._initialize_oidc_authority(
                oidc_authority_url)
        else:
            tenant_discovery_endpoint = self._initialize_entra_authority(
                authority_url, validate_authority, instance_discovery)
        try:
            openid_config = tenant_discovery(
                tenant_discovery_endpoint,
                self._http_client)
        except ValueError:
            if oidc_authority_url:
                # Report the endpoint that was actually requested
                error_message = (
                    "Unable to get OIDC authority configuration for {url} "
                    "because its OIDC Discovery endpoint is unavailable at "
                    "{endpoint} "
                ).format(
                    url=oidc_authority_url, endpoint=tenant_discovery_endpoint)
            else:
                error_message = (
                    "Unable to get authority configuration for {}. "
                    "Authority would typically be in a format of "
                    "https://login.microsoftonline.com/your_tenant "
                    "or https://tenant_name.ciamlogin.com "
                    "or https://tenant_name.b2clogin.com/tenant.onmicrosoft.com/policy. "
                ).format(authority_url)
            raise ValueError(
                error_message
                + " Also please double check your tenant name or GUID is correct.")
        self._issuer = openid_config.get('issuer')
        self.authorization_endpoint = openid_config['authorization_endpoint']
        self.token_endpoint = openid_config['token_endpoint']
        self.device_authorization_endpoint = openid_config.get(
            'device_authorization_endpoint')
        _, _, self.tenant = canonicalize(self.token_endpoint)  # Usually a GUID

        # Validate the issuer if using OIDC authority
        if self._oidc_authority_url and not self.has_valid_issuer():
            raise ValueError((
                "The issuer '{iss}' does not match the authority '{auth}' or a known pattern. "
                "When using the 'oidc_authority' parameter in ClientApplication, the authority "
                "will be validated against the issuer from {endpoint} . "
                "If using a known Entra authority (e.g. login.microsoftonline.com) the "
                "'authority' parameter should be used instead of 'oidc_authority'. "
                ).format(
                    iss=self._issuer,
                    auth=oidc_authority_url,
                    endpoint=tenant_discovery_endpoint))

    @staticmethod
    def _oidc_discovery_url(oidc_authority_url):
        """Return the OpenID configuration URL for an OIDC authority.

        Any terminating "/" is removed before the well-known suffix is
        appended, so that the result never contains "//.well-known".
        """
        return oidc_authority_url.rstrip("/") + "/.well-known/openid-configuration"

    def _initialize_oidc_authority(self, oidc_authority_url):
        """Set instance attributes for a generic OIDC authority.

        :return: The URL from which the OpenID configuration will be fetched.
        :raises ValueError: If ``canonicalize()`` rejects the url.
        """
        authority, self.instance, tenant = canonicalize(oidc_authority_url)
        self.is_adfs = tenant.lower() == 'adfs'  # As a convention
        self._is_b2c = True  # Not exactly true, but
            # OIDC Authority was designed for CIAM which is the next gen of B2C.
            # Besides, application.py uses this to bypass broker.
        self._is_known_to_developer = True  # Not really relevant, but application.py uses this to bypass authority validation
        return self._oidc_discovery_url(oidc_authority_url)

    def _initialize_entra_authority(
            self, authority_url, validate_authority, instance_discovery):
        """Set instance attributes for an Entra/ADFS/B2C/CIAM authority.

        :param instance_discovery:
            By default, the known-to-Microsoft validation will use an
            instance discovery endpoint located at ``login.microsoftonline.com``.
            You can customize the endpoint by providing a url as a string.
            Or you can turn this behavior off by passing in a False here.
        :return: The URL from which the OpenID configuration will be fetched.
        :raises ValueError:
            If the url is malformed, or instance discovery reports
            ``invalid_instance``.
        """
        if isinstance(authority_url, AuthorityBuilder):
            authority_url = str(authority_url)
        authority, self.instance, tenant = canonicalize(authority_url)
        is_ciam = self.instance.endswith(_CIAM_DOMAIN_SUFFIX)
        self.is_adfs = tenant.lower() == 'adfs' and not is_ciam
        parts = authority.path.split('/')
        self._is_b2c = any(
            self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS
            ) or (len(parts) == 3 and parts[2].lower().startswith("b2c_"))
        self._is_known_to_developer = (
            self.is_adfs or self._is_b2c or not validate_authority)
        is_known_to_microsoft = self.instance in WELL_KNOWN_AUTHORITY_HOSTS
        instance_discovery_endpoint = _get_instance_discovery_endpoint(  # Note: This URL seemingly returns V1 endpoint only
            self.instance
            ) if instance_discovery in (None, True) else instance_discovery
        if instance_discovery_endpoint and not (
                is_known_to_microsoft or self._is_known_to_developer):
            payload = _instance_discovery(
                "https://{}{}/oauth2/v2.0/authorize".format(
                    self.instance, authority.path),
                self._http_client,
                instance_discovery_endpoint)
            if payload.get("error") == "invalid_instance":
                raise ValueError(
                    "invalid_instance: "
                    "The authority you provided, %s, is not known. "
                    "If it is a valid domain name known to you, "
                    "you can turn off this check by passing in "
                    "instance_discovery=False"
                    % authority_url)
            tenant_discovery_endpoint = payload['tenant_discovery_endpoint']
        else:
            tenant_discovery_endpoint = authority._replace(
                path="{prefix}{version}/.well-known/openid-configuration".format(
                    prefix=tenant if is_ciam and len(authority.path) <= 1  # Path-less CIAM
                        else authority.path,  # In B2C, it is "/tenant/policy"
                    version="" if self.is_adfs else "/v2.0",
                    )
                ).geturl()  # Keeping original port and query. Query is useful for test.
        return tenant_discovery_endpoint

    def user_realm_discovery(self, username, correlation_id=None, response=None):
        """Look up the user realm information for ``username``.

        :param username: The user name placed into the user realm URL.
        :param correlation_id: Sent as the ``client-request-id`` header.
        :param response:
            An already-obtained response to use instead of sending a request.
        :return:
            The parsed JSON response. It will typically be a dict containing
            "ver", "account_type", "federation_protocol",
            "cloud_audience_urn", "federation_metadata_url",
            "federation_active_auth_url", etc.
            An empty dict is returned when the host answers 404, or has
            answered 404 before.
        """
        if self.instance not in self.__class__._domains_without_user_realm_discovery:
            # Test against None rather than truthiness: a response object may
            # evaluate to False (e.g. on a 4xx status) and must still be used.
            if response is not None:
                resp = response
            else:
                resp = self._http_client.get(
                    "https://{netloc}/common/userrealm/{username}?api-version=1.0".format(
                        netloc=self.instance, username=username),
                    headers={'Accept': 'application/json',
                             'client-request-id': correlation_id},)
            if resp.status_code != 404:
                resp.raise_for_status()
                return json.loads(resp.text)
            self.__class__._domains_without_user_realm_discovery.add(self.instance)
        return {}  # This can guide the caller to fall back normal ROPC flow

    def has_valid_issuer(self):
        """
        Returns True if the issuer from OIDC discovery is valid for this authority.

        An issuer is valid if one of the following is true:
        - It exactly matches the authority URL (with/without trailing slash)
        - It has the same scheme and host as the authority (path can be different)
        - The issuer host is a well-known Microsoft authority host
        - The issuer host is a regional variant of a well-known host (e.g., westus2.login.microsoft.com)
        - The issuer host is the authority host with one extra leading label (e.g., us.someweb.com for someweb.com)
        - For CIAM, hosts that end with well-known B2C hosts (e.g., tenant.b2clogin.com) are accepted as valid issuers
        """
        if not self._issuer or not self._oidc_authority_url:
            return False

        # Case 1: Exact match (most common case, normalized for trailing slashes)
        if self._issuer.rstrip("/") == self._oidc_authority_url.rstrip("/"):
            return True

        issuer_parsed = urlparse(self._issuer)
        authority_parsed = urlparse(self._oidc_authority_url)
        issuer_host = issuer_parsed.hostname.lower() if issuer_parsed.hostname else None

        if not issuer_host:
            return False

        # Case 2: Issuer is from a trusted Microsoft host - O(1) lookup
        if issuer_host in WELL_KNOWN_AUTHORITY_HOSTS:
            return True

        # Case 3: Regional variant check - O(1) lookup
        # e.g., westus2.login.microsoft.com -> extract "login.microsoft.com"
        # Splitting at the first dot means the prefix is always a single label.
        dot_index = issuer_host.find(".")
        if dot_index > 0:
            potential_base = issuer_host[dot_index + 1:]
            # 3a: Base host is a trusted Microsoft host
            if potential_base in WELL_KNOWN_AUTHORITY_HOSTS:
                return True
            # 3b: Issuer has a region prefix on the authority host
            # e.g. issuer=us.someweb.com, authority=someweb.com
            authority_host = (
                authority_parsed.hostname.lower() if authority_parsed.hostname else "")
            if potential_base == authority_host:
                return True

        # Case 4: Same scheme and host (path can differ)
        if (authority_parsed.scheme == issuer_parsed.scheme and
                authority_parsed.netloc == issuer_parsed.netloc):
            return True

        # Case 5: Check if issuer host is a subdomain of a well-known B2C host
        # e.g., tenant.b2clogin.com matches .b2clogin.com
        # but fakeb2clogin.com does not
        if any(issuer_host.endswith("." + h) for h in WELL_KNOWN_B2C_HOSTS):
            return True

        return False

259 

def canonicalize(authority_or_auth_endpoint):
    """Split an authority (or an endpoint under it) into its key components.

    :return: A tuple of (url_parsed_result, hostname_in_lowercase, tenant)
    :raises ValueError:
        If the input is not an https url with a hostname, or if a non-CIAM
        url has no first path segment.
    """
    parsed = urlparse(authority_or_auth_endpoint)
    host = parsed.hostname
    if parsed.scheme == "https" and host:
        segments = parsed.path.split("/")
        leading_segment = None
        if len(segments) >= 2 and segments[1]:
            leading_segment = segments[1]
        if host.endswith(_CIAM_DOMAIN_SUFFIX):  # CIAM
            # Use path in CIAM authority. It will be validated by OIDC Discovery soon
            if leading_segment:
                tenant = leading_segment
            else:
                # Fallback to sub domain name. This variation may not be advertised
                sub_domain = host.rsplit(_CIAM_DOMAIN_SUFFIX, 1)[0]
                tenant = "{}.onmicrosoft.com".format(sub_domain)
            return parsed, host, tenant
        # AAD
        if leading_segment:
            return parsed, host, leading_segment
    raise ValueError(
        "Your given address (%s) should consist of "
        "an https url with hostname and a minimum of one segment in a path: e.g. "
        "https://login.microsoftonline.com/{tenant} "
        "or https://{tenant_name}.ciamlogin.com/{tenant} "
        "or https://{tenant_name}.b2clogin.com/{tenant_name}.onmicrosoft.com/policy"
        % authority_or_auth_endpoint)

282 

def _instance_discovery(url, http_client, instance_discovery_endpoint, **kwargs):
    """Query the instance discovery endpoint about an authorization endpoint.

    :param url: The authorization endpoint to be looked up.
    :param http_client: An object whose ``get()`` performs the request.
    :param instance_discovery_endpoint: The URL to send the request to.
    :param kwargs: Passed through to ``http_client.get()``.
    :return: The response body parsed as JSON.
    """
    query = {'authorization_endpoint': url, 'api-version': '1.0'}
    response = http_client.get(
        instance_discovery_endpoint, params=query, **kwargs)
    return json.loads(response.text)

289 

def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs):
    """Fetch the OpenID configuration from ``tenant_discovery_endpoint``.

    :param http_client: An object whose ``get()`` performs the request.
    :param kwargs: Passed through to ``http_client.get()``.
    :return: The response body parsed as JSON, when the status is 200.
    :raises ValueError:
        On a 4xx status, or when a 200 response body is not valid JSON.
    :raises RuntimeError:
        On any other status, if ``raise_for_status()`` did not raise.
    """
    response = http_client.get(tenant_discovery_endpoint, **kwargs)
    status = response.status_code
    if status == 200:
        return json.loads(response.text)  # It could raise ValueError
    is_client_error = 400 <= status < 500
    if is_client_error:
        # Nonexist tenant would hit this path
        # e.g. https://login.microsoftonline.com/nonexist_tenant/v2.0/.well-known/openid-configuration
        failure = "OIDC Discovery failed on {}. HTTP status: {}, Error: {}".format(
            tenant_discovery_endpoint,
            status,
            response.text,  # Expose it as-is b/c OIDC defines no error response format
            )
        raise ValueError(failure)
    # Transient network error would hit this path
    response.raise_for_status()
    # A fallback here, in case response.raise_for_status() is no-op
    raise RuntimeError(
        "Unable to complete OIDC Discovery: %d, %s" % (status, response.text))