Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/SQLAlchemy-1.3.25.dev0-py3.11-linux-x86_64.egg/sqlalchemy/util/compat.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

322 statements  

1# util/compat.py 

2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: http://www.opensource.org/licenses/mit-license.php 

7 

8"""Handle Python version/platform incompatibilities.""" 

9 

10import collections 

11import contextlib 

12import inspect 

13import operator 

14import platform 

15import sys 

16 

17 

# Interpreter release checks.  ``py2k`` and ``py3k`` are mutually
# exclusive; every other flag is an "at least this release" test.
py2k = sys.version_info < (3, 0)
py3k = not py2k
py265 = sys.version_info >= (2, 6, 5)
py32 = sys.version_info >= (3, 2)
py33 = sys.version_info >= (3, 3)
py35 = sys.version_info >= (3, 5)
py36 = sys.version_info >= (3, 6)

# Interpreter implementation checks.
jython = sys.platform.startswith("java")
pypy = hasattr(sys, "pypy_version_info")
cpython = not (pypy or jython)  # TODO: something better for this ?

# Operating system / hardware checks.
win32 = sys.platform.startswith("win")
osx = sys.platform.startswith("darwin")
arm = "aarch" in platform.machine().lower()

32 

33 

# Standard-library helpers re-exported under this module's namespace.
contextmanager = contextlib.contextmanager
dottedgetter = operator.attrgetter
namedtuple = collections.namedtuple
next = next  # noqa

# Record type returned by inspect_getfullargspec().
_FULLARGSPEC_FIELDS = [
    "args",
    "varargs",
    "varkw",
    "defaults",
    "kwonlyargs",
    "kwonlydefaults",
    "annotations",
]
FullArgSpec = collections.namedtuple("FullArgSpec", _FULLARGSPEC_FIELDS)
del _FULLARGSPEC_FIELDS

51 

52try: 

53 import threading 

54except ImportError: 

55 import dummy_threading as threading # noqa 

56 

57 

# Work around http://bugs.python.org/issue2646: from 2.6.5 on a keyword
# name is passed through unchanged, on older interpreters it is coerced
# with ``str``.
safe_kwarg = (lambda arg: arg) if py265 else str  # noqa

63 

64 

def inspect_getfullargspec(func):
    """Return a :class:`FullArgSpec` describing the signature of ``func``.

    Fully vendored version of getfullargspec from Python 3.3.  A bound
    method is unwrapped to its underlying function before inspection.

    :param func: a Python function or method.
    :return: a ``FullArgSpec`` holding the positional argument names, the
     ``*args`` name, the ``**kwargs`` name, the positional defaults, the
     keyword-only names, the keyword-only defaults and the annotations.
    :raises TypeError: if ``func`` is not a Python function, or if its
     ``__code__`` attribute is not a code object.

    """
    if inspect.ismethod(func):
        func = func.__func__
    if not inspect.isfunction(func):
        raise TypeError("{!r} is not a Python function".format(func))

    code = func.__code__
    if not inspect.iscode(code):
        raise TypeError("{!r} is not a code object".format(code))

    varnames = code.co_varnames
    n_positional = code.co_argcount
    n_kwonly = code.co_kwonlyargcount if py3k else 0

    # co_varnames is read as: positional names, keyword-only names, then
    # the *args name and the **kwargs name when the matching flag is set.
    cursor = n_positional + n_kwonly
    positional = list(varnames[:n_positional])
    kwonly = list(varnames[n_positional:cursor])

    star_name = None
    if code.co_flags & inspect.CO_VARARGS:
        star_name = varnames[cursor]
        cursor += 1

    double_star_name = None
    if code.co_flags & inspect.CO_VARKEYWORDS:
        double_star_name = varnames[cursor]

    return FullArgSpec(
        positional,
        star_name,
        double_star_name,
        func.__defaults__,
        kwonly,
        func.__kwdefaults__ if py3k else None,
        func.__annotations__ if py3k else {},
    )

101 

102 

103if py3k: 

