1# Copyright 2015 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helper functions for commonly used utilities."""
16
17import base64
18import calendar
19import datetime
20from email.message import Message
21import sys
22import urllib
23
24from google.auth import exceptions
25
# The smallest MDS cache used by this library stores tokens until 4 minutes from
# expiry.
# NOTE(review): the value below is 3 minutes 45 seconds, i.e. 15 seconds short
# of the 4-minute cache window described above. Presumably the margin ensures
# that a refresh triggered at this threshold is no longer served from that
# cache -- confirm against the callers that compare against this constant.
REFRESH_THRESHOLD = datetime.timedelta(minutes=3, seconds=45)
29
30
def copy_docstring(source_class):
    """Build a decorator that borrows a docstring from ``source_class``.

    Args:
        source_class (type): The class whose same-named method supplies the
            docstring.

    Returns:
        Callable: A decorator that assigns the docstring of the identically
            named attribute of ``source_class`` to the method it decorates.
    """

    def decorator(method):
        """Copy the docstring onto ``method`` and hand the method back.

        Args:
            method (Callable): The undocumented method that receives the
                docstring.

        Returns:
            Callable: ``method`` itself, now carrying the copied docstring.

        Raises:
            google.auth.exceptions.InvalidOperation: if ``method`` already has
                a non-empty docstring.
        """
        # Refuse to overwrite documentation that was written on purpose.
        already_documented = bool(method.__doc__)
        if already_documented:
            raise exceptions.InvalidOperation("Method already has a docstring.")

        # Look up the attribute of the same name on the source class.
        method.__doc__ = getattr(source_class, method.__name__).__doc__
        return method

    return decorator
63
64
def parse_content_type(header_value):
    """Extract the bare media-type from a 'content-type' header value.

    Parameters such as ``charset`` are discarded. Parsing is delegated to
    ``email.message.Message``, the replacement PEP 594 recommends for the
    deprecated ``cgi`` module (removed in Python 3.13, see
    https://peps.python.org/pep-0594/#cgi).

    Args:
        header_value (str): The raw value of a 'content-type' header.

    Returns:
        str: The lowercase media-type found in the header. When the value
            cannot be parsed, 'text/plain' (the default for textual content)
            is returned instead.
    """
    carrier = Message()
    carrier["content-type"] = header_value
    # Despite its name, get_content_type() yields only the media-type.
    media_type = carrier.get_content_type()
    return media_type
85
86
def utcnow():
    """Return the current UTC time as a naive datetime.

    Returns:
        datetime: The current time in UTC, with no timezone info attached.
    """
    # datetime.utcnow() is deprecated as of Python 3.12, so the time is taken
    # from the timezone-aware datetime.now(timezone.utc) instead. Earlier
    # versions of this helper returned an offset-naive value, and comparing a
    # naive datetime with an aware one fails, so the tzinfo is stripped to
    # stay backward compatible.
    aware_now = datetime.datetime.now(tz=datetime.timezone.utc)
    return aware_now.replace(tzinfo=None)
101
102
def datetime_to_secs(value):
    """Express a datetime as whole seconds elapsed since the UNIX epoch.

    Args:
        value (datetime): The datetime to convert.

    Returns:
        int: The number of seconds since the UNIX epoch.
    """
    # utctimetuple() normalizes the value to UTC; timegm() then interprets
    # the resulting struct_time as UTC rather than local time.
    utc_fields = value.utctimetuple()
    return calendar.timegm(utc_fields)
113
114
def to_bytes(value, encoding="utf-8"):
    """Return ``value`` as bytes, encoding it first when it is a string.

    Args:
        value (Union[str, bytes]): The value to be converted.
        encoding (str): The encoding used to turn a string into bytes.
            Defaults to "utf-8".

    Returns:
        bytes: The encoded form of ``value`` if it was a string, otherwise
            ``value`` unchanged when it was already bytes.

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be converted to bytes.
    """
    encoded = value
    if isinstance(value, str):
        encoded = value.encode(encoding)

    # Anything that is still not bytes at this point cannot be handled.
    if not isinstance(encoded, bytes):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to bytes".format(value)
        )
    return encoded
137
138
def from_bytes(value):
    """Return ``value`` as a string, decoding it as UTF-8 when it is bytes.

    Args:
        value (Union[str, bytes]): The value to be converted.

    Returns:
        str: The decoded form of ``value`` if it was bytes, otherwise
            ``value`` unchanged when it was already a string.

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be converted to unicode.
    """
    decoded = value
    if isinstance(value, bytes):
        decoded = value.decode("utf-8")

    # Anything that is still not a string at this point cannot be handled.
    if not isinstance(decoded, str):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to unicode".format(value)
        )
    return decoded
