Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py: 45%
343 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# util/compat.py
2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
8"""Handle Python version/platform incompatibilities."""
10import collections
11import contextlib
12import inspect
13import operator
14import platform
15import sys
# Interpreter version flags: each is True when the running interpreter is
# at least the named release.
py312 = (3, 12) <= sys.version_info
py311 = (3, 11) <= sys.version_info
py310 = (3, 10) <= sys.version_info
py39 = (3, 9) <= sys.version_info
py38 = (3, 8) <= sys.version_info
py37 = (3, 7) <= sys.version_info
py3k = (3, 0) <= sys.version_info
py2k = not py3k

# Interpreter implementation flags.
pypy = platform.python_implementation() == "PyPy"
cpython = platform.python_implementation() == "CPython"

# Host platform flags.
win32 = sys.platform.startswith("win")
osx = sys.platform.startswith("darwin")
arm = "aarch" in platform.machine().lower()
is64bit = sys.maxsize > 2 ** 32

# Mirrors the ``cpython`` flag above.
has_refcount_gc = bool(cpython)

# Re-exported standard library helpers.
contextmanager = contextlib.contextmanager
dottedgetter = operator.attrgetter
namedtuple = collections.namedtuple
next = next  # noqa
# Record type returned by inspect_getfullargspec(); field order matters
# because instances are built positionally.
FullArgSpec = collections.namedtuple(
    "FullArgSpec",
    (
        "args",
        "varargs",
        "varkw",
        "defaults",
        "kwonlyargs",
        "kwonlydefaults",
        "annotations",
    ),
)
class nullcontext(object):
    """A context manager that performs no work on entry or exit.

    Entering the context yields the ``enter_result`` value supplied to the
    constructor (``None`` when omitted).  Exiting returns ``None``, so an
    exception raised inside the ``with`` body propagates unchanged.

    Vendored from Python 3.7.

    """

    def __init__(self, enter_result=None):
        self.enter_result = enter_result

    def __enter__(self):
        return self.enter_result

    def __exit__(self, *excinfo):
        return None
72try:
73 import threading
74except ImportError:
75 import dummy_threading as threading # noqa
def inspect_getfullargspec(func):
    """Fully vendored version of getfullargspec from Python 3.3.

    :param func: a Python function, or a bound method (which is unwrapped
     to its underlying function first).
    :return: a ``FullArgSpec`` built from the function's code object,
     ``__defaults__``, and on Python 3 its ``__kwdefaults__`` and
     ``__annotations__``.
    :raises TypeError: if ``func`` is not a Python function, or its
     ``__code__`` is not a code object.

    """
    target = func.__func__ if inspect.ismethod(func) else func
    if not inspect.isfunction(target):
        raise TypeError("{!r} is not a Python function".format(target))

    code = target.__code__
    if not inspect.iscode(code):
        raise TypeError("{!r} is not a code object".format(code))

    varnames = code.co_varnames
    n_positional = code.co_argcount
    n_kwonly = code.co_kwonlyargcount if py3k else 0

    # ``cursor`` is the index in co_varnames just past the named
    # parameters; the *args / **kwargs names, when present, follow there.
    cursor = n_positional + n_kwonly
    positional = list(varnames[:n_positional])
    kwonly = list(varnames[n_positional:cursor])

    star_args = None
    if code.co_flags & inspect.CO_VARARGS:
        star_args = varnames[cursor]
        cursor += 1

    star_kwargs = None
    if code.co_flags & inspect.CO_VARKEYWORDS:
        star_kwargs = varnames[cursor]

    return FullArgSpec(
        positional,
        star_args,
        star_kwargs,
        target.__defaults__,
        kwonly,
        target.__kwdefaults__ if py3k else None,
        target.__annotations__ if py3k else {},
    )
116if py38:
117 from importlib import metadata as importlib_metadata
118else:
119 import importlib_metadata # noqa
def importlib_metadata_get(group):
    """Return the entry points registered under ``group``.

    Uses the ``select()`` API when the object returned by
    ``importlib_metadata.entry_points()`` provides it; otherwise falls
    back to a mapping-style ``get()`` with an empty tuple as the default.

    """
    entry_points = importlib_metadata.entry_points()
    if not hasattr(entry_points, "select"):
        return entry_points.get(group, ())
    return entry_points.select(group=group)