104 import base64 

105 import builtins 

106 import configparser 

107 import itertools 

108 import pickle 

109 

110 from functools import reduce 

111 from io import BytesIO as byte_buffer 

112 from io import StringIO 

113 from itertools import zip_longest 

114 from urllib.parse import ( 

115 quote_plus, 

116 unquote_plus, 

117 parse_qsl, 

118 quote, 

119 unquote, 

120 ) 

121 

# Python 3 text / binary / integer types.  The ``*_types`` names are
# tuples so they can be handed straight to isinstance().
text_type = str
binary_type = bytes
string_types = (str,)
binary_types = (bytes,)
int_types = (int,)

# Iterating bytes already yields integers here.
iterbytes = iter

# Lazy iterator helpers under their Python 2 "itertools" spellings.
itertools_filterfalse = itertools.filterfalse
itertools_filter = filter
itertools_imap = map

# Looked up by name with getattr(); presumably this keeps the module
# parseable on Python 2, where ``exec`` and ``print`` are statements.
exec_ = getattr(builtins, "exec")
import_ = getattr(builtins, "__import__")
print_ = getattr(builtins, "print")

136 

def b(s):
    """Encode the text ``s`` to bytes using latin-1."""
    return s.encode("latin-1")


def b64decode(x):
    """Decode the base64 text ``x`` (an ASCII-only ``str``) to bytes."""
    ascii_payload = x.encode("ascii")
    return base64.b64decode(ascii_payload)


def b64encode(x):
    """Base64-encode the bytes ``x`` and return the result as ``str``."""
    encoded = base64.b64encode(x)
    return encoded.decode("ascii")


def decode_backslashreplace(text, encoding):
    """Decode ``text`` with ``encoding`` using the "backslashreplace"
    error handler, so undecodable bytes become escapes instead of
    raising."""
    return text.decode(encoding, errors="backslashreplace")


def cmp(a, b):
    """Three-way comparison built from ``>`` and ``<``: positive when
    ``a > b``, negative when ``a < b``, zero otherwise."""
    return (a > b) - (a < b)

151 

def raise_(
    exception, with_traceback=None, replace_context=None, from_=False
):
    r"""Raise ``exception``, optionally attaching a traceback and a cause.

    :param exception: exception to raise
    :param with_traceback: when not None, ``exception.with_traceback()``
     is called with it and the returned object is what gets raised
    :param replace_context: an as-yet-unsupported feature.  This is an
     exception object which we are "replacing", e.g., it's our "cause"
     but we don't want it printed; similar to ``__suppress_context__``
     but without suppressing the enclosing context, if any.  For now it
     is simply assigned as the cause, and only when ``from\_`` was not
     given.
    :param from\_: the cause.  Any value other than ``False`` (``None``
     included) is assigned to ``__cause__`` and takes precedence over
     ``replace_context``.

    """
    if with_traceback is not None:
        exception = exception.with_traceback(with_traceback)

    if from_ is False:
        if replace_context is not None:
            # no good solution here, we would like to have the exception
            # have only the context of replace_context.__context__ so that
            # the intermediary exception does not change, but we can't
            # figure that out.
            exception.__cause__ = replace_context
    else:
        exception.__cause__ = from_

    try:
        raise exception
    finally:
        # credit to
        # https://cosmicpercolator.com/2016/01/13/exception-leaks-in-python-2-and-3/
        # as the __traceback__ object creates a cycle
        del exception, replace_context, from_, with_traceback

188 

189 if py35: 

190 from typing import TYPE_CHECKING 

191 else: 

192 TYPE_CHECKING = False 

193 

def u(s):
    """Python 3 variant: hand ``s`` back unchanged."""
    return s


def ue(s):
    """Python 3 variant: hand ``s`` back unchanged."""
    return s

199 

if not py32:

    def callable(fn):  # noqa
        """Report whether ``fn`` exposes a ``__call__`` attribute."""
        return hasattr(fn, "__call__")


else:
    # re-export the builtin under this module's namespace
    callable = callable  # noqa

