Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py: 45%

343 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# util/compat.py 

2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Handle Python version/platform incompatibilities.""" 

9 

10import collections 

11import contextlib 

12import inspect 

13import operator 

14import platform 

15import sys 

16 

# Interpreter version flags: each is True when the running Python is at
# least the release named by the flag.
py312 = sys.version_info[:2] >= (3, 12)
py311 = sys.version_info[:2] >= (3, 11)
py310 = sys.version_info[:2] >= (3, 10)
py39 = sys.version_info[:2] >= (3, 9)
py38 = sys.version_info[:2] >= (3, 8)
py37 = sys.version_info[:2] >= (3, 7)
py3k = sys.version_info[0] >= 3
py2k = sys.version_info[0] < 3

# Interpreter implementation flags.
pypy = platform.python_implementation() == "PyPy"
cpython = platform.python_implementation() == "CPython"

# Host platform flags, derived from sys.platform / platform.machine().
win32 = sys.platform[:3] == "win"
osx = sys.platform[:6] == "darwin"
arm = "aarch" in platform.machine().lower()
is64bit = sys.maxsize > 1 << 32

# Mirrors the ``cpython`` flag.
has_refcount_gc = bool(cpython)

# Standard-library callables re-exported under compat names.
contextmanager = contextlib.contextmanager
dottedgetter = operator.attrgetter
namedtuple = collections.namedtuple
next = next  # noqa

40 

# Result record returned by inspect_getfullargspec(); the field order
# matches the positional order used when the record is constructed.
FullArgSpec = collections.namedtuple(
    "FullArgSpec",
    "args varargs varkw defaults kwonlyargs kwonlydefaults annotations",
)

53 

54 

class nullcontext(object):
    """No-op context manager, vendored from Python 3.7.

    Entering the block yields the ``enter_result`` value given at
    construction time; leaving the block does nothing and returns
    ``None``, so an exception raised inside the block is not suppressed.

    """

    def __init__(self, enter_result=None):
        # value handed back by __enter__()
        self.enter_result = enter_result

    def __enter__(self):
        return self.enter_result

    def __exit__(self, *excinfo):
        return None

70 

71 

72try: 

73 import threading 

74except ImportError: 

75 import dummy_threading as threading # noqa 

76 

77 

def inspect_getfullargspec(func):
    """Fully vendored version of getfullargspec from Python 3.3.

    Reads the signature directly from the function's code object.

    :param func: a Python function, or a bound method (which is
     unwrapped to its underlying function first).
    :return: a ``FullArgSpec`` named tuple.
    :raises TypeError: if ``func`` is not a Python function, or its
     ``__code__`` is not a code object.

    """
    target = func.__func__ if inspect.ismethod(func) else func
    if not inspect.isfunction(target):
        raise TypeError("{!r} is not a Python function".format(target))

    code = target.__code__
    if not inspect.iscode(code):
        raise TypeError("{!r} is not a code object".format(code))

    # co_varnames lists positional names first, then keyword-only names,
    # then the *args name and the **kwargs name when present.
    varnames = code.co_varnames
    n_positional = code.co_argcount
    n_kwonly = code.co_kwonlyargcount if py3k else 0
    cursor = n_positional + n_kwonly

    positional = list(varnames[:n_positional])
    kwonly = list(varnames[n_positional:cursor])

    star_args = None
    if code.co_flags & inspect.CO_VARARGS:
        star_args = varnames[cursor]
        cursor += 1

    star_kwargs = None
    if code.co_flags & inspect.CO_VARKEYWORDS:
        star_kwargs = varnames[cursor]

    return FullArgSpec(
        args=positional,
        varargs=star_args,
        varkw=star_kwargs,
        defaults=target.__defaults__,
        kwonlyargs=kwonly,
        kwonlydefaults=target.__kwdefaults__ if py3k else None,
        annotations=target.__annotations__ if py3k else {},
    )

114 

115 

116if py38: 

117 from importlib import metadata as importlib_metadata 

118else: 

119 import importlib_metadata # noqa 

120 

121 

def importlib_metadata_get(group):
    """Return the installed entry points registered under ``group``.

    Uses the ``select()`` API when the object returned by
    ``entry_points()`` provides it, and falls back to a dict-style
    ``get()`` (defaulting to an empty tuple) otherwise.

    """
    entry_points = importlib_metadata.entry_points()
    if not hasattr(entry_points, "select"):
        return entry_points.get(group, ())
    return entry_points.select(group=group)

128 

129 

130if py3k: 

131 import base64 

132 import builtins 

133 import configparser 

134 import itertools 

135 import pickle 

136 

137 from functools import reduce 

138 from io import BytesIO as byte_buffer 

139 from io import StringIO 

140 from itertools import zip_longest 

141 from time import perf_counter 

142 from urllib.parse import ( 

143 quote_plus, 

144 unquote_plus, 

145 parse_qsl, 

146 quote, 

147 unquote, 

148 ) 

149 

# Python 3 spellings of the text / bytes / integer type aliases.
text_type = str
binary_type = bytes
long_type = int
string_types = (text_type,)
binary_types = (binary_type,)
int_types = (long_type,)
iterbytes = iter

# Lazy filter/map helpers exposed under itertools-style names.
itertools_filterfalse = itertools.filterfalse
itertools_filter = filter
itertools_imap = map

# Builtins fetched by name rather than referenced directly
# (presumably because ``exec`` / ``print`` are keywords to a Python 2
# parser -- confirm before simplifying).
exec_, import_, print_ = [
    getattr(builtins, name) for name in ("exec", "__import__", "print")
]

165 

def b(s):
    """Encode the text ``s`` to bytes using the latin-1 codec."""
    encoded = s.encode("latin-1")
    return encoded

168 

def b64decode(x):
    """Decode the base64 text ``x`` (an ASCII ``str``) into raw bytes."""
    ascii_payload = x.encode("ascii")
    return base64.b64decode(ascii_payload)

171 

def b64encode(x):
    """Base64-encode the bytes ``x`` and return the result as ``str``."""
    encoded = base64.b64encode(x)
    return encoded.decode("ascii")

174 

def decode_backslashreplace(text, encoding):
    """Decode bytes ``text`` using ``encoding``; undecodable bytes are
    rendered as backslash escapes instead of raising."""
    decoded = text.decode(encoding, errors="backslashreplace")
    return decoded

177 

def cmp(a, b):
    """Three-way comparison: 1 if ``a > b``, -1 if ``a < b``, else 0."""
    is_greater = a > b
    is_less = a < b
    return is_greater - is_less

180 

def raise_(
    exception, with_traceback=None, replace_context=None, from_=False
):
    r"""Raise ``exception``, optionally attaching a traceback and cause.

    :param exception: the exception instance to raise.
    :param with_traceback: when not ``None``, applied to the exception
     via ``exception.with_traceback()`` before raising.
    :param replace_context: an exception this one is "replacing".  There
     is no way to express that precisely, so when ``from_`` is not given
     this object is simply installed as the cause.
    :param from\_: the explicit cause.  Any value other than ``False``
     (including ``None``) is assigned to ``__cause__`` and takes
     precedence over ``replace_context``.

    """
    if with_traceback is not None:
        exception = exception.with_traceback(with_traceback)

    explicit_cause = from_ is not False
    if explicit_cause:
        exception.__cause__ = from_
    elif replace_context is not None:
        # best available approximation: the replaced exception becomes
        # the cause of the one being raised.
        exception.__cause__ = replace_context

    try:
        raise exception
    finally:
        # drop the local references; the __traceback__ object would
        # otherwise form a reference cycle with this frame.  see
        # https://cosmicpercolator.com/2016/01/13/exception-leaks-in-python-2-and-3/
        del exception, replace_context, from_, with_traceback

217 

def u(s):
    """Return ``s`` unchanged."""
    return s


def ue(s):
    """Return ``s`` unchanged."""
    return s

223 

224 from typing import TYPE_CHECKING 

225 

226 # Unused. Kept for backwards compatibility. 

227 callable = callable # noqa 

228 

229 from abc import ABC 

230 

231 def _qualname(fn): 

232 return fn.__qualname__ 

233 

234 

235else: 

236 import base64 

237 import ConfigParser as configparser # noqa 

238 import itertools 

239 

240 from StringIO import StringIO # noqa 

241 from cStringIO import StringIO as byte_buffer # noqa 

242 from itertools import izip_longest as zip_longest # noqa 

243 from time import clock as perf_counter # noqa 

244 from urllib import quote # noqa 

245 from urllib import quote_plus # noqa 

246 from urllib import unquote # noqa 

247 from urllib import unquote_plus # noqa 

248 from urlparse import parse_qsl # noqa 

249 

250 from abc import ABCMeta 

251 

class ABC(object):
    """Stand-in for ``abc.ABC`` on the non-py3k branch."""

    # Python 2 style metaclass declaration, selecting ABCMeta.
    __metaclass__ = ABCMeta

254 

255 try: 

256 import cPickle as pickle 

257 except ImportError: 

258 import pickle # noqa 

259 

# Text / bytes / integer type aliases for the non-py3k branch; the
# ``# noqa`` names (basestring, unicode, long) exist only on Python 2.
string_types = (basestring,)  # noqa
binary_types = (bytes,)
binary_type = str
text_type = unicode  # noqa
int_types = int, long  # noqa
long_type = long  # noqa

# Builtins re-exported so they can be imported from this module.
callable = callable  # noqa
cmp = cmp  # noqa
reduce = reduce  # noqa

# The base64 functions are used directly, with no str/bytes conversion.
b64encode = base64.b64encode
b64decode = base64.b64decode

# Lazy itertools variants of filterfalse / filter / map.
itertools_filterfalse = itertools.ifilterfalse
itertools_filter = itertools.ifilter
itertools_imap = itertools.imap

277 

def b(s):
    """Return ``s`` unchanged; this branch performs no encoding."""
    unchanged = s
    return unchanged

280 

def exec_(func_text, globals_, lcl=None):
    """Execute ``func_text`` in ``globals_``, optionally with ``lcl`` as
    the local namespace.

    The Python 2 ``exec ... in ...`` statement is kept inside a string
    literal and run through ``exec()`` (presumably so this module still
    parses on Python 3 -- confirm).

    """
    if lcl is None:
        exec("exec func_text in globals_")
    else:
        exec("exec func_text in globals_, lcl")

286 

def iterbytes(buf):
    """Return a generator yielding ``ord()`` of each item in ``buf``."""
    return (ord(char) for char in buf)

289 

def import_(*args):
    """Call ``__import__`` with ``args``.

    When exactly four arguments are given, every entry of the fourth one
    (the "fromlist") is converted with ``str()`` first.

    """
    if len(args) == 4:
        name, globals_, locals_, fromlist = args
        args = (name, globals_, locals_, [str(item) for item in fromlist])
    return __import__(*args)

294 

def print_(*args, **kwargs):
    """Minimal stand-in for the ``print()`` function.

    Writes every positional argument, in order, to the stream given by
    the ``file`` keyword argument (default ``sys.stdout``).  Arguments
    that are not strings are converted with ``str()`` first.  No
    separator or trailing newline is written, and keyword arguments other
    than ``file`` are ignored.  When ``file`` is ``None`` nothing is
    written.

    """
    fp = kwargs.pop("file", sys.stdout)
    if fp is None:
        return
    # Iterate the arguments themselves.  Iterating enumerate(args) handed
    # (index, value) tuples to the string check below, so the str() of
    # each tuple was written instead of the argument.
    for arg in args:
        if not isinstance(arg, basestring):  # noqa
            arg = str(arg)
        fp.write(arg)

303 

def u(s):
    """Decode the byte string ``s`` as UTF-8 via ``unicode()``."""
    # this differs from what six does, which doesn't support non-ASCII
    # strings - we only use u() with
    # literal source strings, and all our source files with non-ascii
    # in them (all are tests) are utf-8 encoded.
    return unicode(s, "utf-8")  # noqa


def ue(s):
    """Decode ``s`` with the ``unicode_escape`` codec via ``unicode()``."""
    return unicode(s, "unicode_escape")  # noqa

313 

def decode_backslashreplace(text, encoding):
    """Decode ``text`` using ``encoding``; if that raises
    ``UnicodeDecodeError``, fall back to decoding the ``repr()`` of
    ``text`` with its first and last character removed."""
    try:
        return text.decode(encoding)
    except UnicodeDecodeError:
        # regular "backslashreplace" for an incompatible encoding raises:
        # "TypeError: don't know how to handle UnicodeDecodeError in
        # error callback"
        # [1:-1] drops the first and last character of the repr, i.e.
        # the surrounding quotes.
        return repr(text)[1:-1].decode()

322 

def safe_bytestring(text):
    """Return ``text`` as an ASCII byte string.

    Objects that are not instances of ``string_types`` are converted with
    ``unicode()`` and encoded; ``unicode`` strings are encoded directly.
    Both encodes use the ``backslashreplace`` error handler.  Any other
    string is returned unchanged.

    """
    # py2k only
    if not isinstance(text, string_types):
        return unicode(text).encode(  # noqa: F821
            "ascii", errors="backslashreplace"
        )
    elif isinstance(text, unicode):  # noqa: F821
        return text.encode("ascii", errors="backslashreplace")
    else:
        return text

333 

# The three-argument ``raise type, value, traceback`` statement is kept
# inside a string literal and defined through exec().  The embedded
# source needs real nested indentation: with every body line at the same
# depth the text is not a valid function definition.
# NOTE(review): this variant accepts ``replace_context`` and ``from_``
# but never uses them.
exec(
    "def raise_(exception, with_traceback=None, replace_context=None, "
    "from_=False):\n"
    "    if with_traceback:\n"
    "        raise type(exception), exception, with_traceback\n"
    "    else:\n"
    "        raise exception\n"
)

# Hard-wired to False on this branch rather than imported from typing.
TYPE_CHECKING = False

344 

345 def _qualname(meth): 

346 """return __qualname__ equivalent for a method on a class""" 

347 

348 for cls in meth.im_class.__mro__: 

349 if meth.__name__ in cls.__dict__: 

350 break 

351 else: 

352 return meth.__name__ 

353 

354 return "%s.%s" % (cls.__name__, meth.__name__) 

355 

356 

357if py3k: 

358 

359 def _formatannotation(annotation, base_module=None): 

360 """vendored from python 3.7""" 

361 

362 if getattr(annotation, "__module__", None) == "typing": 

363 return repr(annotation).replace("typing.", "") 

364 if isinstance(annotation, type): 

365 if annotation.__module__ in ("builtins", base_module): 

366 return annotation.__qualname__ 

367 return annotation.__module__ + "." + annotation.__qualname__ 

368 return repr(annotation) 

369 

370 def inspect_formatargspec( 

371 args, 

372 varargs=None, 

373 varkw=None, 

374 defaults=None, 

375 kwonlyargs=(), 

376 kwonlydefaults={}, 

377 annotations={}, 

378 formatarg=str, 

379 formatvarargs=lambda name: "*" + name, 

380 formatvarkw=lambda name: "**" + name, 

381 formatvalue=lambda value: "=" + repr(value), 

382 formatreturns=lambda text: " -> " + text, 

383 formatannotation=_formatannotation, 

384 ): 

385 """Copy formatargspec from python 3.7 standard library. 

