Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/adal/user_realm.py: 27%
89 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:05 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:05 +0000
1#------------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation.
4# All rights reserved.
5#
6# This code is licensed under the MIT License.
7#
8# Permission is hereby granted, free of charge, to any person obtaining a copy
9# of this software and associated documentation files(the "Software"), to deal
10# in the Software without restriction, including without limitation the rights
11# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
12# copies of the Software, and to permit persons to whom the Software is
13# furnished to do so, subject to the following conditions :
14#
15# The above copyright notice and this permission notice shall be included in
16# all copies or substantial portions of the Software.
17#
18# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
21# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24# THE SOFTWARE.
25#
26#------------------------------------------------------------------------------
27import json
29try:
30 from urllib.parse import quote, urlencode
31 from urllib.parse import urlunparse
32except ImportError:
33 from urllib import quote, urlencode #pylint: disable=no-name-in-module
34 from urlparse import urlunparse #pylint: disable=import-error
36import requests
38from . import constants
39from . import log
40from . import util
41from .adal_error import AdalError
# URL path of the user realm endpoint; the '<user>' placeholder is replaced
# with the URL-encoded user principal in UserRealm._get_user_realm_url.
43USER_REALM_PATH_TEMPLATE = 'common/UserRealm/<user>'
# Mapping of account types taken from constants.UserRealm; its values are the
# strings accepted by UserRealm._validate_account_type, and the 'Federated'
# entry selects the federated branch of response parsing.
45ACCOUNT_TYPE = constants.UserRealm.account_type
# Mapping of federation protocols taken from constants.UserRealm; its values
# are the strings accepted by UserRealm._validate_federation_protocol.
46FEDERATION_PROTOCOL_TYPE = constants.UserRealm.federation_protocol_type
class UserRealm(object):
    """Perform user realm discovery for one user and hold the parsed result.

    After a successful ``discover()`` call the public attributes
    ``account_type`` and, for federated accounts, ``federation_protocol``,
    ``federation_metadata_url``, ``federation_active_auth_url`` and
    ``cloud_audience_urn`` are populated from the service response.
    Until then they are all ``None``.
    """

    def __init__(self, call_context, user_principle, authority_url):
        """Store the discovery inputs and reset every result attribute.

        :param call_context: mapping that must contain ``'log_context'``;
            ``'proxies'`` and ``'verify_ssl'`` are read (optionally) by
            ``discover()``.
        :param user_principle: user name that is URL-encoded into the
            discovery path.
        :param authority_url: authority URL in whatever form
            ``util.copy_url`` accepts (presumably a parsed URL - confirm
            against callers).
        """
        self._log = log.Logger("UserRealm", call_context['log_context'])
        self._call_context = call_context
        self.api_version = '1.0'
        self.federation_protocol = None
        self.account_type = None
        self.federation_metadata_url = None
        self.federation_active_auth_url = None
        self.cloud_audience_urn = None
        self._user_principle = user_principle
        self._authority_url = authority_url

    def _get_user_realm_url(self):
        """Build the discovery URL for this user from the authority URL.

        The path (component 2) is replaced with the user realm path and the
        query (component 4) with the ``api-version`` parameter.

        :returns: the result of ``util.copy_url`` applied to the new URL.
        """
        url_components = list(util.copy_url(self._authority_url))
        url_encoded_user = quote(self._user_principle, safe='~()*!.\'')
        url_components[2] = '/' + USER_REALM_PATH_TEMPLATE.replace('<user>', url_encoded_user)

        user_realm_query = {'api-version': self.api_version}
        url_components[4] = urlencode(user_realm_query)
        return util.copy_url(urlunparse(url_components))

    @staticmethod
    def _validate_constant_value(value_dic, value, case_sensitive=False):
        """Return ``value`` if it is one of ``value_dic``'s values, else False.

        :param value_dic: mapping whose values are the accepted constants.
        :param value: candidate value, typically taken from a JSON response.
        :param case_sensitive: when False (the default) the candidate is
            lower-cased before the comparison and the lower-cased form is
            what gets returned.
        :returns: the (possibly lower-cased) value, or ``False`` when the
            value is empty, not a string, or not an accepted constant.
        """
        if not value:
            return False

        if not case_sensitive:
            try:
                value = value.lower()
            except AttributeError:
                # A non-string JSON value (number, list, ...) can never match
                # a constant; report it as invalid instead of crashing.
                return False

        return value if value in value_dic.values() else False

    @staticmethod
    def _validate_account_type(account_type):
        """Validate ``account_type`` against the ACCOUNT_TYPE constants."""
        return UserRealm._validate_constant_value(ACCOUNT_TYPE, account_type)

    @staticmethod
    def _validate_federation_protocol(protocol):
        """Validate ``protocol`` against the FEDERATION_PROTOCOL_TYPE constants."""
        return UserRealm._validate_constant_value(FEDERATION_PROTOCOL_TYPE, protocol)

    def _log_parsed_response(self):
        """Write the parsed discovery attributes to the debug log."""
        self._log.debug(
            'UserRealm response:\n'
            ' AccountType: %(account_type)s\n'
            ' FederationProtocol: %(federation_protocol)s\n'
            ' FederationMetatdataUrl: %(federation_metadata_url)s\n'
            ' FederationActiveAuthUrl: %(federation_active_auth_url)s',
            {
                "account_type": self.account_type,
                "federation_protocol": self.federation_protocol,
                "federation_metadata_url": self.federation_metadata_url,
                "federation_active_auth_url": self.federation_active_auth_url,
            })

    def _parse_discovery_response(self, body):
        """Parse the JSON discovery response and populate this instance.

        :param body: response body text.
        :raises ValueError: if ``body`` is not valid JSON (logged, then
            re-raised unchanged).
        :raises KeyError: if a required field is missing from the response.
        :raises AdalError: if the account type or the federation protocol is
            not one of the known constants.
        """
        self._log.debug("Discovery response:\n %(discovery_response)s",
                        {"discovery_response": body})

        try:
            response = json.loads(body)
        except ValueError:
            self._log.info(
                "Parsing realm discovery response JSON failed for body: %(body)s",
                {"body": body})
            raise

        # Keep the raw value so a validation failure reports what the server
        # actually sent rather than the (always falsy) validation result.
        raw_account_type = response['account_type']
        account_type = UserRealm._validate_account_type(raw_account_type)
        if not account_type:
            raise AdalError(
                u'Cannot parse account_type: {}'.format(raw_account_type))
        self.account_type = account_type

        # Federation details are only read for federated accounts.
        if self.account_type == ACCOUNT_TYPE['Federated']:
            raw_protocol = response['federation_protocol']
            protocol = UserRealm._validate_federation_protocol(raw_protocol)

            if not protocol:
                raise AdalError(
                    u'Cannot parse federation protocol: {}'.format(raw_protocol))

            self.federation_protocol = protocol
            self.federation_metadata_url = response['federation_metadata_url']
            self.federation_active_auth_url = response['federation_active_auth_url']
            self.cloud_audience_urn = response.get('cloud_audience_urn',
                                                   "urn:federation:MicrosoftOnline")

        self._log_parsed_response()

    def discover(self):
        """Query the user realm endpoint and store the parsed result.

        :raises requests.exceptions.HTTPError: when the server answers 429.
        :raises AdalError: for any other non-success status; the second
            argument is the decoded JSON error body when there is one,
            otherwise an empty string.
        :raises ValueError, KeyError: propagated from
            ``_parse_discovery_response`` for malformed success responses.
        """
        options = util.create_request_options(self, {'headers': {'Accept': 'application/json'}})
        user_realm_url = self._get_user_realm_url()
        self._log.debug("Performing user realm discovery at: %(user_realm_url)s",
                        {"user_realm_url": user_realm_url.geturl()})

        operation = 'User Realm Discovery'
        resp = requests.get(user_realm_url.geturl(), headers=options['headers'],
                            proxies=self._call_context.get('proxies', None),
                            verify=self._call_context.get('verify_ssl', None))
        util.log_return_correlation_id(self._log, operation, resp)

        if resp.status_code == 429:
            resp.raise_for_status()  # Will raise requests.exceptions.HTTPError

        if not util.is_http_success(resp.status_code):
            return_error_string = u"{} request returned http error: {}".format(
                operation, resp.status_code)
            error_response = ""
            if resp.text:
                return_error_string = u"{} and server response: {}".format(
                    return_error_string, resp.text)
                try:
                    error_response = resp.json()
                except ValueError:
                    # Body is not JSON; the raw text is already part of the
                    # error string, so only the structured form is dropped.
                    pass

            raise AdalError(return_error_string, error_response)

        self._parse_discovery_response(resp.text)