Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/compat.py: 51%

150 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13 

14import copy 

15import datetime 

16import sys 

17import inspect 

18import warnings 

19import hashlib 

20from http.client import HTTPMessage 

21import logging 

22import shlex 

23import re 

24import os 

25from collections import OrderedDict 

26from collections.abc import MutableMapping 

27from math import floor 

28 

29from botocore.vendored import six 

30from botocore.exceptions import MD5UnavailableError 

31from dateutil.tz import tzlocal 

32from urllib3 import exceptions 

33 

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)

35 

36 

class HTTPHeaders(HTTPMessage):
    """Thin subclass of ``http.client.HTTPMessage``.

    The class body adds no behavior of its own. The ``from_dict`` and
    ``from_pairs`` alternate constructors are assigned onto the class
    further down in this module.
    """

39 

40from urllib.parse import ( 

41 quote, 

42 urlencode, 

43 unquote, 

44 unquote_plus, 

45 urlparse, 

46 urlsplit, 

47 urlunsplit, 

48 urljoin, 

49 parse_qsl, 

50 parse_qs, 

51) 

52from http.client import HTTPResponse 

53from io import IOBase as _IOBase 

54from base64 import encodebytes 

55from email.utils import formatdate 

56from itertools import zip_longest 

# Compatibility aliases exposed as attributes of this module.
file_type = _IOBase
# Self-assignment binds the builtin ``zip`` as a module-level name.
# NOTE(review): presumably kept so ``zip`` can still be imported from this
# module by existing callers -- confirm before removing.
zip = zip

# In Python 3, unquote_plus takes a str, URL-decodes it, and decodes the
# resulting bytes as UTF-8.
unquote_str = unquote_plus

63 

def set_socket_timeout(http_response, timeout):
    """Apply ``timeout`` to the socket underneath a response object.

    :param http_response: A response object exposing the private attribute
        chain ``_fp.fp.raw._sock`` (the original docs describe it as an
        instance of ``httplib.HTTPResponse``).
    :param timeout: Value passed unchanged to the socket's ``settimeout``.
    """
    # Walk the private attribute chain down to the raw socket object.
    raw_socket = http_response._fp.fp.raw._sock
    raw_socket.settimeout(timeout)

71 

def accepts_kwargs(func):
    """Report whether ``func`` declares a ``**kwargs``-style parameter.

    :param func: The callable to inspect.
    :return: The name of the ``**`` parameter (a truthy string) when one is
        declared, otherwise ``None``.
    """
    # In python3.4.1, there's backwards incompatible
    # changes when using getargspec with functools.partials,
    # so getfullargspec is used instead.
    spec = inspect.getfullargspec(func)
    return spec.varkw

76 

def ensure_unicode(s, encoding=None, errors=None):
    """Return ``s`` unchanged.

    This is a no-op in Python 3 because every ``str`` is already unicode.
    ``encoding`` and ``errors`` are accepted but never used.

    :param s: The value to return.
    :param encoding: Ignored.
    :param errors: Ignored.
    :return: The same object that was passed in as ``s``.
    """
    return s

80 

def ensure_bytes(s, encoding='utf-8', errors='strict'):
    """Coerce ``s`` to ``bytes``.

    :param s: A ``str`` or ``bytes`` value.
    :param encoding: Codec used when ``s`` is a ``str``.
    :param errors: Error handling scheme passed to ``str.encode``.
    :return: ``s`` itself when it is already ``bytes``, otherwise its
        encoded form.
    :raises ValueError: If ``s`` is neither ``str`` nor ``bytes``.
    """
    # bytes pass straight through untouched.
    if isinstance(s, bytes):
        return s
    if isinstance(s, str):
        return s.encode(encoding, errors)
    raise ValueError(f"Expected str or bytes, received {type(s)}.")

87 

88 

# Bind an ElementTree implementation to ``ETree``, trying the legacy
# ``cElementTree`` module name first.
try:
    import xml.etree.cElementTree as ETree
