Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/authority.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

128 statements  

1import json 

2try: 

3 from urllib.parse import urlparse 

4except ImportError: # Fall back to Python 2 

5 from urlparse import urlparse 

6import logging 

7 

8logger = logging.getLogger(__name__) 

9# Endpoints were copied from here 

10# https://docs.microsoft.com/en-us/azure/active-directory/develop/authentication-national-cloud#azure-ad-authentication-endpoints 

11AZURE_US_GOVERNMENT = "login.microsoftonline.us" 

12DEPRECATED_AZURE_CHINA = "login.chinacloudapi.cn" 

13AZURE_PUBLIC = "login.microsoftonline.com" 

14AZURE_GOV_FR = "login.sovcloud-identity.fr" 

15AZURE_GOV_DE = "login.sovcloud-identity.de" 

16AZURE_GOV_SG = "login.sovcloud-identity.sg" 

17 

18WORLD_WIDE = 'login.microsoftonline.com' # There was an alias login.windows.net 

19WELL_KNOWN_AUTHORITY_HOSTS = frozenset([ 

20 WORLD_WIDE, 

21 "login.microsoft.com", 

22 "login.windows.net", 

23 "sts.windows.net", 

24 DEPRECATED_AZURE_CHINA, 

25 "login.partner.microsoftonline.cn", 

26 "login.microsoftonline.de", # deprecated 

27 'login-us.microsoftonline.com', 

28 AZURE_US_GOVERNMENT, 

29 "login.usgovcloudapi.net", 

30 AZURE_GOV_FR, 

31 AZURE_GOV_DE, 

32 AZURE_GOV_SG, 

33 ]) 

34 

35WELL_KNOWN_B2C_HOSTS = [ 

36 "b2clogin.com", 

37 "b2clogin.cn", 

38 "b2clogin.us", 

39 "b2clogin.de", 

40 "ciamlogin.com", 

41 ] 

42_CIAM_DOMAIN_SUFFIX = ".ciamlogin.com" 

43 

44 

45def _get_instance_discovery_host(instance): 

46 return instance if instance in WELL_KNOWN_AUTHORITY_HOSTS else WORLD_WIDE 

47 

48 

49def _get_instance_discovery_endpoint(instance): 

50 return 'https://{}/common/discovery/instance'.format( 

51 _get_instance_discovery_host(instance)) 

52 

53 

54class AuthorityBuilder(object): 

55 def __init__(self, instance, tenant): 

56 """A helper to save caller from doing string concatenation. 

57 

58 Usage is documented in :func:`application.ClientApplication.__init__`. 

59 """ 

60 self._instance = instance.rstrip("/") 

61 self._tenant = tenant.strip("/") 

62 

63 def __str__(self): 

64 return "https://{}/{}".format(self._instance, self._tenant) 

65 

66 

67class Authority(object): 

68 """This class represents an (already-validated) authority. 

69 

70 Once constructed, it contains members named "*_endpoint" for this instance. 

71 TODO: It will also cache the previously-validated authority instances. 

72 """ 

73 _domains_without_user_realm_discovery = set([]) 

74 

75 def __init__( 

76 self, authority_url, http_client, 

77 validate_authority=True, 

78 instance_discovery=None, 

79 oidc_authority_url=None, 

80 ): 

81 """Creates an authority instance, and also validates it. 

82 

83 :param validate_authority: 

84 The Authority validation process actually checks two parts: 

85 instance (a.k.a. host) and tenant. We always do a tenant discovery. 

86 This parameter only controls whether an instance discovery will be 

87 performed. 

88 """ 

89 self._http_client = http_client 

90 self._oidc_authority_url = oidc_authority_url 

91 if oidc_authority_url: 

92 logger.debug("Initializing with OIDC authority: %s", oidc_authority_url) 

93 tenant_discovery_endpoint = self._initialize_oidc_authority( 

94 oidc_authority_url) 

95 else: 

96 tenant_discovery_endpoint = self._initialize_entra_authority( 

97 authority_url, validate_authority, instance_discovery) 

98 try: 

99 openid_config = tenant_discovery( 

100 tenant_discovery_endpoint, 

101 self._http_client) 

102 except ValueError: 

103 error_message = ( 

104 "Unable to get OIDC authority configuration for {url} " 

105 "because its OIDC Discovery endpoint is unavailable at " 

106 "{url}/.well-known/openid-configuration ".format(url=oidc_authority_url) 

107 if oidc_authority_url else 

108 "Unable to get authority configuration for {}. " 

109 "Authority would typically be in a format of " 

110 "https://login.microsoftonline.com/your_tenant " 

111 "or https://tenant_name.ciamlogin.com " 

112 "or https://tenant_name.b2clogin.com/tenant.onmicrosoft.com/policy. " 

113 .format(authority_url) 

114 ) + " Also please double check your tenant name or GUID is correct." 

115 raise ValueError(error_message) 

116 self._issuer = openid_config.get('issuer') 

117 self.authorization_endpoint = openid_config['authorization_endpoint'] 

118 self.token_endpoint = openid_config['token_endpoint'] 