130if py3k:
131 import base64
132 import builtins
133 import configparser
134 import itertools
135 import pickle
137 from functools import reduce
138 from io import BytesIO as byte_buffer
139 from io import StringIO
140 from itertools import zip_longest
141 from time import perf_counter
142 from urllib.parse import (
143 quote_plus,
144 unquote_plus,
145 parse_qsl,
146 quote,
147 unquote,
148 )
# Python 3 spellings of the string / integer type aliases.
text_type = str
binary_type = bytes
long_type = int
string_types = (str,)
binary_types = (bytes,)
int_types = (int,)

# Iterating bytes already yields integers on Python 3.
iterbytes = iter

# The lazy itertools variants are the plain builtins on Python 3.
itertools_imap = map
itertools_filter = filter
itertools_filterfalse = itertools.filterfalse

# Looked up by name with getattr() rather than attribute syntax.
print_ = getattr(builtins, "print")
import_ = getattr(builtins, "__import__")
exec_ = getattr(builtins, "exec")
def b(s):
    """Encode the text ``s`` to bytes using the latin-1 codec."""
    encoded = s.encode("latin-1")
    return encoded
def b64decode(x):
    """Decode the base64 text ``x`` (ASCII characters) to raw bytes."""
    ascii_payload = x.encode("ascii")
    return base64.b64decode(ascii_payload)
def b64encode(x):
    """Base64-encode the bytes ``x`` and return the result as text."""
    encoded = base64.b64encode(x)
    return encoded.decode("ascii")
def decode_backslashreplace(text, encoding):
    """Decode ``text`` with ``encoding``, using the ``backslashreplace``
    error handler for bytes that cannot be decoded."""
    decoded = text.decode(encoding, errors="backslashreplace")
    return decoded
def cmp(a, b):
    """Three-way comparison: 1 if ``a > b``, -1 if ``a < b``, else 0."""
    is_greater = a > b
    is_less = a < b
    return is_greater - is_less
def raise_(
    exception, with_traceback=None, replace_context=None, from_=False
):
    r"""implement "raise" with cause support.

    :param exception: exception to raise
    :param with_traceback: when not None, passed to
     ``exception.with_traceback()`` before raising
    :param replace_context: an as-yet-unsupported feature.  This is
     an exception object which we are "replacing", e.g., it's our
     "cause" but we don't want it printed.  For now it is simply
     assigned as the cause, and only when ``from_`` was not given.
    :param from\_: the cause.  Any value other than ``False`` (including
     ``None``) is assigned to ``exception.__cause__``.

    """
    if with_traceback is not None:
        exception = exception.with_traceback(with_traceback)

    if from_ is False:
        if replace_context is not None:
            # no good solution here, we would like to have the exception
            # have only the context of replace_context.__context__ so that
            # the intermediary exception does not change, but we can't
            # figure that out.
            exception.__cause__ = replace_context
    else:
        exception.__cause__ = from_

    try:
        raise exception
    finally:
        # credit to
        # https://cosmicpercolator.com/2016/01/13/exception-leaks-in-python-2-and-3/
        # as the __traceback__ object creates a cycle
        del exception, replace_context, from_, with_traceback
def u(s):
    """Return ``s`` unchanged; the Python 3 variant does no conversion."""
    return s
def ue(s):
    """Return ``s`` unchanged; the Python 3 variant does no conversion."""
    return s
224 from typing import TYPE_CHECKING
226 # Unused. Kept for backwards compatibility.
227 callable = callable # noqa
229 from abc import ABC
231 def _qualname(fn):
232 return fn.__qualname__
235else:
236 import base64
237 import ConfigParser as configparser # noqa
238 import itertools
240 from StringIO import StringIO # noqa
241 from cStringIO import StringIO as byte_buffer # noqa
242 from itertools import izip_longest as zip_longest # noqa
243 from time import clock as perf_counter # noqa
244 from urllib import quote # noqa
245 from urllib import quote_plus # noqa
246 from urllib import unquote # noqa
247 from urllib import unquote_plus # noqa
248 from urlparse import parse_qsl # noqa
250 from abc import ABCMeta
class ABC(object):
    """Base class declaring ``ABCMeta`` through the ``__metaclass__``
    class attribute (the Python 2 metaclass spelling)."""

    __metaclass__ = ABCMeta
