Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/adal/mex.py: 17%
172 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:05 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:05 +0000
1#------------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation.
4# All rights reserved.
5#
6# This code is licensed under the MIT License.
7#
8# Permission is hereby granted, free of charge, to any person obtaining a copy
9# of this software and associated documentation files(the "Software"), to deal
10# in the Software without restriction, including without limitation the rights
11# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
12# copies of the Software, and to permit persons to whom the Software is
13# furnished to do so, subject to the following conditions :
14#
15# The above copyright notice and this permission notice shall be included in
16# all copies or substantial portions of the Software.
17#
18# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
21# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24# THE SOFTWARE.
25#
26#------------------------------------------------------------------------------
28try:
29 from urllib.parse import urlparse
30except ImportError:
31 from urlparse import urlparse # pylint: disable=import-error
33try:
34 from xml.etree import cElementTree as ET
35except ImportError:
36 from xml.etree import ElementTree as ET
38import requests
40from . import log
41from . import util
42from . import xmlutil
43from .constants import XmlNamespaces, WSTrustVersion
44from .adal_error import AdalError
# XPaths applied to a policy node to find its transport binding assertion,
# first with the `sp` prefix and then with the `sp2005` prefix.
TRANSPORT_BINDING_XPATH = 'wsp:ExactlyOne/wsp:All/sp:TransportBinding'
TRANSPORT_BINDING_2005_XPATH = 'wsp:ExactlyOne/wsp:All/sp2005:TransportBinding' #pylint: disable=invalid-name

# SOAP action / transport lookups applied to a wsdl:binding node, and the
# values they are compared against.
SOAP_ACTION_XPATH = 'wsdl:operation/soap12:operation'
RST_SOAP_ACTION_13 = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512/RST/Issue'
RST_SOAP_ACTION_2005 = 'http://schemas.xmlsoap.org/ws/2005/02/trust/RST/Issue' #pylint: disable=invalid-name
SOAP_TRANSPORT_XPATH = 'soap12:binding'
SOAP_HTTP_TRANSPORT_VALUE = 'http://schemas.xmlsoap.org/soap/http'

