Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/msal/authority.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

93 statements  

1import json 

2try: 

3 from urllib.parse import urlparse 

4except ImportError: # Fall back to Python 2 

5 from urlparse import urlparse 

6import logging 

7 

8 

logger = logging.getLogger(__name__)

# Endpoints were copied from here
# https://docs.microsoft.com/en-us/azure/active-directory/develop/authentication-national-cloud#azure-ad-authentication-endpoints
AZURE_US_GOVERNMENT = "login.microsoftonline.us"
AZURE_CHINA = "login.chinacloudapi.cn"
AZURE_PUBLIC = "login.microsoftonline.com"

WORLD_WIDE = 'login.microsoftonline.com'  # There was an alias login.windows.net

# Hosts for which Authority skips the instance discovery request.
WELL_KNOWN_AUTHORITY_HOSTS = {
    WORLD_WIDE,
    AZURE_CHINA,
    'login-us.microsoftonline.com',
    AZURE_US_GOVERNMENT,
}

# Domain names (matched as ".<name>" host suffixes) treated as B2C.
WELL_KNOWN_B2C_HOSTS = [
    "b2clogin.com",
    "b2clogin.cn",
    "b2clogin.us",
    "b2clogin.de",
    "ciamlogin.com",
]

# Host suffix used to recognize a CIAM authority.
_CIAM_DOMAIN_SUFFIX = ".ciamlogin.com"

32 

33 

class AuthorityBuilder(object):
    """Compose an authority URL from an instance (host) and a tenant."""

    def __init__(self, instance, tenant):
        """A helper to save caller from doing string concatenation.

        Usage is documented in :func:`application.ClientApplication.__init__`.

        :param instance: Host part; trailing slashes are removed.
        :param tenant: Tenant part; leading and trailing slashes are removed.
        """
        self._tenant = tenant.strip("/")
        self._instance = instance.rstrip("/")

    def __str__(self):
        """Return the authority as ``https://<instance>/<tenant>``."""
        return "https://{host}/{tenant}".format(
            host=self._instance, tenant=self._tenant)

45 

46 