except ImportError:
    # cElementTree does not exist from Python3.9+
    import xml.etree.ElementTree as ETree
# Parse-error class of whichever implementation was imported above.
XMLParseError = ETree.ParseError
import json

96 

97 

def filter_ssl_warnings():
    """Install a warnings filter that silences one urllib3 SSL warning.

    The filter ignores ``InsecurePlatformWarning`` (from
    ``urllib3.exceptions``) whose message matches "A true SSLContext object
    is not available" and whose originating module name matches urllib3's
    ``util.ssl_`` module.
    """
    # Ignore warnings related to SNI as it is not being used in validations.
    warnings.filterwarnings(
        action='ignore',
        message="A true SSLContext object is not available.*",
        category=exceptions.InsecurePlatformWarning,
        module=r".*urllib3\.util\.ssl_",
    )

106 

107 

@classmethod
def from_dict(cls, d):
    """Build a new ``cls`` instance populated from a mapping.

    Defined at module level and later assigned onto ``HTTPHeaders``.

    :param d: An object with an ``items()`` method yielding
        ``(key, value)`` pairs.
    :return: A ``cls()`` instance with every pair stored through item
        assignment.
    """
    headers = cls()
    for name, val in d.items():
        headers[name] = val
    return headers

114 

115 

@classmethod
def from_pairs(cls, pairs):
    """Build a new ``cls`` instance from an iterable of pairs.

    Defined at module level and later assigned onto ``HTTPHeaders``.

    :param pairs: An iterable of two-item ``(key, value)`` sequences.
    :return: A ``cls()`` instance with every pair stored, in iteration
        order, through item assignment.
    """
    headers = cls()
    for name, val in pairs:
        headers[name] = val
    return headers

122 

123 

# Attach the module-level classmethods to HTTPHeaders as alternate
# constructors.
HTTPHeaders.from_dict = from_dict
HTTPHeaders.from_pairs = from_pairs

126 

127 

def copy_kwargs(kwargs):
    """
    Return a shallow copy of ``kwargs``.

    This used to be a compat shim for 2.6 but is now just an alias for
    ``copy.copy``.

    :param kwargs: The object to copy.
    :return: The result of ``copy.copy(kwargs)``.
    """
    return copy.copy(kwargs)

134 

135 

def total_seconds(delta):
    """
    Return the duration of ``delta`` in seconds.

    This used to be a compat shim for 2.6 but is now just an alias for
    the object's own ``total_seconds()`` method.

    :param delta: The timedelta object
    :type delta: ``datetime.timedelta``
    :return: Whatever ``delta.total_seconds()`` returns.
    """
    return delta.total_seconds()

146 

147 

# Probe once, at import time, whether md5 can be constructed on this system.
# A given system might not have access to it for various reasons, such as
# FIPS mode being enabled.
try:
    hashlib.md5()
except ValueError:
    MD5_AVAILABLE = False
else:
    MD5_AVAILABLE = True

155 

156 

def get_md5(*args, **kwargs):
    """
    Attempts to get an md5 hashing object.

    All positional and keyword arguments are forwarded unchanged to
    ``hashlib.md5``. This function has no ``raise_error_if_unavailable``
    option: when md5 is unavailable it always raises and never returns
    ``None``.

    :param args: Args to pass to the MD5 constructor
    :param kwargs: Key word arguments to pass to the MD5 constructor
    :return: An MD5 hashing object.
    :raises MD5UnavailableError: If the import-time probe recorded in
        ``MD5_AVAILABLE`` found that md5 cannot be used on this system.
    """
    if not MD5_AVAILABLE:
        raise MD5UnavailableError()
    return hashlib.md5(*args, **kwargs)

173 

174 

def compat_shell_split(s, platform=None):
    """Split a command string using the rules of the given platform.

    :param s: The command string to split.
    :param platform: Platform identifier in ``sys.platform`` form. Defaults
        to ``sys.platform`` when ``None``.
    :return: A list of command components, produced by
        ``_windows_shell_split`` for ``"win32"`` and by ``shlex.split``
        for every other platform.
    """
    target = sys.platform if platform is None else platform
    # Only "win32" uses the Windows-specific parser.
    splitter = _windows_shell_split if target == "win32" else shlex.split
    return splitter(s)

