Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/_helpers.py: 46%

54 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:37 +0000

1# Copyright 2015 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helper functions for commonly used utilities.""" 

16 

17import base64 

18import calendar 

19import datetime 

20import sys 

21import urllib 

22 

23from google.auth import exceptions 

24 

# The token server only hands out a new token on refresh when the current
# token expires within 30 seconds, so the refresh threshold must not exceed
# 30 seconds. With a larger threshold the auth library would send a flood of
# refresh requests until 30 seconds before expiration, causing a CPU spike.
REFRESH_THRESHOLD = datetime.timedelta(seconds=20)

30 

31 

def copy_docstring(source_class):
    """Build a decorator that borrows a method docstring from another class.

    Args:
        source_class (type): The class whose same-named method supplies the
            docstring.

    Returns:
        Callable: A decorator that assigns the docstring of the method with
            the same name on ``source_class`` to the method it decorates.
    """

    def decorator(method):
        """Copy the docstring onto ``method`` and hand it back.

        Args:
            method (Callable): The method that should receive the docstring.

        Returns:
            Callable: ``method`` itself, with ``__doc__`` replaced.

        Raises:
            google.auth.exceptions.InvalidOperation: if ``method`` already
                has a docstring.
        """
        if method.__doc__:
            raise exceptions.InvalidOperation("Method already has a docstring.")

        # The counterpart is looked up by the decorated method's own name.
        inherited_doc = getattr(source_class, method.__name__).__doc__
        method.__doc__ = inherited_doc
        return method

    return decorator

64 

65 

def utcnow():
    """Returns the current UTC datetime.

    Returns:
        datetime: The current time in UTC as a naive datetime (its
            ``tzinfo`` is ``None``).
    """
    # datetime.datetime.utcnow() is deprecated as of Python 3.12. Ask for an
    # aware UTC timestamp instead, then drop the timezone so callers keep
    # receiving a naive value that compares cleanly with other naive
    # datetimes.
    aware_now = datetime.datetime.now(datetime.timezone.utc)
    return aware_now.replace(tzinfo=None)

73 

74 

def datetime_to_secs(value):
    """Convert a datetime into whole seconds elapsed since the UNIX epoch.

    Args:
        value (datetime): The datetime to convert.

    Returns:
        int: The number of seconds since the UNIX epoch.
    """
    # utctimetuple() yields a UTC struct_time, which timegm() interprets as
    # UTC when computing the epoch offset.
    utc_time_tuple = value.utctimetuple()
    return calendar.timegm(utc_time_tuple)

85 

86 

def to_bytes(value, encoding="utf-8"):
    """Return ``value`` as bytes, encoding it first when it is a string.

    Args:
        value (Union[str, bytes]): The value to be converted.
        encoding (str): The encoding used to turn a string into bytes.
            Defaults to "utf-8".

    Returns:
        bytes: The encoded value when a string was given, or the value
            unchanged when it was already bytes.

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be converted to bytes.
    """
    if isinstance(value, str):
        converted = value.encode(encoding)
    else:
        converted = value

    # Anything that is neither str nor bytes ends up here unconverted.
    if not isinstance(converted, bytes):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to bytes".format(value)
        )
    return converted

109 

110 

def from_bytes(value):
    """Return ``value`` as a string, decoding it first when it is bytes.

    Args:
        value (Union[str, bytes]): The value to be converted.

    Returns:
        str: The UTF-8 decoded value when bytes were given, or the value
            unchanged when it was already a string.

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be converted to unicode.
    """
    if isinstance(value, bytes):
        converted = value.decode("utf-8")
    else:
        converted = value

    # Anything that is neither bytes nor str ends up here unconverted.
    if not isinstance(converted, str):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to unicode".format(value)
        )
    return converted

131 

132 

def update_query(url, params, remove=None):
    """Updates a URL's query parameters.

    Replaces any current values if they are already present in the URL.

    Args:
        url (str): The URL to update.
        params (Mapping[str, str]): A mapping of query parameter
            keys to values.
        remove (Optional[Sequence[str]]): Parameters to remove from the
            query string. ``None`` (the default) removes nothing.

    Returns:
        str: The URL with updated query parameters.

    Examples:

        >>> url = 'http://example.com?a=1'
        >>> update_query(url, {'a': '2'})
        'http://example.com?a=2'
        >>> update_query(url, {'b': '3'})
        'http://example.com?a=1&b=3'
        >>> update_query(url, {'b': '3'}, remove=['a'])
        'http://example.com?b=3'

    """
    # A bare ``import urllib`` does not guarantee that the ``parse``
    # submodule is loaded, so import it explicitly rather than relying on
    # some other module having imported it first.
    import urllib.parse

    if remove is None:
        remove = []

    # Split the URL into parts.
    parts = urllib.parse.urlparse(url)
    # Parse the query string; each existing key maps to a list of values.
    query_params = urllib.parse.parse_qs(parts.query)
    # Update the query parameters with the new parameters.
    query_params.update(params)
    # Remove any values specified in remove.
    query_params = {
        key: value for key, value in query_params.items() if key not in remove
    }
    # Re-encode the query string; doseq=True expands the list values that
    # parse_qs produced into repeated ``key=value`` pairs.
    new_query = urllib.parse.urlencode(query_params, doseq=True)
    # Unsplit the url.
    new_parts = parts._replace(query=new_query)
    return urllib.parse.urlunparse(new_parts)

176 

177 

def scopes_to_string(scopes):
    """Join scopes into the single space-delimited string that is sent to
    OAuth 2.0 authorization servers.

    Args:
        scopes (Sequence[str]): The sequence of scopes to convert.

    Returns:
        str: The scopes formatted as a single string.
    """
    separator = " "
    return separator.join(scopes)

189 

190 

def string_to_scopes(scopes):
    """Split a stringified scopes value into a list.

    Args:
        scopes (Union[Sequence, str]): The string of space-separated scopes
            to convert. Any falsy value (such as an empty string or
            ``None``) produces an empty list.

    Returns:
        Sequence(str): The separated scopes.
    """
    # Split on a literal single space, so consecutive spaces yield empty
    # entries rather than being collapsed.
    return scopes.split(" ") if scopes else []

204 

205 

def padded_urlsafe_b64decode(value):
    """Decode a URL-safe base64 value whose padding may be missing.

    Google infrastructure tends to omit the base64 padding characters.

    Args:
        value (Union[str, bytes]): The encoded value.

    Returns:
        bytes: The decoded value
    """
    encoded = to_bytes(value)
    # ``-len % 4`` is the number of "=" characters needed to bring the
    # length up to the next multiple of four (zero when already aligned).
    missing_padding = -len(encoded) % 4
    return base64.urlsafe_b64decode(encoded + b"=" * missing_padding)

220 

221 

def unpadded_urlsafe_b64encode(value):
    """Encode a value as URL-safe base64 and strip the trailing padding.

    `rfc 7515`_ defines Base64url to NOT include any padding
    characters, but the stdlib doesn't do that by default.

    .. _rfc 7515: https://tools.ietf.org/html/rfc7515#page-6

    Args:
        value (bytes): The bytes-like value to encode

    Returns:
        bytes: The encoded value, without trailing ``=`` characters
    """
    padded = base64.urlsafe_b64encode(value)
    return padded.rstrip(b"=")

237 

238 

def is_python_3():
    """Report whether the running interpreter is Python 3.

    Returns:
        bool: True if the Python interpreter is Python 3 and False otherwise.
    """
    python_3_floor = (3, 0)
    return python_3_floor < sys.version_info