Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/adal/wstrust_request.py: 27%

77 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:05 +0000

1#------------------------------------------------------------------------------ 

2# 

3# Copyright (c) Microsoft Corporation.  

4# All rights reserved. 

5#  

6# This code is licensed under the MIT License. 

7#  

8# Permission is hereby granted, free of charge, to any person obtaining a copy 

9# of this software and associated documentation files(the "Software"), to deal 

10# in the Software without restriction, including without limitation the rights 

11# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell 

12# copies of the Software, and to permit persons to whom the Software is 

13# furnished to do so, subject to the following conditions : 

14#  

15# The above copyright notice and this permission notice shall be included in 

16# all copies or substantial portions of the Software. 

17#  

18# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

19# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

20# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE 

21# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

22# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

23# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 

24# THE SOFTWARE. 

25# 

26#------------------------------------------------------------------------------ 

27 

28import uuid 

29from datetime import datetime, timedelta 

30 

31import requests 

32 

33from . import log 

34from . import util 

35from . import wstrust_response 

36from .adal_error import AdalError 

37from .constants import WSTrustVersion 

38 

39_USERNAME_PLACEHOLDER = '{UsernamePlaceHolder}' 

40_PASSWORD_PLACEHOLDER = '{PasswordPlaceHolder}' 

41 

42class WSTrustRequest(object): 

43 

44 def __init__(self, call_context, wstrust_endpoint_url, applies_to, wstrust_endpoint_version): 

45 self._log = log.Logger('WSTrustRequest', call_context['log_context']) 

46 self._call_context = call_context 

47 self._wstrust_endpoint_url = wstrust_endpoint_url 

48 self._applies_to = applies_to 

49 self._wstrust_endpoint_version = wstrust_endpoint_version 

50 

51 @staticmethod 

52 def _build_security_header(): 

53 

54 time_now = datetime.utcnow() 

55 expire_time = time_now + timedelta(minutes=10) 

56 

57 time_now_str = time_now.isoformat()[:-3] + 'Z' 

58 expire_time_str = expire_time.isoformat()[:-3] + 'Z' 

59 

60 security_header_xml = ("<wsse:Security s:mustUnderstand='1' xmlns:wsse='http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd'>" 

61 "<wsu:Timestamp wsu:Id=\'_0\'>" 

62 "<wsu:Created>" + time_now_str + "</wsu:Created>" 

63 "<wsu:Expires>" + expire_time_str + "</wsu:Expires>" 

64 "</wsu:Timestamp>" 

65 "<wsse:UsernameToken wsu:Id='ADALUsernameToken'>" 

66 "<wsse:Username>" + _USERNAME_PLACEHOLDER + "</wsse:Username>" 

67 "<wsse:Password>" + _PASSWORD_PLACEHOLDER + "</wsse:Password>" 

68 "</wsse:UsernameToken>" 

69 "</wsse:Security>") 

70 

71 return security_header_xml 

72 

73 @staticmethod 

74 def _populate_rst_username_password(template, username, password): 

75 password = WSTrustRequest._escape_password(password) 

76 return template.replace(_USERNAME_PLACEHOLDER, username).replace(_PASSWORD_PLACEHOLDER, password) 

77 

78 @staticmethod 

79 def _escape_password(password): 

80 return password.replace('&', '&amp;').replace('"', '&quot;').replace("'", '&apos;').replace('<', '&lt;').replace('>', '&gt;') 

81 

82 def _build_rst(self, username, password): 

83 message_id = str(uuid.uuid4()) 

84 

85 schema_location = 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd' 

86 soap_action = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512/RST/Issue' 

87 rst_trust_namespace = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512' 

88 key_type = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512/Bearer' 

89 request_type = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512/Issue' 

90 

91 if self._wstrust_endpoint_version == WSTrustVersion.WSTRUST2005: 

92 soap_action = 'http://schemas.xmlsoap.org/ws/2005/02/trust/RST/Issue' 

93 rst_trust_namespace = 'http://schemas.xmlsoap.org/ws/2005/02/trust' 

94 key_type = 'http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey' 

95 request_type = 'http://schemas.xmlsoap.org/ws/2005/02/trust/Issue' 

