Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/oauth2/utils.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

50 statements  

1# Copyright 2020 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""OAuth 2.0 Utilities. 

16 

17This module provides implementations for various OAuth 2.0 utilities. 

18This includes `OAuth error handling`_ and 

19`Client authentication for OAuth flows`_. 

20 

21OAuth error handling 

22-------------------- 

23This will define interfaces for handling OAuth related error responses as 

24stated in `RFC 6749 section 5.2`_. 

25This will include a common function to convert these HTTP error responses to a 

26:class:`google.auth.exceptions.OAuthError` exception. 

27 

28 

29Client authentication for OAuth flows 

30------------------------------------- 

31We introduce an interface for defining client authentication credentials based 

32on `RFC 6749 section 2.3.1`_. This will expose the following 

33capabilities: 

34 

35 * Ability to support basic authentication via request header. 

36 * Ability to support bearer token authentication via request header. 

37 * Ability to support client ID / secret authentication via request body. 

38 

39.. _RFC 6749 section 2.3.1: https://tools.ietf.org/html/rfc6749#section-2.3.1 

40.. _RFC 6749 section 5.2: https://tools.ietf.org/html/rfc6749#section-5.2 

41""" 

42 

43import abc 

44import base64 

45import enum 

46import json 

47 

48from google.auth import exceptions 

49 

50 

51# OAuth client authentication based on 

52# https://tools.ietf.org/html/rfc6749#section-2.3. 

class ClientAuthType(enum.Enum):
    """Enumerates the supported ways of sending OAuth client credentials.

    See https://tools.ietf.org/html/rfc6749#section-2.3.
    """

    # Credentials travel in an HTTP Basic ``Authorization`` header.
    basic = 1
    # Credentials travel as fields of the request body.
    request_body = 2

56 

57 

class ClientAuthentication:
    """Holds the client credentials used for basic or request-body client
    authentication, per https://tools.ietf.org/html/rfc6749#section-2.3.1.
    """

    def __init__(self, client_auth_type, client_id, client_secret=None):
        """Stores the client ID / secret pair together with the mechanism
        that should be used to transmit it (basic or request-body auth).

        Args:
            client_auth_type (google.oauth2.utils.ClientAuthType): The
                client authentication type.
            client_id (str): The client ID.
            client_secret (Optional[str]): The client secret.
        """
        # Plain value holder: the arguments are stored as given, without
        # validation or normalization.
        self.client_auth_type = client_auth_type
        self.client_id = client_id
        self.client_secret = client_secret

76 

77 

class OAuthClientAuthHandler(metaclass=abc.ABCMeta):
    """Abstract base for OAuth-based operations that need to attach client
    authentication to their outgoing requests.
    """

    def __init__(self, client_authentication=None):
        """Creates the handler with optional client credentials.

        Args:
            client_authentication (Optional[google.oauth2.utils.ClientAuthentication]):
                The OAuth client authentication credentials if available.
        """
        super().__init__()
        self._client_authentication = client_authentication

    def apply_client_authentication_options(
        self, headers, request_body=None, bearer_token=None
    ):
        """Adds client authentication to the request headers or POST body.

        A bearer token, when supplied, is written to the ``Authorization``
        header and the request body is left untouched. Otherwise the stored
        client credentials (if any) are applied according to their type.

        Args:
            headers (Mapping[str, str]): The HTTP request header.
            request_body (Optional[Mapping[str, str]]): The HTTP request body
                dictionary. Pass None for requests that do not support a
                request body.
            bearer_token (Optional[str]): The optional bearer token.

        Raises:
            google.auth.exceptions.OAuthError: If request-body client
                authentication is configured, no bearer token is given and
                ``request_body`` is None.
        """
        # Header injection always runs; it decides between bearer and basic.
        self._inject_authenticated_headers(headers, bearer_token)
        if bearer_token is not None:
            # The bearer token already authenticates the request.
            return
        self._inject_authenticated_request_body(request_body)

    def _inject_authenticated_headers(self, headers, bearer_token=None):
        """Writes a Bearer or Basic ``Authorization`` header into ``headers``.

        Nothing is written when there is no bearer token and the stored
        credentials are absent or not of the basic type.
        """
        if bearer_token is not None:
            headers["Authorization"] = "Bearer %s" % bearer_token
            return

        client_auth = self._client_authentication
        if client_auth is None:
            return
        if client_auth.client_auth_type is not ClientAuthType.basic:
            return

        user = client_auth.client_id
        # A missing secret is sent as an empty password.
        secret = client_auth.client_secret or ""
        raw_pair = ("%s:%s" % (user, secret)).encode()
        credentials = base64.b64encode(raw_pair).decode()
        headers["Authorization"] = "Basic %s" % credentials

    def _inject_authenticated_request_body(self, request_body):
        """Writes ``client_id`` / ``client_secret`` into ``request_body`` when
        request-body client authentication is configured.

        Raises:
            google.auth.exceptions.OAuthError: If request-body authentication
                is configured but ``request_body`` is None.
        """
        client_auth = self._client_authentication
        if client_auth is None:
            return
        if client_auth.client_auth_type is not ClientAuthType.request_body:
            return

        if request_body is None:
            raise exceptions.OAuthError(
                "HTTP request does not support request-body"
            )

        request_body["client_id"] = client_auth.client_id
        # A missing secret is sent as an empty string.
        request_body["client_secret"] = client_auth.client_secret or ""

142 

143 

def handle_error_response(response_body):
    """Translates an error response from an OAuth operation into an
    OAuthError exception.

    The body is parsed as JSON. When it is a JSON object containing an
    ``error`` field, the message is built from ``error`` plus the optional
    ``error_description`` and ``error_uri`` fields. In every other case
    (invalid JSON, a JSON value that is not an object, or an object without
    ``error``) the raw response body is used as the message.

    Args:
        response_body (str): The decoded response data.

    Raises:
        google.auth.exceptions.OAuthError
    """
    # Default to the raw response when no details can be extracted.
    error_details = response_body

    try:
        error_data = json.loads(response_body)
    except ValueError:
        # Not JSON at all (json.JSONDecodeError is a ValueError subclass).
        error_data = None

    # Valid JSON is not necessarily an object: "[]", "null", "123" or a bare
    # string would make error_data["error"] raise TypeError, so require a
    # dict before reading fields.
    if isinstance(error_data, dict) and "error" in error_data:
        error_components = ["Error code {}".format(error_data["error"])]
        if "error_description" in error_data:
            error_components.append(": {}".format(error_data["error_description"]))
        if "error_uri" in error_data:
            error_components.append(" - {}".format(error_data["error_uri"]))
        error_details = "".join(error_components)

    raise exceptions.OAuthError(error_details, response_body)