206 

207 

208else: 

209 import base64 

210 import ConfigParser as configparser # noqa 

211 import itertools 

212 

213 from StringIO import StringIO # noqa 

214 from cStringIO import StringIO as byte_buffer # noqa 

215 from itertools import izip_longest as zip_longest # noqa 

216 from urllib import quote # noqa 

217 from urllib import quote_plus # noqa 

218 from urllib import unquote # noqa 

219 from urllib import unquote_plus # noqa 

220 from urlparse import parse_qsl # noqa 

221 

222 try: 

223 import cPickle as pickle 

224 except ImportError: 

225 import pickle # noqa 

226 

# Python 2 text / binary / integer types.  The ``*_types`` names are
# tuples so they can be handed straight to isinstance().
text_type = unicode  # noqa
binary_type = str
string_types = (basestring,)  # noqa
binary_types = (bytes,)
int_types = (int, long)  # noqa

# Builtins re-exported under this module's namespace.
callable = callable  # noqa
cmp = cmp  # noqa
reduce = reduce  # noqa

# base64 helpers are used as-is on this branch.
b64encode = base64.b64encode
b64decode = base64.b64decode

# Lazy iterator helpers.
itertools_filterfalse = itertools.ifilterfalse
itertools_filter = itertools.ifilter
itertools_imap = itertools.imap

243 

def b(s):
    """Python 2 variant: hand ``s`` back unchanged."""
    return s


def exec_(func_text, globals_, lcl=None):
    """Execute ``func_text`` in ``globals_`` and, when given, ``lcl``.

    The ``exec ... in ...`` statement is wrapped in a string and run
    through ``exec`` rather than written inline.
    """
    if lcl is not None:
        exec("exec func_text in globals_, lcl")
    else:
        exec("exec func_text in globals_")


def iterbytes(buf):
    """Return a generator yielding ``ord()`` of each item of ``buf``."""
    return (ord(char) for char in buf)


def import_(*args):
    """Call ``__import__`` with ``args``.

    When exactly four arguments are given, every entry of the fourth is
    converted with ``str()`` first.
    """
    if len(args) == 4:
        name, globals_, locals_, fromlist = args
        args = (name, globals_, locals_, [str(entry) for entry in fromlist])
    return __import__(*args)

260 

def print_(*args, **kwargs):
    """Write each positional argument to a file-like object.

    Python 2 stand-in for a ``print()`` function.  Arguments that are
    not already strings are converted with ``str()``; no separator and
    no trailing newline are written.

    :param args: the values to write, in order.
    :param file: keyword argument; object with a ``write()`` method.
     Defaults to ``sys.stdout``.  When ``None``, nothing is written.

    """
    fp = kwargs.pop("file", sys.stdout)
    if fp is None:
        return
    # Iterate the values themselves.  Wrapping them in enumerate() would
    # hand (index, value) tuples to str() and write those instead.
    for arg in args:
        if not isinstance(arg, basestring):  # noqa
            arg = str(arg)
        fp.write(arg)

269 

def u(s):
    """Decode the source literal ``s`` to ``unicode`` as utf-8."""
    # this differs from what six does, which doesn't support non-ASCII
    # strings - we only use u() with literal source strings, and all our
    # source files with non-ascii in them (all are tests) are utf-8
    # encoded.
    return unicode(s, "utf-8")  # noqa


def ue(s):
    """Decode ``s`` to ``unicode`` with the "unicode_escape" codec."""
    return unicode(s, "unicode_escape")  # noqa

279 

def decode_backslashreplace(text, encoding):
    """Decode ``text`` with ``encoding``, falling back to the escaped
    ``repr()`` of ``text`` when that raises ``UnicodeDecodeError``."""
    try:
        decoded = text.decode(encoding)
    except UnicodeDecodeError:
        # regular "backslashreplace" for an incompatible encoding raises:
        # "TypeError: don't know how to handle UnicodeDecodeError in
        # error callback"
        escaped = repr(text)[1:-1]
        return escaped.decode()
    return decoded