183 

184 

185def _windows_shell_split(s): 

186 """Splits up a windows command as the built-in command parser would. 

187 

188 Windows has potentially bizarre rules depending on where you look. When 

189 spawning a process via the Windows C runtime (which is what python does 

190 when you call popen) the rules are as follows: 

191 

192 https://docs.microsoft.com/en-us/cpp/cpp/parsing-cpp-command-line-arguments 

193 

194 To summarize: 

195 

196 * Only space and tab are valid delimiters 

197 * Double quotes are the only valid quotes 

198 * Backslash is interpreted literally unless it is part of a chain that 

199 leads up to a double quote. Then the backslashes escape the backslashes, 

200 and if there is an odd number the final backslash escapes the quote. 

201 

202 :param s: The command string to split up into parts. 

203 :return: A list of command components. 

204 """ 

205 if not s: 

206 return [] 

207 

208 components = [] 

209 buff = [] 

210 is_quoted = False 

211 num_backslashes = 0 

212 for character in s: 

213 if character == '\\': 

214 # We can't simply append backslashes because we don't know if 

215 # they are being used as escape characters or not. Instead we 

216 # keep track of how many we've encountered and handle them when 

217 # we encounter a different character. 

218 num_backslashes += 1 

219 elif character == '"': 

220 if num_backslashes > 0: 

221 # The backslashes are in a chain leading up to a double 

222 # quote, so they are escaping each other. 

223 buff.append('\\' * int(floor(num_backslashes / 2))) 

224 remainder = num_backslashes % 2 

225 num_backslashes = 0 

226 if remainder == 1: 

227 # The number of backslashes is uneven, so they are also 

228 # escaping the double quote, so it needs to be added to 

229 # the current component buffer. 

230 buff.append('"') 

231 continue 

232 

233 # We've encountered a double quote that is not escaped, 

234 # so we toggle is_quoted. 

235 is_quoted = not is_quoted 

236 

237 # If there are quotes, then we may want an empty string. To be 

238 # safe, we add an empty string to the buffer so that we make 

239 # sure it sticks around if there's nothing else between quotes. 

240 # If there is other stuff between quotes, the empty string will 

241 # disappear during the joining process. 

242 buff.append('') 

243 elif character in [' ', '\t'] and not is_quoted: 

244 # Since the backslashes aren't leading up to a quote, we put in 

245 # the exact number of backslashes. 

246 if num_backslashes > 0: 

247 buff.append('\\' * num_backslashes) 

248 num_backslashes = 0 

249 

250 # Excess whitespace is ignored, so only add the components list 

251 # if there is anything in the buffer. 

252 if buff: 

253 components.append(''.join(buff)) 

254 buff = [] 

255 else: 

256 # Since the backslashes aren't leading up to a quote, we put in 

257 # the exact number of backslashes. 

258 if num_backslashes > 0: 

259 buff.append('\\' * num_backslashes) 

260 num_backslashes = 0 

261 buff.append(character) 

262 

263 # Quotes must be terminated. 

264 if is_quoted: 

265 raise ValueError(f"No closing quotation in string: {s}") 

266 

267 # There may be some leftover backslashes, so we need to add them in. 

268 # There's no quote so we add the exact number. 

269 if num_backslashes > 0: 

270 buff.append('\\' * num_backslashes) 

271 

272 # Add the final component in if there is anything in the buffer. 

273 if buff: 

274 components.append(''.join(buff)) 

275 

276 return components 

277 

278 

def get_tzinfo_options():
    """Return the local-timezone factories to try, in order.

    :return: ``(tzlocal,)`` on every platform except ``win32``, where the
        tuple is ``(tzlocal, tzwinlocal)``.
    """
    if sys.platform != 'win32':
        return (tzlocal,)

    # Due to dateutil/dateutil#197, Windows may fail to parse times in the
    # past with the system clock. We can alternatively fall back to
    # tzwinlocal when this happens, which will get time info from the
    # Windows registry.
    from dateutil.tz import tzwinlocal

    return (tzlocal, tzwinlocal)