class Authority(object):
    """This class represents an (already-validated) authority.

    Once constructed, it contains members named "*_endpoint" for this instance.
    TODO: It will also cache the previously-validated authority instances.
    """
    # Hosts whose user realm endpoint answered 404. It is a class attribute,
    # so the record is shared by all instances; user_realm_discovery() skips
    # the HTTP request for any host listed here.
    _domains_without_user_realm_discovery = set([])

    def __init__(
            self, authority_url, http_client,
            validate_authority=True,
            instance_discovery=None,
            oidc_authority_url=None,
            ):
        """Creates an authority instance, and also validates it.

        :param authority_url:
            The authority, as a string or an :class:`AuthorityBuilder`.
            Only used when ``oidc_authority_url`` is falsy.
        :param http_client:
            An object whose ``get()`` method performs the discovery requests.
        :param validate_authority:
            The Authority validation process actually checks two parts:
            instance (a.k.a. host) and tenant. We always do a tenant discovery.
            This parameter only controls whether an instance discovery will be
            performed.
        :param instance_discovery:
            ``None`` or ``True`` selects the default instance discovery
            endpoint, a string selects a custom one, and ``False`` turns
            instance discovery off.
        :param oidc_authority_url:
            When truthy, the authority is initialized from this URL instead of
            ``authority_url`` and no instance discovery is performed.
        :raises ValueError:
            If the authority URL is rejected, or tenant discovery fails
            with a ValueError.
        """
        self._http_client = http_client
        if oidc_authority_url:
            logger.debug("Initializing with OIDC authority: %s", oidc_authority_url)
            tenant_discovery_endpoint = self._initialize_oidc_authority(
                oidc_authority_url)
        else:
            logger.debug("Initializing with Entra authority: %s", authority_url)
            tenant_discovery_endpoint = self._initialize_entra_authority(
                authority_url, validate_authority, instance_discovery)
        try:
            openid_config = tenant_discovery(
                tenant_discovery_endpoint,
                self._http_client)
        except ValueError:
            # Report the endpoint that was actually queried. For a URL without
            # a trailing slash this is the same text as
            # "<oidc_authority_url>/.well-known/openid-configuration".
            error_message = (
                "Unable to get OIDC authority configuration for {url} "
                "because its OIDC Discovery endpoint is unavailable at "
                "{endpoint} ".format(
                    url=oidc_authority_url, endpoint=tenant_discovery_endpoint)
                if oidc_authority_url else
                "Unable to get authority configuration for {}. "
                "Authority would typically be in a format of "
                "https://login.microsoftonline.com/your_tenant "
                "or https://tenant_name.ciamlogin.com "
                "or https://tenant_name.b2clogin.com/tenant.onmicrosoft.com/policy. "
                .format(authority_url)
            ) + " Also please double check your tenant name or GUID is correct."
            raise ValueError(error_message)
        logger.debug(
            'openid_config("%s") = %s', tenant_discovery_endpoint, openid_config)
        self.authorization_endpoint = openid_config['authorization_endpoint']
        self.token_endpoint = openid_config['token_endpoint']
        self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint')
        _, _, self.tenant = canonicalize(self.token_endpoint)  # Usually a GUID

    def _initialize_oidc_authority(self, oidc_authority_url):
        """Set instance flags for an OIDC authority.

        :return: The URL of its OpenID configuration document.
        """
        authority, self.instance, tenant = canonicalize(oidc_authority_url)
        self.is_adfs = tenant.lower() == 'adfs'  # As a convention
        self._is_b2c = True  # Not exactly true, but
            # OIDC Authority was designed for CIAM which is the next gen of B2C.
            # Besides, application.py uses this to bypass broker.
        self._is_known_to_developer = True  # Not really relevant, but application.py uses this to bypass authority validation
        # OIDC Discovery requires any terminating "/" of the issuer to be
        # removed before appending the well-known path; otherwise the request
        # would go to ".../tenant//.well-known/openid-configuration".
        return oidc_authority_url.rstrip("/") + "/.well-known/openid-configuration"

    def _initialize_entra_authority(
            self, authority_url, validate_authority, instance_discovery):
        """Set instance flags for an Entra authority.

        Performs an instance discovery request unless the host is well known,
        the authority is ADFS or B2C, validation is turned off, or instance
        discovery is disabled.

        :return: The URL of the tenant's OpenID configuration document.
        :raises ValueError: If instance discovery reports "invalid_instance".
        """
        # :param instance_discovery:
        #    By default, the known-to-Microsoft validation will use an
        #    instance discovery endpoint located at ``login.microsoftonline.com``.
        #    You can customize the endpoint by providing a url as a string.
        #    Or you can turn this behavior off by passing in a False here.
        if isinstance(authority_url, AuthorityBuilder):
            authority_url = str(authority_url)
        authority, self.instance, tenant = canonicalize(authority_url)
        is_ciam = self.instance.endswith(_CIAM_DOMAIN_SUFFIX)
        self.is_adfs = tenant.lower() == 'adfs' and not is_ciam
        parts = authority.path.split('/')
        self._is_b2c = any(
            self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS
            ) or (len(parts) == 3 and parts[2].lower().startswith("b2c_"))
        self._is_known_to_developer = self.is_adfs or self._is_b2c or not validate_authority
        is_known_to_microsoft = self.instance in WELL_KNOWN_AUTHORITY_HOSTS
        instance_discovery_endpoint = 'https://{}/common/discovery/instance'.format(  # Note: This URL seemingly returns V1 endpoint only
            WORLD_WIDE  # Historically using WORLD_WIDE. Could use self.instance too
                # See https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadInstanceDiscovery.cs#L101-L103
                # and https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadAuthority.cs#L19-L33
            ) if instance_discovery in (None, True) else instance_discovery
        if instance_discovery_endpoint and not (
                is_known_to_microsoft or self._is_known_to_developer):
            payload = _instance_discovery(
                "https://{}{}/oauth2/v2.0/authorize".format(
                    self.instance, authority.path),
                self._http_client,
                instance_discovery_endpoint)
            if payload.get("error") == "invalid_instance":
                raise ValueError(
                    "invalid_instance: "
                    "The authority you provided, %s, is not whitelisted. "
                    "If it is indeed your legit customized domain name, "
                    "you can turn off this check by passing in "
                    "instance_discovery=False"
                    % authority_url)
            tenant_discovery_endpoint = payload['tenant_discovery_endpoint']
        else:
            tenant_discovery_endpoint = authority._replace(
                path="{prefix}{version}/.well-known/openid-configuration".format(
                    prefix=tenant if is_ciam and len(authority.path) <= 1  # Path-less CIAM
                        else authority.path,  # In B2C, it is "/tenant/policy"
                    version="" if self.is_adfs else "/v2.0",
                    )
                ).geturl()  # Keeping original port and query. Query is useful for test.
        return tenant_discovery_endpoint

    def user_realm_discovery(self, username, correlation_id=None, response=None):
        """Look up the user realm information of ``username``.

        :param username: The user name placed into the request URL.
        :param correlation_id: Sent as the ``client-request-id`` header.
        :param response:
            An already-obtained response to use instead of sending a request.
        :return:
            The decoded JSON document, or an empty dict when the host answers
            404 (now or on a previous call).
        """
        # It will typically return a dict containing "ver", "account_type",
        # "federation_protocol", "cloud_audience_urn",
        # "federation_metadata_url", "federation_active_auth_url", etc.
        if self.instance not in self.__class__._domains_without_user_realm_discovery:
            # Test against None rather than truthiness: a response object may
            # be falsy (requests.Response is, for any 4xx/5xx status), and a
            # caller-supplied response must not trigger a second request.
            resp = response if response is not None else self._http_client.get(
                "https://{netloc}/common/userrealm/{username}?api-version=1.0".format(
                    netloc=self.instance, username=username),
                headers={'Accept': 'application/json',
                         'client-request-id': correlation_id},)
            if resp.status_code != 404:
                resp.raise_for_status()
                return json.loads(resp.text)
            self.__class__._domains_without_user_realm_discovery.add(self.instance)
        return {}  # This can guide the caller to fall back normal ROPC flow