def safe_bytestring(text):
    """Return ``text`` as a byte string (py2k only).

    Objects that are not strings are converted with ``unicode()`` first;
    unicode text is encoded to ASCII with "backslashreplace"; anything
    else that is already a string is returned untouched.
    """
    if isinstance(text, string_types):
        if isinstance(text, unicode):
            return text.encode("ascii", errors="backslashreplace")
        return text
    return unicode(text).encode("ascii", errors="backslashreplace")

297 

# Python 2 ``raise_()``.  The three-expression ``raise`` statement used to
# attach a traceback does not parse on Python 3, so the definition is
# compiled from a string instead of being written inline.  The body lines
# inside the string must carry real nested indentation: with every line
# at the same depth the ``raise`` statements are not inside their
# ``if`` / ``else`` and exec() fails with an IndentationError.
# ``replace_context`` and ``from_`` are accepted for signature parity and
# are not used by this variant.
exec(
    "def raise_(exception, with_traceback=None, replace_context=None, "
    "from_=False):\n"
    "    if with_traceback:\n"
    "        raise type(exception), exception, with_traceback\n"
    "    else:\n"
    "        raise exception\n"
)

306 

307 TYPE_CHECKING = False 

308 

309if py35: 

310 

def _formatannotation(annotation, base_module=None):
    """Render a single annotation as text (vendored from python 3.7).

    Objects from the ``typing`` module are shown by ``repr()`` with the
    ``typing.`` prefix removed; classes are shown by qualified name,
    prefixed with their module unless that module is ``builtins`` or
    ``base_module``; anything else is shown by ``repr()``.
    """
    if getattr(annotation, "__module__", None) == "typing":
        return repr(annotation).replace("typing.", "")
    if not isinstance(annotation, type):
        return repr(annotation)
    module = annotation.__module__
    if module in ("builtins", base_module):
        return annotation.__qualname__
    return module + "." + annotation.__qualname__


def inspect_formatargspec(
    args,
    varargs=None,
    varkw=None,
    defaults=None,
    kwonlyargs=(),
    kwonlydefaults={},
    annotations={},
    formatarg=str,
    formatvarargs=lambda name: "*" + name,
    formatvarkw=lambda name: "**" + name,
    formatvalue=lambda value: "=" + repr(value),
    formatreturns=lambda text: " -> " + text,
    formatannotation=_formatannotation,
):
    """Copy formatargspec from python 3.7 standard library.

    Python 3 has deprecated formatargspec and requested that Signature
    be used instead, however this requires a full reimplementation
    of formatargspec() in terms of creating Parameter objects and such.
    Instead of introducing all the object-creation overhead and having
    to reinvent from scratch, just copy their compatibility routine.

    Ultimately we would need to rewrite our "decorator" routine completely
    which is not really worth it right now, until all Python 2.x support
    is dropped.

    The first seven parameters are the fields of a ``FullArgSpec``; the
    ``format*`` parameters are callables used to render each piece.
    Returns the parenthesized argument list as a string, followed by the
    return annotation when ``annotations`` has a ``"return"`` key.

    """

    def annotated(name):
        # argument name plus ": <annotation>" when one is recorded
        text = formatarg(name)
        if name in annotations:
            text += ": " + formatannotation(annotations[name])
        return text

    pieces = []

    # defaults line up with the *last* len(defaults) positional arguments
    first_default = len(args) - len(defaults) if defaults else None
    for index, name in enumerate(args):
        piece = annotated(name)
        if defaults and index >= first_default:
            piece += formatvalue(defaults[index - first_default])
        pieces.append(piece)

    if varargs is not None:
        pieces.append(formatvarargs(annotated(varargs)))
    elif kwonlyargs:
        # a bare "*" separates keyword-only arguments when there is no
        # *args entry to do it
        pieces.append("*")

    for name in kwonlyargs or ():
        piece = annotated(name)
        if kwonlydefaults and name in kwonlydefaults:
            piece += formatvalue(kwonlydefaults[name])
        pieces.append(piece)

    if varkw is not None:
        pieces.append(formatvarkw(annotated(varkw)))

    signature = "(" + ", ".join(pieces) + ")"
    if "return" in annotations:
        signature += formatreturns(formatannotation(annotations["return"]))
    return signature

