Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/adal/wstrust_request.py: 27%
77 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:05 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:05 +0000
1#------------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation.
4# All rights reserved.
5#
6# This code is licensed under the MIT License.
7#
8# Permission is hereby granted, free of charge, to any person obtaining a copy
9# of this software and associated documentation files(the "Software"), to deal
10# in the Software without restriction, including without limitation the rights
11# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
12# copies of the Software, and to permit persons to whom the Software is
13# furnished to do so, subject to the following conditions :
14#
15# The above copyright notice and this permission notice shall be included in
16# all copies or substantial portions of the Software.
17#
18# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
21# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24# THE SOFTWARE.
25#
26#------------------------------------------------------------------------------
28import uuid
29from datetime import datetime, timedelta
31import requests
33from . import log
34from . import util
35from . import wstrust_response
36from .adal_error import AdalError
37from .constants import WSTrustVersion
39_USERNAME_PLACEHOLDER = '{UsernamePlaceHolder}'
40_PASSWORD_PLACEHOLDER = '{PasswordPlaceHolder}'
42class WSTrustRequest(object):
44 def __init__(self, call_context, wstrust_endpoint_url, applies_to, wstrust_endpoint_version):
45 self._log = log.Logger('WSTrustRequest', call_context['log_context'])
46 self._call_context = call_context
47 self._wstrust_endpoint_url = wstrust_endpoint_url
48 self._applies_to = applies_to
49 self._wstrust_endpoint_version = wstrust_endpoint_version
51 @staticmethod
52 def _build_security_header():
54 time_now = datetime.utcnow()
55 expire_time = time_now + timedelta(minutes=10)
57 time_now_str = time_now.isoformat()[:-3] + 'Z'
58 expire_time_str = expire_time.isoformat()[:-3] + 'Z'
60 security_header_xml = ("<wsse:Security s:mustUnderstand='1' xmlns:wsse='http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd'>"
61 "<wsu:Timestamp wsu:Id=\'_0\'>"
62 "<wsu:Created>" + time_now_str + "</wsu:Created>"
63 "<wsu:Expires>" + expire_time_str + "</wsu:Expires>"
64 "</wsu:Timestamp>"
65 "<wsse:UsernameToken wsu:Id='ADALUsernameToken'>"
66 "<wsse:Username>" + _USERNAME_PLACEHOLDER + "</wsse:Username>"
67 "<wsse:Password>" + _PASSWORD_PLACEHOLDER + "</wsse:Password>"
68 "</wsse:UsernameToken>"
69 "</wsse:Security>")
71 return security_header_xml
73 @staticmethod
74 def _populate_rst_username_password(template, username, password):
75 password = WSTrustRequest._escape_password(password)
76 return template.replace(_USERNAME_PLACEHOLDER, username).replace(_PASSWORD_PLACEHOLDER, password)
78 @staticmethod
79 def _escape_password(password):
80 return password.replace('&', '&').replace('"', '"').replace("'", ''').replace('<', '<').replace('>', '>')
82 def _build_rst(self, username, password):
83 message_id = str(uuid.uuid4())
85 schema_location = 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd'
86 soap_action = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512/RST/Issue'
87 rst_trust_namespace = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512'
88 key_type = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512/Bearer'
89 request_type = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512/Issue'
91 if self._wstrust_endpoint_version == WSTrustVersion.WSTRUST2005:
92 soap_action = 'http://schemas.xmlsoap.org/ws/2005/02/trust/RST/Issue'
93 rst_trust_namespace = 'http://schemas.xmlsoap.org/ws/2005/02/trust'
94 key_type = 'http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey'
95 request_type = 'http://schemas.xmlsoap.org/ws/2005/02/trust/Issue'
97 rst_template = ("<s:Envelope xmlns:s='http://www.w3.org/2003/05/soap-envelope' xmlns:wsa='http://www.w3.org/2005/08/addressing' xmlns:wsu='{}'>".format(schema_location) +
98 "<s:Header>" +
99 "<wsa:Action s:mustUnderstand='1'>{}</wsa:Action>".format(soap_action) +
100 "<wsa:messageID>urn:uuid:{}</wsa:messageID>".format(message_id) +
101 "<wsa:ReplyTo>" +
102 "<wsa:Address>http://www.w3.org/2005/08/addressing/anonymous</wsa:Address>" +
103 "</wsa:ReplyTo>" +
104 "<wsa:To s:mustUnderstand='1'>{}</wsa:To>".format(self._wstrust_endpoint_url) +
105 WSTrustRequest._build_security_header() +
106 "</s:Header>" +
107 "<s:Body>" +
108 "<wst:RequestSecurityToken xmlns:wst='{}'>".format(rst_trust_namespace) +
109 "<wsp:AppliesTo xmlns:wsp='http://schemas.xmlsoap.org/ws/2004/09/policy'>" +
110 "<wsa:EndpointReference>" +
111 "<wsa:Address>{}</wsa:Address>".format(self._applies_to) +
112 "</wsa:EndpointReference>" +
113 "</wsp:AppliesTo>" +
114 "<wst:KeyType>{}</wst:KeyType>".format(key_type) +
115 "<wst:RequestType>{}</wst:RequestType>".format(request_type) +
116 "</wst:RequestSecurityToken>" +
117 "</s:Body>" +
118 "</s:Envelope>")
120 self._log.debug('Created RST: \n %(rst_template)s',
121 {"rst_template": rst_template})
122 return WSTrustRequest._populate_rst_username_password(rst_template, username, password)
124 def _handle_rstr(self, body):
125 wstrust_resp = wstrust_response.WSTrustResponse(self._call_context, body, self._wstrust_endpoint_version)
126 wstrust_resp.parse()
127 return wstrust_resp
129 def acquire_token(self, username, password):
130 if self._wstrust_endpoint_version == WSTrustVersion.UNDEFINED:
131 raise AdalError('Unsupported wstrust endpoint version. Current support version is wstrust2005 or wstrust13.')
133 rst = self._build_rst(username, password)
134 if self._wstrust_endpoint_version == WSTrustVersion.WSTRUST2005:
135 soap_action = 'http://schemas.xmlsoap.org/ws/2005/02/trust/RST/Issue'
136 else:
137 soap_action = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512/RST/Issue'
139 headers = {'headers': {'Content-type':'application/soap+xml; charset=utf-8',
140 'SOAPAction': soap_action},
141 'body': rst}
142 options = util.create_request_options(self, headers)
143 self._log.debug("Sending RST to: %(wstrust_endpoint)s",
144 {"wstrust_endpoint": self._wstrust_endpoint_url})
146 operation = "WS-Trust RST"
147 resp = requests.post(self._wstrust_endpoint_url, headers=options['headers'], data=rst,
148 allow_redirects=True,
149 verify=self._call_context.get('verify_ssl', None),
150 proxies=self._call_context.get('proxies', None),
151 timeout=self._call_context.get('timeout', None))
153 util.log_return_correlation_id(self._log, operation, resp)
155 if resp.status_code == 429:
156 resp.raise_for_status() # Will raise requests.exceptions.HTTPError
157 if not util.is_http_success(resp.status_code):
158 return_error_string = u"{} request returned http error: {}".format(operation, resp.status_code)
159 error_response = ""
160 if resp.text:
161 return_error_string = u"{} and server response: {}".format(return_error_string, resp.text)
162 try:
163 error_response = resp.json()
164 except ValueError:
165 pass
167 raise AdalError(return_error_string, error_response)
168 else:
169 return self._handle_rstr(resp.text)