# Port discovery: PORT_XPATH is applied to the document root, ADDRESS_XPATH to
# each port node found.
PORT_XPATH = 'wsdl:service/wsdl:port'
ADDRESS_XPATH = 'wsa10:EndpointReference/wsa10:Address'
58def _url_is_secure(endpoint_url):
59 parsed = urlparse(endpoint_url)
60 return parsed.scheme == 'https'
class Mex(object):
    """Download a MEX document and select a username/password WS-Trust policy.

    After a successful :meth:`discover`, ``username_password_policy`` is a
    dict that contains at least the keys ``'url'`` (the https endpoint
    address) and ``'version'`` (a ``WSTrustVersion`` value).
    """

    # Number of parent hops from a WssUsernameToken10 node up to the outermost
    # wsp:Policy node; both xpaths used by _parse() have eight steps.
    _POLICY_ANCESTOR_DEPTH = 7

    def __init__(self, call_context, url):
        """Store the request context and the address of the MEX document.

        :param call_context: mapping read with ``.get()`` for the keys
            ``'log_context'``, ``'verify_ssl'`` and ``'proxies'``.
        :param url: address the MEX document is requested from.
        """
        self._log = log.Logger("MEX", call_context.get('log_context'))
        self._call_context = call_context
        self._url = url
        self._dom = None
        # Child -> parent map; ElementTree elements have no parent pointer.
        self._parents = None
        self._mex_doc = None
        self.username_password_policy = {}
        self._log.debug("Mex created with url: %(mex_url)s",
                        {"mex_url": self._url})

    def discover(self):
        """Fetch the MEX document over HTTP and parse it.

        :raises requests.exceptions.HTTPError: when the server answers 429.
        :raises AdalError: for any other non-success status, or when the
            document yields no usable policy, binding or endpoint.

        Any exception raised by the request or by parsing is logged and then
        re-raised unchanged.
        """
        options = util.create_request_options(
            self, {'headers': {'Content-Type': 'application/soap+xml'}})
        operation = "Mex Get"

        try:
            resp = requests.get(self._url, headers=options['headers'],
                                verify=self._call_context.get('verify_ssl', None),
                                proxies=self._call_context.get('proxies', None))
            util.log_return_correlation_id(self._log, operation, resp)
        except Exception:
            self._log.exception(
                "%(operation)s request failed", {"operation": operation})
            raise

        if resp.status_code == 429:
            resp.raise_for_status()  # Will raise requests.exceptions.HTTPError

        if not util.is_http_success(resp.status_code):
            return_error_string = u"{} request returned http error: {}".format(
                operation, resp.status_code)
            error_response = ""
            if resp.text:
                return_error_string = u"{} and server response: {}".format(
                    return_error_string, resp.text)
                try:
                    error_response = resp.json()
                except ValueError:
                    # Best effort: the error body is not JSON, keep "".
                    pass
            raise AdalError(return_error_string, error_response)

        try:
            self._mex_doc = resp.text
            self._dom = ET.fromstring(self._mex_doc)
            self._parents = {c: p for p in self._dom.iter() for c in p}
            self._parse()
        except Exception:
            self._log.info('Failed to parse mex response in to DOM')
            raise

    def _check_policy(self, policy_node):
        """Return the ``wsu:Id`` of *policy_node*, logging whether it has a
        transport binding.

        :raises KeyError: when the node carries no ``wsu:Id`` attribute.
        """
        policy_id = policy_node.attrib["{{{}}}Id".format(XmlNamespaces.namespaces['wsu'])]

        # Try with Transport Binding XPath, then fall back to the 2005 XPath.
        transport_binding_nodes = (
            xmlutil.xpath_find(policy_node, TRANSPORT_BINDING_XPATH)
            or xmlutil.xpath_find(policy_node, TRANSPORT_BINDING_2005_XPATH))

        # If we did not find any binding, this is potentially bad.
        # NOTE(review): the id is still returned in that case, so the policy
        # stays a candidate - confirm this leniency is intended.
        if not transport_binding_nodes:
            self._log.debug(
                "Potential policy did not match required transport binding: %(policy_id)s",
                {"policy_id": policy_id})
        else:
            self._log.debug("Found matching policy id: %(policy_id)s",
                            {"policy_id": policy_id})

        return policy_id

    def _select_username_password_polices(self, xpath):
        """Collect the policies whose username token node matches *xpath*.

        :param xpath: path from the document root to the username token
            nodes; the policy node is taken ``_POLICY_ANCESTOR_DEPTH`` parents
            above each match.
        :returns: dict keyed by ``'#' + policy_id``, or None when nothing
            matched.
        """
        username_token_nodes = xmlutil.xpath_find(self._dom, xpath)
        if not username_token_nodes:
            self._log.warn("No username token policy nodes found.")
            return None

        policies = {}
        for node in username_token_nodes:
            policy_node = node
            for _ in range(self._POLICY_ANCESTOR_DEPTH):
                policy_node = self._parents[policy_node]

            policy_id = self._check_policy(policy_node)
            if policy_id:
                id_ref = '#' + policy_id
                # NOTE(review): the entry is seeded with the policy id itself
                # as key; 'version' and 'url' are added later by
                # _get_ports_for_policy_bindings().
                policies[id_ref] = {policy_id: id_ref}

        return policies if policies else None

    @staticmethod
    def _first_attribute(node, xpath, attribute):
        """Return *attribute* of the first *xpath* match under *node*, or ""."""
        matches = xmlutil.xpath_find(node, xpath)
        if not matches:
            return ""
        return matches[0].attrib.get(attribute, "")

    def _check_soap_action_and_transport(self, binding_node):
        """Classify *binding_node* by its SOAP action and transport.

        :returns: ``WSTrustVersion.WSTRUST13`` or ``WSTrustVersion.WSTRUST2005``
            when the transport is SOAP-over-HTTP and the action is the
            matching RST Issue action, otherwise ``WSTrustVersion.UNDEFINED``.

        A binding without the soap12 operation/binding element or without the
        expected attribute is reported as UNDEFINED instead of raising, so one
        unrelated binding cannot abort the whole document.
        """
        name = binding_node.get('name')

        soap_action = self._first_attribute(
            binding_node, SOAP_ACTION_XPATH, 'soapAction')
        soap_transport = ""
        if soap_action:
            # The transport only matters once an action is present.
            soap_transport = self._first_attribute(
                binding_node, SOAP_TRANSPORT_XPATH, 'transport')

        if soap_transport == SOAP_HTTP_TRANSPORT_VALUE:
            if soap_action == RST_SOAP_ACTION_13:
                self._log.debug(
                    'found binding matching Action and Transport: %(binding_node)s',
                    {"binding_node": name})
                return WSTrustVersion.WSTRUST13
            elif soap_action == RST_SOAP_ACTION_2005:
                self._log.debug(
                    'found binding matching Action and Transport: %(binding_node)s',
                    {"binding_node": name})
                return WSTrustVersion.WSTRUST2005

        self._log.debug(
            'binding node did not match soap Action or Transport: %(binding_node)s',
            {"binding_node": name})
        return WSTrustVersion.UNDEFINED

    def _get_matching_bindings(self, policies):
        """Map binding names to the candidate policy they reference.

        :param policies: dict as returned by
            :meth:`_select_username_password_polices`.
        :returns: ``{binding_name: {'url': policy_uri, 'version': version}}``
            for bindings whose version is not UNDEFINED, or None when empty.
        """
        bindings = {}
        binding_policy_ref_nodes = xmlutil.xpath_find(
            self._dom, 'wsdl:binding/wsp:PolicyReference')

        for node in binding_policy_ref_nodes:
            uri = node.get('URI')
            if not policies.get(uri):
                continue

            binding_node = self._parents[node]
            version = self._check_soap_action_and_transport(binding_node)
            if version != WSTrustVersion.UNDEFINED:
                bindings[binding_node.get('name')] = {
                    'url': uri,
                    'version': version
                }

        return bindings if bindings else None

    def _get_ports_for_policy_bindings(self, bindings, policies):
        """Fill ``'version'`` and ``'url'`` into *policies* from the ports.

        For every port whose binding is in *bindings*, the referenced policy
        gets the binding's version and, when the port address is https, that
        address as ``'url'``. A policy that already has a url is left alone.

        :raises AdalError: when a relevant port has no address node.
        """
        port_nodes = xmlutil.xpath_find(self._dom, PORT_XPATH)
        if not port_nodes:
            self._log.warn("No ports found")
            return

        for node in port_nodes:
            binding_attr = node.get('binding')
            if not binding_attr:
                # A port without a binding attribute cannot match anything.
                continue
            # Drop a namespace prefix such as "tns:" from the binding name.
            binding_id = binding_attr.split(':')[-1]

            trust_policy = bindings.get(binding_id)
            if not trust_policy:
                continue

            binding_policy = policies.get(trust_policy.get('url'))
            if not binding_policy or binding_policy.get('url', None):
                continue

            binding_policy['version'] = trust_policy['version']
            address_node = node.find(ADDRESS_XPATH, XmlNamespaces.namespaces)
            if address_node is None:
                raise AdalError("No address nodes on port")

            address = xmlutil.find_element_text(address_node)
            if _url_is_secure(address):
                binding_policy['url'] = address
            else:
                self._log.warn(
                    "Skipping insecure endpoint: %(mex_endpoint)s",
                    {"mex_endpoint": address})

    def _select_single_matching_policy(self, policies):
        """Store the preferred policy in ``username_password_policy``.

        Only policies that have a ``'url'`` are considered. A WSTRUST13 policy
        is preferred over a WSTRUST2005 one; among several of the same version
        the last one iterated wins. When no policy has a url the attribute is
        left untouched.
        """
        matching_policies = [p for p in policies.values() if p.get('url')]
        if not matching_policies:
            self._log.warn("No policies found with a url.")
            return

        wstrust13_policy = None
        wstrust2005_policy = None
        for policy in matching_policies:
            version = policy.get('version', None)
            if version == WSTrustVersion.WSTRUST13:
                wstrust13_policy = policy
            elif version == WSTrustVersion.WSTRUST2005:
                wstrust2005_policy = policy

        if wstrust13_policy is None and wstrust2005_policy is None:
            self._log.warn('No policies found for either wstrust13 or wstrust2005')

        self.username_password_policy = wstrust13_policy or wstrust2005_policy

    def _parse(self):
        """Walk the parsed document and select the WS-Trust endpoint policy.

        :raises AdalError: when no policy or binding matches, or when the
            selected policy has no endpoint url.
        """
        xpath = 'wsp:Policy/wsp:ExactlyOne/wsp:All/sp:SignedEncryptedSupportingTokens/wsp:Policy/sp:UsernameToken/wsp:Policy/sp:WssUsernameToken10'
        xpath2005 = 'wsp:Policy/wsp:ExactlyOne/wsp:All/sp2005:SignedSupportingTokens/wsp:Policy/sp2005:UsernameToken/wsp:Policy/sp2005:WssUsernameToken10'

        # Both flavours are always queried; the 2005 results are merged into
        # (or stand in for) the first set.
        policies = self._select_username_password_polices(xpath)
        policies2005 = self._select_username_password_polices(xpath2005)
        if policies and policies2005:
            policies.update(policies2005)
        elif not policies:
            policies = policies2005

        if not policies:
            raise AdalError("No matching policies.")

        bindings = self._get_matching_bindings(policies)
        if not bindings:
            raise AdalError("No matching bindings.")

        self._get_ports_for_policy_bindings(bindings, policies)
        self._select_single_matching_policy(policies)

        # The outcome to validate is the selected policy, not the MEX address
        # itself: self._url is set by the constructor, so testing it alone
        # could never detect a document without a usable endpoint.
        selected = self.username_password_policy
        if not self._url or not selected or not selected.get('url'):
            raise AdalError("No ws-trust endpoints match requirements.")