Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/oauth2/sts.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

33 statements  

1# Copyright 2020 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""OAuth 2.0 Token Exchange Spec. 

16 

17This module defines a token exchange utility based on the `OAuth 2.0 Token 

18Exchange`_ spec. This will be mainly used to exchange external credentials 

19for GCP access tokens in workload identity pools to access Google APIs. 

20 

21The implementation will support various types of client authentication as 

22allowed in the spec. 

23 

24A deviation on the spec will be for additional Google specific options that 

25cannot be easily mapped to parameters defined in the RFC. 

26 

27The returned dictionary response will be based on the `rfc8693 section 2.2.1`_ 

28spec JSON response. 

29 

30.. _OAuth 2.0 Token Exchange: https://tools.ietf.org/html/rfc8693 

31.. _rfc8693 section 2.2.1: https://tools.ietf.org/html/rfc8693#section-2.2.1 

32""" 

33 

34import http.client as http_client 

35import json 

36import urllib 

37 

38from google.oauth2 import utils 

39 

40 

41_URLENCODED_HEADERS = {"Content-Type": "application/x-www-form-urlencoded"} 

42 

43 

44class Client(utils.OAuthClientAuthHandler): 

45 """Implements the OAuth 2.0 token exchange spec based on 

46 https://tools.ietf.org/html/rfc8693. 

47 """ 

48 

49 def __init__(self, token_exchange_endpoint, client_authentication=None): 

50 """Initializes an STS client instance. 

51 

52 Args: 

53 token_exchange_endpoint (str): The token exchange endpoint. 

54 client_authentication (Optional(google.oauth2.oauth2_utils.ClientAuthentication)): 

55 The optional OAuth client authentication credentials if available. 

56 """ 

57 super(Client, self).__init__(client_authentication) 

58 self._token_exchange_endpoint = token_exchange_endpoint 

59 

60 def _make_request(self, request, headers, request_body): 

61 # Initialize request headers. 

62 request_headers = _URLENCODED_HEADERS.copy() 

63 

64 # Inject additional headers. 

65 if headers: 

66 for k, v in dict(headers).items(): 

67 request_headers[k] = v 

68 

69 # Apply OAuth client authentication. 

70 self.apply_client_authentication_options(request_headers, request_body) 

71 

72 # Execute request. 

73 response = request( 

74 url=self._token_exchange_endpoint, 

75 method="POST", 

76 headers=request_headers, 

77 body=urllib.parse.urlencode(request_body).encode("utf-8"), 

78 ) 

79 

80 response_body = ( 

81 response.data.decode("utf-8") 

82 if hasattr(response.data, "decode") 

83 else response.data 

84 ) 

85 

86 # If non-200 response received, translate to OAuthError exception. 

87 if response.status != http_client.OK: 

88 utils.handle_error_response(response_body) 

89 

90 response_data = json.loads(response_body) 

91 

92 # Return successful response. 

93 return response_data 

94 

95 def exchange_token( 

96 self, 

97 request, 

98 grant_type, 

99 subject_token, 

100 subject_token_type, 

101 resource=None, 

102 audience=None, 

103 scopes=None, 

104 requested_token_type=None, 

105 actor_token=None, 

106 actor_token_type=None, 

107 additional_options=None, 

108 additional_headers=None, 

109 ): 

110 """Exchanges the provided token for another type of token based on the 

111 rfc8693 spec. 

112 

113 Args: 

114 request (google.auth.transport.Request): A callable used to make 

115 HTTP requests. 

116 grant_type (str): The OAuth 2.0 token exchange grant type. 

117 subject_token (str): The OAuth 2.0 token exchange subject token. 

118 subject_token_type (str): The OAuth 2.0 token exchange subject token type. 

119 resource (Optional[str]): The optional OAuth 2.0 token exchange resource field. 

120 audience (Optional[str]): The optional OAuth 2.0 token exchange audience field. 

121 scopes (Optional[Sequence[str]]): The optional list of scopes to use. 

122 requested_token_type (Optional[str]): The optional OAuth 2.0 token exchange requested 

123 token type. 

124 actor_token (Optional[str]): The optional OAuth 2.0 token exchange actor token. 

125 actor_token_type (Optional[str]): The optional OAuth 2.0 token exchange actor token type. 

126 additional_options (Optional[Mapping[str, str]]): The optional additional 

127 non-standard Google specific options. 

128 additional_headers (Optional[Mapping[str, str]]): The optional additional 

129 headers to pass to the token exchange endpoint. 

130 

131 Returns: 

132 Mapping[str, str]: The token exchange JSON-decoded response data containing 

133 the requested token and its expiration time. 

134 

135 Raises: 

136 google.auth.exceptions.OAuthError: If the token endpoint returned 

137 an error. 

138 """ 

139 # Initialize request body. 

140 request_body = { 

141 "grant_type": grant_type, 

142 "resource": resource, 

143 "audience": audience, 

144 "scope": " ".join(scopes or []), 

145 "requested_token_type": requested_token_type, 

146 "subject_token": subject_token, 

147 "subject_token_type": subject_token_type, 

148 "actor_token": actor_token, 

149 "actor_token_type": actor_token_type, 

150 "options": None, 

151 } 

152 # Add additional non-standard options. 

153 if additional_options: 

154 request_body["options"] = urllib.parse.quote(json.dumps(additional_options)) 

155 # Remove empty fields in request body. 

156 for k, v in dict(request_body).items(): 

157 if v is None or v == "": 

158 del request_body[k] 

159 

160 return self._make_request(request, additional_headers, request_body) 

161 

162 def refresh_token(self, request, refresh_token): 

163 """Exchanges a refresh token for an access token based on the 

164 RFC6749 spec. 

165 

166 Args: 

167 request (google.auth.transport.Request): A callable used to make 

168 HTTP requests. 

169 subject_token (str): The OAuth 2.0 refresh token. 

170 """ 

171 

172 return self._make_request( 

173 request, 

174 None, 

175 {"grant_type": "refresh_token", "refresh_token": refresh_token}, 

176 )