386 

387 

388elif py2k: 

389 from inspect import formatargspec as _inspect_formatargspec 

390 

def inspect_formatargspec(*spec, **kw):
    """Forward to the stdlib ``formatargspec`` using only the first four
    positional values."""
    # convert for a potential FullArgSpec from compat.getfullargspec()
    leading_four = spec[:4]
    return _inspect_formatargspec(*leading_four, **kw)  # noqa

394 

395 

396else: 

397 from inspect import formatargspec as inspect_formatargspec # noqa 

398 

399 

400# Fix deprecation of accessing ABCs straight from collections module 

401# (which will stop working in 3.8). 

402if py33: 

403 import collections.abc as collections_abc 

404else: 

405 import collections as collections_abc # noqa 

406 

407 

@contextlib.contextmanager
def nested(*managers):
    """Implement contextlib.nested, mostly for unit tests.

    As tests still need to run on py2.6 we can't use multiple-with yet.

    Function is removed in py3k but also emits deprecation warning in 2.7
    so just roll it here for everyone.

    Enters each manager in order and yields the list of values their
    ``__enter__`` methods returned.  On the way out the ``__exit__``
    methods run in reverse order; one that returns a true value clears
    the pending exception, one that raises replaces it.  Whatever
    exception is still pending at the end is re-raised.

    """

    pending_exits = []
    entered = []
    no_error = (None, None, None)
    error = no_error
    try:
        for manager in managers:
            # fetch __exit__ ahead of calling __enter__
            leave = manager.__exit__
            enter = manager.__enter__
            entered.append(enter())
            pending_exits.append(leave)
        yield entered
    except:
        # deliberately catches everything: the exits below must see it
        error = sys.exc_info()
    finally:
        while pending_exits:
            leave = pending_exits.pop()  # noqa
            try:
                if leave(*error):
                    error = no_error
            except:
                error = sys.exc_info()
        if error != no_error:
            reraise(error[0], error[1], error[2])

441 

442 

def raise_from_cause(exception, exc_info=None):
    r"""legacy. use raise\_()

    Raise ``exception`` through :func:`reraise`, with the traceback taken
    from ``exc_info`` (``sys.exc_info()`` when omitted).  The exception
    value in ``exc_info`` becomes the cause, unless it is ``exception``
    itself, in which case the cause is ``None``.
    """

    active = sys.exc_info() if exc_info is None else exc_info
    _, active_value, active_tb = active
    cause = None if active_value is exception else active_value
    reraise(type(exception), exception, tb=active_tb, cause=cause)

451 

452 

def reraise(tp, value, tb=None, cause=None):
    r"""legacy. use raise\_()

    Delegate to ``raise_()``: ``value`` is the exception raised, ``tb``
    is passed as ``with_traceback`` and ``cause`` as ``from_``.  ``tp``
    is accepted but not used.
    """

    raise_(value, from_=cause, with_traceback=tb)

457 

458 

def with_metaclass(meta, *bases):
    """Create a base class with a metaclass.

    Drops the middle class upon creation: the returned placeholder class
    is only there to be subclassed, and the subclass is built directly
    as ``meta(name, bases, namespace)``.

    Source: http://lucumr.pocoo.org/2013/5/21/porting-to-python-3-redux/

    """

    class metaclass(meta):
        __call__ = type.__call__
        __init__ = type.__init__

        def __new__(mcls, name, placeholder_bases, namespace):
            if placeholder_bases is not None:
                # a real subclass is being defined: build it with the
                # requested metaclass and bases, leaving the placeholder
                # out of the hierarchy
                return meta(name, bases, namespace)
            # building the placeholder itself
            return type.__new__(mcls, name, (), namespace)

    return metaclass("temporary_class", None, {})