386 

387 Python 3 has deprecated formatargspec and requested that Signature 

388 be used instead, however this requires a full reimplementation 

389 of formatargspec() in terms of creating Parameter objects and such. 

390 Instead of introducing all the object-creation overhead and having 

391 to reinvent from scratch, just copy their compatibility routine. 

392 

393 Ultimately we would need to rewrite our "decorator" routine completely 

394 which is not really worth it right now, until all Python 2.x support 

395 is dropped. 

396 

397 """ 

398 

399 kwonlydefaults = kwonlydefaults or {} 

400 annotations = annotations or {} 

401 

402 def formatargandannotation(arg): 

403 result = formatarg(arg) 

404 if arg in annotations: 

405 result += ": " + formatannotation(annotations[arg]) 

406 return result 

407 

408 specs = [] 

409 if defaults: 

410 firstdefault = len(args) - len(defaults) 

411 for i, arg in enumerate(args): 

412 spec = formatargandannotation(arg) 

413 if defaults and i >= firstdefault: 

414 spec = spec + formatvalue(defaults[i - firstdefault]) 

415 specs.append(spec) 

416 

417 if varargs is not None: 

418 specs.append(formatvarargs(formatargandannotation(varargs))) 

419 else: 

420 if kwonlyargs: 

421 specs.append("*") 

422 

423 if kwonlyargs: 

424 for kwonlyarg in kwonlyargs: 

425 spec = formatargandannotation(kwonlyarg) 

426 if kwonlydefaults and kwonlyarg in kwonlydefaults: 

427 spec += formatvalue(kwonlydefaults[kwonlyarg]) 

428 specs.append(spec) 

429 

430 if varkw is not None: 

431 specs.append(formatvarkw(formatargandannotation(varkw))) 

432 

433 result = "(" + ", ".join(specs) + ")" 

434 if "return" in annotations: 

435 result += formatreturns(formatannotation(annotations["return"])) 

436 return result 

437 

438 

439else: 

440 from inspect import formatargspec as _inspect_formatargspec 

441 

def inspect_formatargspec(*spec, **kw):
    """Format an argument specification with ``inspect.formatargspec``.

    Only the first four positional values are forwarded, so a longer
    ``FullArgSpec``-style sequence can be passed unpacked.

    """
    # convert for a potential FullArgSpec from compat.getfullargspec()
    leading_four = spec[:4]
    return _inspect_formatargspec(*leading_four, **kw)  # noqa

445 

446 

# Import the ABCs from collections.abc where it exists; the aliases in the
# plain collections module were deprecated and removed in Python 3.10.

449if py3k: 

450 import collections.abc as collections_abc 

451else: 

452 import collections as collections_abc # noqa 

453 

454 

455if py37: 

456 import dataclasses 

457 

def dataclass_fields(cls):
    """Return a sequence of all dataclasses.Field objects associated
    with a class.

    An empty list is returned when ``cls`` is not a dataclass.

    """
    if not dataclasses.is_dataclass(cls):
        return []
    return dataclasses.fields(cls)


def local_dataclass_fields(cls):
    """Return a sequence of all dataclasses.Field objects associated with
    a class, excluding those that originate from a superclass.

    An empty list is returned when ``cls`` is not a dataclass.

    """
    if not dataclasses.is_dataclass(cls):
        return []

    # every Field object reported by a direct base of ``cls``
    inherited = {
        field for base in cls.__bases__ for field in dataclass_fields(base)
    }
    return [
        field for field in dataclasses.fields(cls) if field not in inherited
    ]

480 

481 

482else: 

483 

def dataclass_fields(cls):
    """Fallback used when ``dataclasses`` is not imported: always
    returns an empty list."""
    return list()


def local_dataclass_fields(cls):
    """Fallback used when ``dataclasses`` is not imported: always
    returns an empty list."""
    return list()

489 

490 

def raise_from_cause(exception, exc_info=None):
    r"""legacy. use raise\_()

    Re-raises ``exception`` with the traceback taken from ``exc_info``
    (default: ``sys.exc_info()``).  The exception found in ``exc_info``
    becomes the cause, unless it is ``exception`` itself.

    """
    if exc_info is None:
        exc_info = sys.exc_info()
    _, active_exc, active_tb = exc_info

    if active_exc is exception:
        cause = None
    else:
        cause = active_exc
    reraise(type(exception), exception, tb=active_tb, cause=cause)

499 

500 

def reraise(tp, value, tb=None, cause=None):
    r"""legacy. use raise\_()

    Raises ``value`` with traceback ``tb`` and cause ``cause``; ``tp`` is
    accepted but not used.

    """
    raise_(value, from_=cause, with_traceback=tb)

505 

506 

def with_metaclass(meta, *bases, **kw):
    """Create a base class with a metaclass.

    Returns a temporary class; subclassing it produces a class built by
    ``meta`` from ``bases``, so the temporary class itself is dropped
    from the result.  Keyword arguments are passed to the new class's
    ``__init_subclass__`` hook when that hook exposes a ``__func__``.

    Source: https://lucumr.pocoo.org/2013/5/21/porting-to-python-3-redux/

    """

    class metaclass(meta):
        __call__ = type.__call__
        __init__ = type.__init__

        def __new__(mcls, name, this_bases, namespace):
            if this_bases is None:
                # building the temporary placeholder class itself
                created = type.__new__(mcls, name, (), namespace)
            else:
                # a real subclass: build it from the requested bases
                created = meta(name, bases, namespace)

            if hasattr(created, "__init_subclass__"):
                hook = created.__init_subclass__
                if hasattr(hook, "__func__"):
                    hook.__func__(created, **kw)
            return created

    return metaclass("temporary_class", None, {})

533 

534 

535if py3k: 

536 from datetime import timezone 

537else: 

538 from datetime import datetime 

539 from datetime import timedelta 

540 from datetime import tzinfo 

541 

class timezone(tzinfo):
    """Minimal port of python 3 timezone object

    A fixed-offset ``tzinfo``.  The offset must be a ``timedelta`` between
    minus and plus 23 hours 59 minutes, inclusive.  Two instances are
    equal when their offsets are equal.

    """

    __slots__ = "_offset"

    def __init__(self, offset):
        if not isinstance(offset, timedelta):
            raise TypeError("offset must be a timedelta")
        if not self._minoffset <= offset <= self._maxoffset:
            raise ValueError(
                "offset must be a timedelta "
                "strictly between -timedelta(hours=24) and "
                "timedelta(hours=24)."
            )
        self._offset = offset

    def __eq__(self, other):
        if type(other) != timezone:
            return False
        return self._offset == other._offset

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__; without this method
        # two instances with the same offset compare unequal under !=.
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self._offset)

    def __repr__(self):
        return "sqlalchemy.util.%s(%r)" % (
            self.__class__.__name__,
            self._offset,
        )

    def __str__(self):
        return self.tzname(None)

    def utcoffset(self, dt):
        """Return the fixed offset; ``dt`` is ignored."""
        return self._offset

    def tzname(self, dt):
        """Return the ``UTC[+-]HH:MM`` style name; ``dt`` is ignored."""
        return self._name_from_offset(self._offset)

    def dst(self, dt):
        """Always ``None``."""
        return None

    def fromutc(self, dt):
        """Return ``dt`` shifted by the offset.

        Raises ``ValueError`` if ``dt.tzinfo`` is not this object and
        ``TypeError`` if ``dt`` is not a ``datetime``.

        """
        if isinstance(dt, datetime):
            if dt.tzinfo is not self:
                raise ValueError("fromutc: dt.tzinfo " "is not self")
            return dt + self._offset
        raise TypeError(
            "fromutc() argument must be a datetime instance" " or None"
        )

    @staticmethod
    def _timedelta_to_microseconds(timedelta):
        """backport of timedelta._to_microseconds()"""
        return (
            timedelta.days * (24 * 3600) + timedelta.seconds
        ) * 1000000 + timedelta.microseconds

    @staticmethod
    def _divmod_timedeltas(a, b):
        """backport of timedelta.__divmod__"""

        q, r = divmod(
            timezone._timedelta_to_microseconds(a),
            timezone._timedelta_to_microseconds(b),
        )
        # the remainder is a microsecond count
        return q, timedelta(0, 0, r)

    @staticmethod
    def _name_from_offset(delta):
        """Format ``delta`` as ``UTC``, ``UTC+HH:MM``, ``UTC-HH:MM``, with
        seconds and microseconds appended when they are non-zero."""
        if not delta:
            return "UTC"
        if delta < timedelta(0):
            sign = "-"
            delta = -delta
        else:
            sign = "+"
        hours, rest = timezone._divmod_timedeltas(
            delta, timedelta(hours=1)
        )
        minutes, rest = timezone._divmod_timedeltas(
            rest, timedelta(minutes=1)
        )
        result = "UTC%s%02d:%02d" % (sign, hours, minutes)
        if rest.seconds:
            result += ":%02d" % (rest.seconds,)
        if rest.microseconds:
            result += ".%06d" % (rest.microseconds,)
        return result

    # inclusive bounds checked by __init__
    _maxoffset = timedelta(hours=23, minutes=59)
    _minoffset = -_maxoffset


# shared zero-offset instance
timezone.utc = timezone(timedelta(0))