176 

177 

def canonicalize(authority_or_auth_endpoint):
    """Split an authority (or endpoint) URL into its parts.

    :param authority_or_auth_endpoint: An https URL.
    :return: A tuple of (url_parsed_result, hostname_in_lowercase, tenant).
        The tenant is the first path segment. A CIAM host without a path
        falls back to ``<sub domain>.onmicrosoft.com``.
    :raises ValueError: If the URL is not https, has no host, or (for a
        non-CIAM host) has no path segment.
    """
    authority = urlparse(authority_or_auth_endpoint)
    # hostname is None for a URL without a host, e.g. "https:///tenant".
    # Such input is rejected with the ValueError below rather than failing
    # with an AttributeError on None.
    hostname = authority.hostname
    if authority.scheme == "https" and hostname:
        parts = authority.path.split("/")
        first_part = parts[1] if len(parts) >= 2 and parts[1] else None
        if hostname.endswith(_CIAM_DOMAIN_SUFFIX):  # CIAM
            # Use path in CIAM authority. It will be validated by OIDC Discovery soon
            tenant = first_part if first_part else "{}.onmicrosoft.com".format(
                # Fallback to sub domain name. This variation may not be advertised
                hostname.rsplit(_CIAM_DOMAIN_SUFFIX, 1)[0])
            return authority, hostname, tenant
        # AAD
        if first_part:
            return authority, hostname, first_part
    raise ValueError(
        "Your given address (%s) should consist of "
        "an https url with a minimum of one segment in a path: e.g. "
        "https://login.microsoftonline.com/{tenant} "
        "or https://{tenant_name}.ciamlogin.com/{tenant} "
        "or https://{tenant_name}.b2clogin.com/{tenant_name}.onmicrosoft.com/policy"
        % authority_or_auth_endpoint)

200 

def _instance_discovery(url, http_client, instance_discovery_endpoint, **kwargs):
    """Query the instance discovery endpoint about an authorization endpoint.

    :param url: The authorization endpoint to ask about.
    :param http_client: An object whose ``get()`` performs the request.
    :param instance_discovery_endpoint: The URL to send the request to.
    :param kwargs: Passed through to ``http_client.get()``.
    :return: The response body decoded from JSON.
    """
    query = {'authorization_endpoint': url, 'api-version': '1.0'}
    response = http_client.get(
        instance_discovery_endpoint, params=query, **kwargs)
    return json.loads(response.text)

207 

def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs):
    """Fetch the OpenID configuration from ``tenant_discovery_endpoint``.

    :param tenant_discovery_endpoint: URL of the configuration document.
    :param http_client: An object whose ``get()`` performs the request.
    :param kwargs: Passed through to ``http_client.get()``.
    :return: The configuration decoded from JSON, on HTTP 200.
    :raises ValueError: On a 4xx status, or when a 200 body is not valid JSON.
    :raises RuntimeError: On any other status, if the response's
        ``raise_for_status()`` did not raise by itself.
    """
    response = http_client.get(tenant_discovery_endpoint, **kwargs)
    status = response.status_code
    if status == 200:
        return json.loads(response.text)  # It could raise ValueError
    if not 400 <= status < 500:
        # Transient network error would hit this path
        response.raise_for_status()
        # A fallback here, in case response.raise_for_status() is no-op
        raise RuntimeError(
            "Unable to complete OIDC Discovery: %d, %s" % (status, response.text))
    # Nonexist tenant would hit this path
    # e.g. https://login.microsoftonline.com/nonexist_tenant/v2.0/.well-known/openid-configuration
    failure = "OIDC Discovery failed on {}. HTTP status: {}, Error: {}".format(
        tenant_discovery_endpoint,
        status,
        response.text,  # Expose it as-is b/c OIDC defines no error response format
        )
    raise ValueError(failure)

225