478 

479 

480if py3k: 

481 from datetime import timezone 

482else: 

483 from datetime import datetime 

484 from datetime import timedelta 

485 from datetime import tzinfo 

486 

class timezone(tzinfo):
    """Minimal port of python 3 timezone object.

    A ``tzinfo`` with a fixed offset that is supplied at construction
    time.  Instances compare and hash by that offset.
    """

    __slots__ = "_offset"

    def __init__(self, offset):
        """Store ``offset``.

        :param offset: a ``timedelta`` between ``_minoffset`` and
         ``_maxoffset`` inclusive.
        :raises TypeError: if ``offset`` is not a ``timedelta``.
        :raises ValueError: if ``offset`` is outside the allowed range.
        """
        if not isinstance(offset, timedelta):
            raise TypeError("offset must be a timedelta")
        if not self._minoffset <= offset <= self._maxoffset:
            raise ValueError(
                "offset must be a timedelta "
                "strictly between -timedelta(hours=24) and "
                "timedelta(hours=24)."
            )
        self._offset = offset

    def __eq__(self, other):
        if type(other) != timezone:
            return False
        return self._offset == other._offset

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; without this,
        # two timezones with the same offset would be both equal and
        # unequal.
        return not self == other

    def __hash__(self):
        return hash(self._offset)

    def __repr__(self):
        return "sqlalchemy.util.%s(%r)" % (
            self.__class__.__name__,
            self._offset,
        )

    def __str__(self):
        return self.tzname(None)

    def utcoffset(self, dt):
        """Return the fixed offset; ``dt`` is not consulted."""
        return self._offset

    def tzname(self, dt):
        """Return a name derived from the offset, e.g. ``UTC+05:30``;
        ``dt`` is not consulted."""
        return self._name_from_offset(self._offset)

    def dst(self, dt):
        """Always ``None``."""
        return None

    def fromutc(self, dt):
        """Return ``dt`` shifted by the offset.

        :raises ValueError: if ``dt`` is a datetime whose ``tzinfo`` is
         not this object.
        :raises TypeError: if ``dt`` is not a datetime.
        """
        if isinstance(dt, datetime):
            if dt.tzinfo is not self:
                raise ValueError("fromutc: dt.tzinfo " "is not self")
            return dt + self._offset
        raise TypeError(
            "fromutc() argument must be a datetime instance" " or None"
        )

    @staticmethod
    def _timedelta_to_microseconds(timedelta):
        """backport of timedelta._to_microseconds()"""
        return (
            timedelta.days * (24 * 3600) + timedelta.seconds
        ) * 1000000 + timedelta.microseconds

    @staticmethod
    def _divmod_timedeltas(a, b):
        """backport of timedelta.__divmod__"""

        q, r = divmod(
            timezone._timedelta_to_microseconds(a),
            timezone._timedelta_to_microseconds(b),
        )
        return q, timedelta(0, 0, r)

    @staticmethod
    def _name_from_offset(delta):
        """Format ``delta`` as ``UTC`` or ``UTC+HH:MM`` / ``UTC-HH:MM``,
        appending ``:SS`` and ``.ffffff`` only when non-zero."""
        if not delta:
            return "UTC"
        if delta < timedelta(0):
            sign = "-"
            delta = -delta
        else:
            sign = "+"
        hours, rest = timezone._divmod_timedeltas(
            delta, timedelta(hours=1)
        )
        minutes, rest = timezone._divmod_timedeltas(
            rest, timedelta(minutes=1)
        )
        result = "UTC%s%02d:%02d" % (sign, hours, minutes)
        if rest.seconds:
            result += ":%02d" % (rest.seconds,)
        if rest.microseconds:
            result += ".%06d" % (rest.microseconds,)
        return result

    # inclusive bounds checked by __init__
    _maxoffset = timedelta(hours=23, minutes=59)
    _minoffset = -_maxoffset


timezone.utc = timezone(timedelta(0))