255 try:
256 import cPickle as pickle
257 except ImportError:
258 import pickle # noqa
# Python 2 spellings of the string / integer type aliases.
text_type = unicode  # noqa
binary_type = str
long_type = long  # noqa
string_types = (basestring,)  # noqa
binary_types = (bytes,)
int_types = (int, long)  # noqa

# Builtins re-exported under the same names.
callable = callable  # noqa
cmp = cmp  # noqa
reduce = reduce  # noqa

# The base64 helpers are used directly in this branch.
b64decode = base64.b64decode
b64encode = base64.b64encode

# Lazy itertools variants.
itertools_imap = itertools.imap
itertools_filter = itertools.ifilter
itertools_filterfalse = itertools.ifilterfalse
def b(s):
    """Return ``s`` unchanged; this variant performs no encoding."""
    return s
def exec_(func_text, globals_, lcl=None):
    """Execute the code in ``func_text`` against ``globals_`` and,
    when given, the local namespace ``lcl``.

    The ``exec ... in ...`` statement is held in a string and run through
    an outer ``exec()`` call.  NOTE(review): presumably this is so the
    module still parses on Python 3, where that statement form is not
    valid syntax -- confirm before changing.
    """
    if lcl is None:
        exec("exec func_text in globals_")
    else:
        exec("exec func_text in globals_, lcl")
def iterbytes(buf):
    """Return a generator yielding ``ord()`` of each item in ``buf``."""
    return (ord(char) for char in buf)
def import_(*args):
    """Call ``__import__`` with ``args``.

    When exactly four arguments are given, every entry of the fourth one
    (the "fromlist") is converted with ``str()`` first.
    """
    if len(args) == 4:
        fromlist = [str(name) for name in args[3]]
        args = args[0:3] + (fromlist,)
    return __import__(*args)
def print_(*args, **kwargs):
    """Minimal stand-in for the ``print()`` function.

    Writes each positional argument, in order, to the stream given by the
    ``file`` keyword argument (``sys.stdout`` when omitted).  Arguments
    that are not strings are converted with ``str()`` first.  No separator
    or trailing newline is written.

    When ``file`` is explicitly ``None`` nothing is written.
    """
    fp = kwargs.pop("file", sys.stdout)
    if fp is None:
        return
    # Iterate the arguments themselves.  Wrapping them in enumerate()
    # yields (index, value) tuples, which would then be written out as the
    # str() of a tuple rather than as the argument's own text.
    for arg in args:
        if not isinstance(arg, basestring):  # noqa
            arg = str(arg)
        fp.write(arg)
def u(s):
    """Decode the byte string ``s`` as UTF-8 and return the text."""
    # this differs from what six does, which doesn't support non-ASCII
    # strings - we only use u() with
    # literal source strings, and all our source files with non-ascii
    # in them (all are tests) are utf-8 encoded.
    decoded = unicode(s, "utf-8")  # noqa
    return decoded
def ue(s):
    """Decode ``s`` with the ``unicode_escape`` codec and return the
    text."""
    decoded = unicode(s, "unicode_escape")  # noqa
    return decoded
def decode_backslashreplace(text, encoding):
    """Decode ``text`` with ``encoding``; if that raises
    ``UnicodeDecodeError``, fall back to decoding the ``repr()`` of
    ``text`` with its surrounding quote characters stripped."""
    try:
        decoded = text.decode(encoding)
    except UnicodeDecodeError:
        # regular "backslashreplace" for an incompatible encoding raises:
        # "TypeError: don't know how to handle UnicodeDecodeError in
        # error callback"
        escaped = repr(text)[1:-1]
        return escaped.decode()
    return decoded