289 

290 

# Detect if CRT is available for use: HAS_CRT is False when ``awscrt.auth``
# cannot be imported.
try:
    import awscrt.auth
except ImportError:
    HAS_CRT = False
else:
    # Allow user opt-out if needed: setting BOTO_DISABLE_CRT to "true"
    # (any letter case) turns CRT usage off.
    disabled = os.environ.get('BOTO_DISABLE_CRT', "false")
    HAS_CRT = disabled.lower() != 'true'

300 

301 

########################################################
#              urllib3 compat backports                #
########################################################

# IPv4/IPv6 validation regex patterns vendored from urllib3:
# https://github.com/urllib3/urllib3/blob/7e856c0/src/urllib3/util/url.py
IPV4_PAT = r"(?:[0-9]{1,3}\.){3}[0-9]{1,3}"
IPV4_RE = re.compile(f"^{IPV4_PAT}$")
HEX_PAT = "[0-9A-Fa-f]{1,4}"
# Last 32 bits of an IPv6 address: two hex groups or an embedded IPv4 address.
LS32_PAT = f"(?:{HEX_PAT}:{HEX_PAT}|{IPV4_PAT})"
_subs = {"hex": HEX_PAT, "ls32": LS32_PAT}
# One %-template per permitted position of the "::" elision; the comment above
# each entry is the grammar rule it implements.
_variations = [
    #                            6( h16 ":" ) ls32
    "(?:%(hex)s:){6}%(ls32)s",
    #                       "::" 5( h16 ":" ) ls32
    "::(?:%(hex)s:){5}%(ls32)s",
    # [               h16 ] "::" 4( h16 ":" ) ls32
    "(?:%(hex)s)?::(?:%(hex)s:){4}%(ls32)s",
    # [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32
    "(?:(?:%(hex)s:)?%(hex)s)?::(?:%(hex)s:){3}%(ls32)s",
    # [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32
    "(?:(?:%(hex)s:){0,2}%(hex)s)?::(?:%(hex)s:){2}%(ls32)s",
    # [ *3( h16 ":" ) h16 ] "::"    h16 ":"   ls32
    "(?:(?:%(hex)s:){0,3}%(hex)s)?::%(hex)s:%(ls32)s",
    # [ *4( h16 ":" ) h16 ] "::"              ls32
    "(?:(?:%(hex)s:){0,4}%(hex)s)?::%(ls32)s",
    # [ *5( h16 ":" ) h16 ] "::"              h16
    "(?:(?:%(hex)s:){0,5}%(hex)s)?::%(hex)s",
    # [ *6( h16 ":" ) h16 ] "::"
    "(?:(?:%(hex)s:){0,6}%(hex)s)?::",
]

UNRESERVED_PAT = (
    r"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789._!\-~"
)
# Alternation over every expanded template above.
IPV6_PAT = "(?:" + "|".join(template % _subs for template in _variations) + ")"
# Zone identifier: "%" or "%25", then unreserved or percent-encoded characters.
ZONE_ID_PAT = "(?:%25|%)(?:[" + UNRESERVED_PAT + "]|%[a-fA-F0-9]{2})+"
# Bracketed IPv6 address with an optional zone identifier.
IPV6_ADDRZ_PAT = rf"\[{IPV6_PAT}(?:{ZONE_ID_PAT})?\]"
IPV6_ADDRZ_RE = re.compile(f"^{IPV6_ADDRZ_PAT}$")

# These are the characters that are stripped by post-bpo-43882 urlparse().
UNSAFE_URL_CHARS = frozenset('\t\r\n')

344 

# Detect if gzip is available for use: HAS_GZIP is False when the ``gzip``
# module cannot be imported.
try:
    import gzip
except ImportError:
    HAS_GZIP = False
else:
    HAS_GZIP = True