119 self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint') 

120 _, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID 

121 

122 # Validate the issuer if using OIDC authority 

123 if self._oidc_authority_url and not self.has_valid_issuer(): 

124 raise ValueError(( 

125 "The issuer '{iss}' does not match the authority '{auth}' or a known pattern. " 

126 "When using the 'oidc_authority' parameter in ClientApplication, the authority " 

127 "will be validated against the issuer from {auth}/.well-known/openid-configuration ." 

128 "If using a known Entra authority (e.g. login.microsoftonline.com) the " 

129 "'authority' parameter should be used instead of 'oidc_authority'. " 

130 "" 

131 ).format(iss=self._issuer, auth=oidc_authority_url)) 

132 def _initialize_oidc_authority(self, oidc_authority_url): 

133 authority, self.instance, tenant = canonicalize(oidc_authority_url) 

134 self.is_adfs = tenant.lower() == 'adfs' # As a convention 

135 self._is_b2c = True # Not exactly true, but 

136 # OIDC Authority was designed for CIAM which is the next gen of B2C. 

137 # Besides, application.py uses this to bypass broker. 

138 self._is_known_to_developer = True # Not really relevant, but application.py uses this to bypass authority validation 

139 return oidc_authority_url + "/.well-known/openid-configuration" 

140 

141 def _initialize_entra_authority( 

142 self, authority_url, validate_authority, instance_discovery): 

143 # :param instance_discovery: 

144 # By default, the known-to-Microsoft validation will use an 

145 # instance discovery endpoint located at ``login.microsoftonline.com``. 

146 # You can customize the endpoint by providing a url as a string. 

147 # Or you can turn this behavior off by passing in a False here. 

148 if isinstance(authority_url, AuthorityBuilder): 

149 authority_url = str(authority_url) 

150 authority, self.instance, tenant = canonicalize(authority_url) 

151 is_ciam = self.instance.endswith(_CIAM_DOMAIN_SUFFIX) 

152 self.is_adfs = tenant.lower() == 'adfs' and not is_ciam 

153 parts = authority.path.split('/') 

154 self._is_b2c = any( 

155 self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS 

156 ) or (len(parts) == 3 and parts[2].lower().startswith("b2c_")) 

157 self._is_known_to_developer = self.is_adfs or self._is_b2c or not validate_authority 

158 is_known_to_microsoft = self.instance in WELL_KNOWN_AUTHORITY_HOSTS 

159 instance_discovery_endpoint = _get_instance_discovery_endpoint( # Note: This URL seemingly returns V1 endpoint only 

160 self.instance 

161 ) if instance_discovery in (None, True) else instance_discovery 

162 if instance_discovery_endpoint and not ( 

163 is_known_to_microsoft or self._is_known_to_developer): 

164 payload = _instance_discovery( 

165 "https://{}{}/oauth2/v2.0/authorize".format( 

166 self.instance, authority.path), 

167 self._http_client, 

168 instance_discovery_endpoint) 

169 if payload.get("error") == "invalid_instance": 

170 raise ValueError( 

171 "invalid_instance: " 

172 "The authority you provided, %s, is not known. " 

173 "If it is a valid domain name known to you, " 

174 "you can turn off this check by passing in " 

175 "instance_discovery=False" 

176 % authority_url) 

177 tenant_discovery_endpoint = payload['tenant_discovery_endpoint'] 

178 else: 

179 tenant_discovery_endpoint = authority._replace( 

180 path="{prefix}{version}/.well-known/openid-configuration".format( 

181 prefix=tenant if is_ciam and len(authority.path) <= 1 # Path-less CIAM 

182 else authority.path, # In B2C, it is "/tenant/policy" 

183 version="" if self.is_adfs else "/v2.0", 

184 ) 

185 ).geturl() # Keeping original port and query. Query is useful for test. 

186 return tenant_discovery_endpoint 

187 

188 def user_realm_discovery(self, username, correlation_id=None, response=None): 

189 # It will typically return a dict containing "ver", "account_type", 

190 # "federation_protocol", "cloud_audience_urn", 

191 # "federation_metadata_url", "federation_active_auth_url", etc. 

192 if self.instance not in self.__class__._domains_without_user_realm_discovery: 

193 resp = response or self._http_client.get( 

194 "https://{netloc}/common/userrealm/{username}?api-version=1.0".format( 

195 netloc=self.instance, username=username), 

196 headers={'Accept': 'application/json', 

197 'client-request-id': correlation_id},) 

198 if resp.status_code != 404: 

199 resp.raise_for_status() 

200 return json.loads(resp.text) 

201 self.__class__._domains_without_user_realm_discovery.add(self.instance) 

202 return {} # This can guide the caller to fall back normal ROPC flow 

203 

204 def has_valid_issuer(self): 