def safe_bytestring(text):
    """Return ``text`` as an ASCII byte string (py2k only).

    Objects that are not strings are first converted with ``unicode()``;
    unicode text is encoded to ASCII with the ``backslashreplace`` error
    handler; any other string is returned unchanged.
    """
    # py2k only
    if isinstance(text, string_types):
        if isinstance(text, unicode):  # noqa: F821
            return text.encode("ascii", errors="backslashreplace")
        return text
    return unicode(text).encode(  # noqa: F821
        "ascii", errors="backslashreplace"
    )
# Define raise_() for this branch from source text.  The generated
# function re-raises ``exception`` with ``with_traceback`` attached when
# one is given, and otherwise raises it as-is.  ``replace_context`` and
# ``from_`` are accepted in the signature but not used by the generated
# body.
# NOTE(review): the three-expression ``raise`` is presumably kept in a
# string because it is not valid Python 3 syntax -- confirm.
exec(
    "def raise_(exception, with_traceback=None, replace_context=None, "
    "from_=False):\n"
    "    if with_traceback:\n"
    "        raise type(exception), exception, with_traceback\n"
    "    else:\n"
    "        raise exception\n"
)

TYPE_CHECKING = False
345 def _qualname(meth):
346 """return __qualname__ equivalent for a method on a class"""
348 for cls in meth.im_class.__mro__:
349 if meth.__name__ in cls.__dict__:
350 break
351 else:
352 return meth.__name__
354 return "%s.%s" % (cls.__name__, meth.__name__)
357if py3k:
359 def _formatannotation(annotation, base_module=None):
360 """vendored from python 3.7"""
362 if getattr(annotation, "__module__", None) == "typing":
363 return repr(annotation).replace("typing.", "")
364 if isinstance(annotation, type):
365 if annotation.__module__ in ("builtins", base_module):
366 return annotation.__qualname__
367 return annotation.__module__ + "." + annotation.__qualname__
368 return repr(annotation)
def inspect_formatargspec(
    args,
    varargs=None,
    varkw=None,
    defaults=None,
    kwonlyargs=(),
    kwonlydefaults={},
    annotations={},
    formatarg=str,
    formatvarargs=lambda name: "*" + name,
    formatvarkw=lambda name: "**" + name,
    formatvalue=lambda value: "=" + repr(value),
    formatreturns=lambda text: " -> " + text,
    formatannotation=_formatannotation,
):
    """Copy formatargspec from python 3.7 standard library.

    Builds the parenthesized signature text for the given argument
    specification: positional arguments (with trailing defaults), the
    ``*varargs`` entry or a bare ``*`` ahead of keyword-only arguments,
    the keyword-only arguments with their defaults, the ``**varkw``
    entry, and finally a return annotation when ``annotations`` has a
    ``"return"`` key.  Each ``format*`` callable controls how the
    corresponding piece is rendered.

    Python 3 has deprecated formatargspec and requested that Signature
    be used instead, however this requires a full reimplementation
    of formatargspec() in terms of creating Parameter objects and such.
    Instead of introducing all the object-creation overhead and having
    to reinvent from scratch, just copy their compatibility routine.

    Ultimately we would need to rewrite our "decorator" routine completely
    which is not really worth it right now, until all Python 2.x support
    is dropped.

    """

    # The mutable defaults in the signature are never modified; falsy
    # values are swapped for fresh empty dicts here.
    kw_defaults = kwonlydefaults or {}
    notes = annotations or {}

    def render(name):
        # argument name plus ": annotation" when one is recorded
        text = formatarg(name)
        if name in notes:
            text += ": " + formatannotation(notes[name])
        return text

    pieces = []

    # defaults line up with the *last* len(defaults) positional arguments
    first_default = (len(args) - len(defaults)) if defaults else None
    for index, name in enumerate(args):
        piece = render(name)
        if defaults and index >= first_default:
            piece = piece + formatvalue(defaults[index - first_default])
        pieces.append(piece)

    if varargs is not None:
        pieces.append(formatvarargs(render(varargs)))
    elif kwonlyargs:
        # a bare "*" separates keyword-only arguments when there is no
        # *varargs entry to do so
        pieces.append("*")

    if kwonlyargs:
        for name in kwonlyargs:
            piece = render(name)
            if kw_defaults and name in kw_defaults:
                piece += formatvalue(kw_defaults[name])
            pieces.append(piece)

    if varkw is not None:
        pieces.append(formatvarkw(render(varkw)))

    signature = "(" + ", ".join(pieces) + ")"
    if "return" in notes:
        signature += formatreturns(formatannotation(notes["return"]))
    return signature