159
160
def update_query(url, params, remove=None):
    """Updates a URL's query parameters.

    Replaces any current values if they are already present in the URL.
    Parameters already in the URL that have a blank value (``?a=``) are not
    carried over, because the query string is parsed with ``parse_qs`` and
    its default ``keep_blank_values=False``.

    Args:
        url (str): The URL to update.
        params (Mapping[str, str]): A mapping of query parameter
            keys to values.
        remove (Sequence[str]): Parameters to remove from the query string.
            Removal is applied after ``params`` is merged, so a key listed
            here is dropped even if it also appears in ``params``.

    Returns:
        str: The URL with updated query parameters.

    Examples:

        >>> url = 'http://example.com?a=1'
        >>> update_query(url, {'a': '2'})
        'http://example.com?a=2'
        >>> update_query(url, {'b': '3'})
        'http://example.com?a=1&b=3'
        >>> update_query(url, {'b': '3'}, remove=['a'])
        'http://example.com?b=3'

    """
    # A bare ``import urllib`` does not load the ``urllib.parse`` submodule;
    # it would only be reachable if some other module had imported it first.
    # Import it explicitly so this function does not depend on that accident.
    import urllib.parse

    if remove is None:
        remove = []

    # Split the URL into parts.
    parts = urllib.parse.urlparse(url)
    # Parse the query string into a dict of key -> list of values.
    query_params = urllib.parse.parse_qs(parts.query)
    # Update the query parameters with the new parameters.
    query_params.update(params)
    # Remove any values specified in remove.
    query_params = {
        key: value for key, value in query_params.items() if key not in remove
    }
    # Re-encode the query string; doseq=True expands the value lists that
    # parse_qs produced back into repeated ``key=value`` pairs.
    new_query = urllib.parse.urlencode(query_params, doseq=True)
    # Unsplit the url.
    new_parts = parts._replace(query=new_query)
    return urllib.parse.urlunparse(new_parts)
204
205
def scopes_to_string(scopes):
    """Join scopes into the single string form sent to OAuth 2.0
    authorization servers.

    Args:
        scopes (Sequence[str]): The sequence of scopes to convert.

    Returns:
        str: All scopes combined into one string, separated by single spaces.
    """
    separator = " "
    return separator.join(scopes)
217
218
def string_to_scopes(scopes):
    """Split a stringified scopes value into a list.

    Args:
        scopes (Union[Sequence, str]): The string of space-separated scopes
            to convert.

    Returns:
        Sequence(str): The separated scopes, or an empty list when ``scopes``
            is empty or otherwise falsy.
    """
    # Splitting on a literal single space (not arbitrary whitespace) keeps
    # empty entries for consecutive spaces, exactly as str.split(" ") does.
    return scopes.split(" ") if scopes else []
232
233
def padded_urlsafe_b64decode(value):
    """Decode URL-safe base64 input whose padding may have been stripped.

    Google infrastructure tends to omit the base64 padding characters.

    Args:
        value (Union[str, bytes]): The encoded value.

    Returns:
        bytes: The decoded value
    """
    raw = to_bytes(value)
    # Number of "=" characters needed to round the length up to a multiple of
    # four; zero when the input is already aligned.
    missing = -len(raw) % 4
    return base64.urlsafe_b64decode(raw + b"=" * missing)
248
249
def unpadded_urlsafe_b64encode(value):
    """Encode a value as URL-safe base64 with the padding stripped.

    `rfc 7515`_ defines Base64url to NOT include any padding
    characters, but the stdlib doesn't do that by default.

    .. _rfc 7515: https://tools.ietf.org/html/rfc7515#page-6

    Args:
        value (bytes): The bytes-like value to encode.

    Returns:
        bytes: The encoded value, without any trailing ``=`` characters.
    """
    encoded = base64.urlsafe_b64encode(value)
    # "=" only ever appears as trailing padding, so stripping it from the
    # right cannot remove payload characters.
    return encoded.rstrip(b"=")
265
266
def is_python_3():
    """Report whether the running interpreter is Python 3 rather than Python 2.

    Returns:
        bool: True if the Python interpreter is Python 3 and False otherwise.
    """
    # sys.version_info compares like a tuple, so any 3.x release (which has
    # more than two fields) sorts strictly after (3, 0).
    minimum = (3, 0)
    return sys.version_info > minimum