205 """ 

206 Returns True if the issuer from OIDC discovery is valid for this authority. 

207 

208 An issuer is valid if one of the following is true: 

209 - It exactly matches the authority URL (with/without trailing slash) 

210 - It has the same scheme and host as the authority (path can be different) 

211 - The issuer host is a well-known Microsoft authority host 

212 - The issuer host is a regional variant of a well-known host (e.g., westus2.login.microsoft.com) 

213 - For CIAM, hosts that end with well-known B2C hosts (e.g., tenant.b2clogin.com) are accepted as valid issuers 

214 """ 

215 if not self._issuer or not self._oidc_authority_url: 

216 return False 

217 

218 # Case 1: Exact match (most common case, normalized for trailing slashes) 

219 if self._issuer.rstrip("/") == self._oidc_authority_url.rstrip("/"): 

220 return True 

221 

222 issuer_parsed = urlparse(self._issuer) 

223 authority_parsed = urlparse(self._oidc_authority_url) 

224 issuer_host = issuer_parsed.hostname.lower() if issuer_parsed.hostname else None 

225 

226 if not issuer_host: 

227 return False 

228 

229 # Case 2: Issuer is from a trusted Microsoft host - O(1) lookup 

230 if issuer_host in WELL_KNOWN_AUTHORITY_HOSTS: 

231 return True 

232 

233 # Case 3: Regional variant check - O(1) lookup 

234 # e.g., westus2.login.microsoft.com -> extract "login.microsoft.com" 

235 dot_index = issuer_host.find(".") 

236 if dot_index > 0: 

237 potential_base = issuer_host[dot_index + 1:] 

238 if "." not in issuer_host[:dot_index]: 

239 # 3a: Base host is a trusted Microsoft host 

240 if potential_base in WELL_KNOWN_AUTHORITY_HOSTS: 

241 return True 

242 # 3b: Issuer has a region prefix on the authority host 

243 # e.g. issuer=us.someweb.com, authority=someweb.com 

244 authority_host = authority_parsed.hostname.lower() if authority_parsed.hostname else "" 

245 if potential_base == authority_host: 

246 return True 

247 

248 # Case 4: Same scheme and host (path can differ) 

249 if (authority_parsed.scheme == issuer_parsed.scheme and 

250 authority_parsed.netloc == issuer_parsed.netloc): 

251 return True 

252 

253 # Case 5: Check if issuer host ends with any well-known B2C host (e.g., tenant.b2clogin.com) 

254 if any(issuer_host.endswith(h) for h in WELL_KNOWN_B2C_HOSTS): 

255 return True 

256 

257 return False 

258 

259def canonicalize(authority_or_auth_endpoint): 

260 # Returns (url_parsed_result, hostname_in_lowercase, tenant) 

261 authority = urlparse(authority_or_auth_endpoint) 

262 if authority.scheme == "https" and authority.hostname: 

263 parts = authority.path.split("/") 

264 first_part = parts[1] if len(parts) >= 2 and parts[1] else None 

265 if authority.hostname.endswith(_CIAM_DOMAIN_SUFFIX): # CIAM 

266 # Use path in CIAM authority. It will be validated by OIDC Discovery soon 

267 tenant = first_part if first_part else "{}.onmicrosoft.com".format( 

268 # Fallback to sub domain name. This variation may not be advertised 

269 authority.hostname.rsplit(_CIAM_DOMAIN_SUFFIX, 1)[0]) 

270 return authority, authority.hostname, tenant 

271 # AAD 

272 if len(parts) >= 2 and parts[1]: 

273 return authority, authority.hostname, parts[1] 

274 raise ValueError( 

275 "Your given address (%s) should consist of " 

276 "an https url with hostname and a minimum of one segment in a path: e.g. " 

277 "https://login.microsoftonline.com/{tenant} " 

278 "or https://{tenant_name}.ciamlogin.com/{tenant} " 

279 "or https://{tenant_name}.b2clogin.com/{tenant_name}.onmicrosoft.com/policy" 

280 % authority_or_auth_endpoint) 

281 

282def _instance_discovery(url, http_client, instance_discovery_endpoint, **kwargs): 

283 resp = http_client.get( 

284 instance_discovery_endpoint, 

285 params={'authorization_endpoint': url, 'api-version': '1.0'}, 

286 **kwargs) 

287 return json.loads(resp.text) 

288 

289def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs): 

290 # Returns Openid Configuration 

291 resp = http_client.get(tenant_discovery_endpoint, **kwargs) 

292 if resp.status_code == 200: 

293 return json.loads(resp.text) # It could raise ValueError 

294 if 400 <= resp.status_code < 500: 

295 # Nonexist tenant would hit this path 

296 # e.g. https://login.microsoftonline.com/nonexist_tenant/v2.0/.well-known/openid-configuration 

297 raise ValueError("OIDC Discovery failed on {}. HTTP status: {}, Error: {}".format( 

298 tenant_discovery_endpoint, 

299 resp.status_code, 

300 resp.text, # Expose it as-is b/c OIDC defines no error response format 

301 )) 

302 # Transient network error would hit this path 

303 resp.raise_for_status() 

304 raise RuntimeError( # A fallback here, in case resp.raise_for_status() is no-op 

305 "Unable to complete OIDC Discovery: %d, %s" % (resp.status_code, resp.text))