439else:
440 from inspect import formatargspec as _inspect_formatargspec
def inspect_formatargspec(*spec, **kw):
    """Format an argument specification with the stdlib
    ``formatargspec``, passing along only the first four positional
    entries of ``spec``."""
    # convert for a potential FullArgSpec from compat.getfullargspec()
    legacy_spec = spec[0:4]
    return _inspect_formatargspec(*legacy_spec, **kw)  # noqa
# Import the ABCs from collections.abc on Python 3: accessing them straight
# from the collections module is deprecated (the aliases were removed in
# Python 3.10).
449if py3k:
450 import collections.abc as collections_abc
451else:
452 import collections as collections_abc # noqa
455if py37:
456 import dataclasses
def dataclass_fields(cls):
    """Return a sequence of all dataclasses.Field objects associated
    with a class.

    An empty list is returned when ``cls`` is not a dataclass.
    """
    if not dataclasses.is_dataclass(cls):
        return []
    return dataclasses.fields(cls)
def local_dataclass_fields(cls):
    """Return a sequence of all dataclasses.Field objects associated with
    a class, excluding those that originate from a superclass.

    An empty list is returned when ``cls`` is not a dataclass.
    """
    if not dataclasses.is_dataclass(cls):
        return []

    # Field objects found on any direct base are considered inherited.
    inherited = set()
    for base in cls.__bases__:
        inherited.update(dataclass_fields(base))

    return [fld for fld in dataclasses.fields(cls) if fld not in inherited]
482else:
def dataclass_fields(cls):
    """Fallback used when the ``py37`` flag is false: always returns an
    empty list."""
    return []
def local_dataclass_fields(cls):
    """Fallback used when the ``py37`` flag is false: always returns an
    empty list."""
    return []
def raise_from_cause(exception, exc_info=None):
    r"""legacy. use raise\_()

    Raises ``exception`` through ``reraise()`` with the traceback taken
    from ``exc_info`` (``sys.exc_info()`` when omitted).  The exception
    value in ``exc_info`` is passed as the cause, unless it is
    ``exception`` itself, in which case the cause is ``None``.
    """
    if exc_info is None:
        exc_info = sys.exc_info()
    _exc_type, active_exc, active_tb = exc_info

    cause = None if active_exc is exception else active_exc
    reraise(type(exception), exception, tb=active_tb, cause=cause)
def reraise(tp, value, tb=None, cause=None):
    r"""legacy. use raise\_()

    Raises ``value`` via ``raise_()`` with ``tb`` as the traceback and
    ``cause`` as the cause.  ``tp`` is accepted but not used.
    """
    raise_(value, with_traceback=tb, from_=cause)
def with_metaclass(meta, *bases, **kw):
    """Create a base class with a metaclass.

    Returns a temporary placeholder class.  When a class statement
    subclasses the placeholder, the class actually produced is
    ``meta(name, bases, namespace)``, so the placeholder itself never
    appears among its bases.  If the new class has an
    ``__init_subclass__`` attribute exposing ``__func__``, that function
    is called with the new class and the ``kw`` keyword arguments.

    Drops the middle class upon creation.

    Source: https://lucumr.pocoo.org/2013/5/21/porting-to-python-3-redux/

    """

    class metaclass(meta):
        # use type's own machinery for calling / initialising, bypassing
        # whatever ``meta`` defines for the placeholder
        __init__ = type.__init__
        __call__ = type.__call__

        def __new__(cls, name, this_bases, d):
            if this_bases is None:
                # building the placeholder itself
                built = type.__new__(cls, name, (), d)
            else:
                # a real subclass: swap in the requested bases
                built = meta(name, bases, d)

            hook = getattr(built, "__init_subclass__", None)
            if hook is not None and hasattr(hook, "__func__"):
                hook.__func__(built, **kw)
            return built

    return metaclass("temporary_class", None, {})