96 

97 rst_template = ("<s:Envelope xmlns:s='http://www.w3.org/2003/05/soap-envelope' xmlns:wsa='http://www.w3.org/2005/08/addressing' xmlns:wsu='{}'>".format(schema_location) + 

98 "<s:Header>" + 

99 "<wsa:Action s:mustUnderstand='1'>{}</wsa:Action>".format(soap_action) + 

100 "<wsa:messageID>urn:uuid:{}</wsa:messageID>".format(message_id) + 

101 "<wsa:ReplyTo>" + 

102 "<wsa:Address>http://www.w3.org/2005/08/addressing/anonymous</wsa:Address>" + 

103 "</wsa:ReplyTo>" + 

104 "<wsa:To s:mustUnderstand='1'>{}</wsa:To>".format(self._wstrust_endpoint_url) + 

105 WSTrustRequest._build_security_header() + 

106 "</s:Header>" + 

107 "<s:Body>" + 

108 "<wst:RequestSecurityToken xmlns:wst='{}'>".format(rst_trust_namespace) + 

109 "<wsp:AppliesTo xmlns:wsp='http://schemas.xmlsoap.org/ws/2004/09/policy'>" + 

110 "<wsa:EndpointReference>" + 

111 "<wsa:Address>{}</wsa:Address>".format(self._applies_to) + 

112 "</wsa:EndpointReference>" + 

113 "</wsp:AppliesTo>" + 

114 "<wst:KeyType>{}</wst:KeyType>".format(key_type) + 

115 "<wst:RequestType>{}</wst:RequestType>".format(request_type) + 

116 "</wst:RequestSecurityToken>" + 

117 "</s:Body>" + 

118 "</s:Envelope>") 

119 

120 self._log.debug('Created RST: \n %(rst_template)s', 

121 {"rst_template": rst_template}) 

122 return WSTrustRequest._populate_rst_username_password(rst_template, username, password) 

123 

124 def _handle_rstr(self, body): 

125 wstrust_resp = wstrust_response.WSTrustResponse(self._call_context, body, self._wstrust_endpoint_version) 

126 wstrust_resp.parse() 

127 return wstrust_resp 

128 

129 def acquire_token(self, username, password): 

130 if self._wstrust_endpoint_version == WSTrustVersion.UNDEFINED: 

131 raise AdalError('Unsupported wstrust endpoint version. Current support version is wstrust2005 or wstrust13.') 

132 

133 rst = self._build_rst(username, password) 

134 if self._wstrust_endpoint_version == WSTrustVersion.WSTRUST2005: 

135 soap_action = 'http://schemas.xmlsoap.org/ws/2005/02/trust/RST/Issue' 

136 else: 

137 soap_action = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512/RST/Issue' 

138 

139 headers = {'headers': {'Content-type':'application/soap+xml; charset=utf-8', 

140 'SOAPAction': soap_action}, 

141 'body': rst} 

142 options = util.create_request_options(self, headers) 

143 self._log.debug("Sending RST to: %(wstrust_endpoint)s", 

144 {"wstrust_endpoint": self._wstrust_endpoint_url}) 

145 

146 operation = "WS-Trust RST" 

147 resp = requests.post(self._wstrust_endpoint_url, headers=options['headers'], data=rst, 

148 allow_redirects=True, 

149 verify=self._call_context.get('verify_ssl', None), 

150 proxies=self._call_context.get('proxies', None), 

151 timeout=self._call_context.get('timeout', None)) 

152 

153 util.log_return_correlation_id(self._log, operation, resp) 

154 

155 if resp.status_code == 429: 

156 resp.raise_for_status() # Will raise requests.exceptions.HTTPError 

157 if not util.is_http_success(resp.status_code): 

158 return_error_string = u"{} request returned http error: {}".format(operation, resp.status_code) 

159 error_response = "" 

160 if resp.text: 

161 return_error_string = u"{} and server response: {}".format(return_error_string, resp.text) 

162 try: 

163 error_response = resp.json() 

164 except ValueError: 

165 pass 

166 

167 raise AdalError(return_error_string, error_response) 

168 else: 

169 return self._handle_rstr(resp.text)