535if py3k:
536 from datetime import timezone
537else:
538 from datetime import datetime
539 from datetime import timedelta
540 from datetime import tzinfo
class timezone(tzinfo):
    """Minimal port of python 3 timezone object

    A ``tzinfo`` with a fixed UTC offset given as a ``timedelta``.  The
    offset must lie between ``-timedelta(hours=23, minutes=59)`` and
    ``timedelta(hours=23, minutes=59)`` inclusive.
    """

    __slots__ = "_offset"

    def __init__(self, offset):
        """Create a timezone with the fixed ``offset``.

        :raises TypeError: if ``offset`` is not a ``timedelta``.
        :raises ValueError: if ``offset`` is outside the allowed range.
        """
        if not isinstance(offset, timedelta):
            raise TypeError("offset must be a timedelta")
        if not self._minoffset <= offset <= self._maxoffset:
            raise ValueError(
                "offset must be a timedelta "
                "strictly between -timedelta(hours=24) and "
                "timedelta(hours=24)."
            )
        self._offset = offset

    def __getinitargs__(self):
        """Return the constructor arguments used by copy / pickle.

        ``tzinfo.__reduce__`` rebuilds an instance by calling the class
        with these arguments; without this hook it calls the class with
        no arguments, which ``__init__`` does not accept.
        """
        return (self._offset,)

    def __eq__(self, other):
        if type(other) != timezone:
            return False
        return self._offset == other._offset

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; without this,
        # two equal timezones would still compare as "not equal".
        return not self == other

    def __hash__(self):
        return hash(self._offset)

    def __repr__(self):
        return "sqlalchemy.util.%s(%r)" % (
            self.__class__.__name__,
            self._offset,
        )

    def __str__(self):
        return self.tzname(None)

    def utcoffset(self, dt):
        """Return the fixed offset; ``dt`` is not consulted."""
        return self._offset

    def tzname(self, dt):
        """Return the ``UTC[+-]HH:MM`` name for the offset; ``dt`` is
        not consulted."""
        return self._name_from_offset(self._offset)

    def dst(self, dt):
        """Always ``None``."""
        return None

    def fromutc(self, dt):
        """Return ``dt`` shifted by the fixed offset.

        :raises ValueError: if ``dt.tzinfo`` is not this object.
        :raises TypeError: if ``dt`` is not a ``datetime``.
        """
        if isinstance(dt, datetime):
            if dt.tzinfo is not self:
                raise ValueError("fromutc: dt.tzinfo " "is not self")
            return dt + self._offset
        raise TypeError(
            "fromutc() argument must be a datetime instance" " or None"
        )

    @staticmethod
    def _timedelta_to_microseconds(timedelta):
        """backport of timedelta._to_microseconds()"""
        return (
            timedelta.days * (24 * 3600) + timedelta.seconds
        ) * 1000000 + timedelta.microseconds

    @staticmethod
    def _divmod_timedeltas(a, b):
        """backport of timedelta.__divmod__"""

        q, r = divmod(
            timezone._timedelta_to_microseconds(a),
            timezone._timedelta_to_microseconds(b),
        )
        # the remainder is expressed as a timedelta of microseconds
        return q, timedelta(0, 0, r)

    @staticmethod
    def _name_from_offset(delta):
        """Format ``delta`` as ``UTC``, ``UTC+HH:MM`` or ``UTC-HH:MM``,
        appending ``:SS`` and ``.ffffff`` when the offset has seconds or
        microseconds."""
        if not delta:
            return "UTC"
        if delta < timedelta(0):
            sign = "-"
            delta = -delta
        else:
            sign = "+"
        hours, rest = timezone._divmod_timedeltas(
            delta, timedelta(hours=1)
        )
        minutes, rest = timezone._divmod_timedeltas(
            rest, timedelta(minutes=1)
        )
        result = "UTC%s%02d:%02d" % (sign, hours, minutes)
        if rest.seconds:
            result += ":%02d" % (rest.seconds,)
        if rest.microseconds:
            result += ".%06d" % (rest.microseconds,)
        return result

    # inclusive bounds checked by __init__
    _maxoffset = timedelta(hours=23, minutes=59)
    _minoffset = -_maxoffset


# shared zero-offset instance
timezone.utc = timezone(timedelta(0))