1# util/compat.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Handle Python version/platform incompatibilities."""
9
10import collections
11import contextlib
12import inspect
13import operator
14import platform
15import sys
16
17py312 = sys.version_info >= (3, 12)
18py311 = sys.version_info >= (3, 11)
19py310 = sys.version_info >= (3, 10)
20py39 = sys.version_info >= (3, 9)
21py38 = sys.version_info >= (3, 8)
22py37 = sys.version_info >= (3, 7)
23py3k = sys.version_info >= (3, 0)
24py2k = sys.version_info < (3, 0)
25pypy = platform.python_implementation() == "PyPy"
26
27
28cpython = platform.python_implementation() == "CPython"
29win32 = sys.platform.startswith("win")
30osx = sys.platform.startswith("darwin")
31arm = "aarch" in platform.machine().lower()
32is64bit = sys.maxsize > 2 ** 32
33
34has_refcount_gc = bool(cpython)
35
36contextmanager = contextlib.contextmanager
37dottedgetter = operator.attrgetter
38namedtuple = collections.namedtuple
39next = next # noqa
40
41FullArgSpec = collections.namedtuple(
42 "FullArgSpec",
43 [
44 "args",
45 "varargs",
46 "varkw",
47 "defaults",
48 "kwonlyargs",
49 "kwonlydefaults",
50 "annotations",
51 ],
52)
53
54
55class nullcontext(object):
56 """Context manager that does no additional processing.
57
58 Vendored from Python 3.7.
59
60 """
61
62 def __init__(self, enter_result=None):
63 self.enter_result = enter_result
64
65 def __enter__(self):
66 return self.enter_result
67
68 def __exit__(self, *excinfo):
69 pass
70
71
72try:
73 import threading
74except ImportError:
75 import dummy_threading as threading # noqa
76
77
78def inspect_getfullargspec(func):
79 """Fully vendored version of getfullargspec from Python 3.3."""
80
81 if inspect.ismethod(func):
82 func = func.__func__
83 if not inspect.isfunction(func):
84 raise TypeError("{!r} is not a Python function".format(func))
85
86 co = func.__code__
87 if not inspect.iscode(co):
88 raise TypeError("{!r} is not a code object".format(co))
89
90 nargs = co.co_argcount
91 names = co.co_varnames
92 nkwargs = co.co_kwonlyargcount if py3k else 0
93 args = list(names[:nargs])
94 kwonlyargs = list(names[nargs : nargs + nkwargs])
95
96 nargs += nkwargs
97 varargs = None
98 if co.co_flags & inspect.CO_VARARGS:
99 varargs = co.co_varnames[nargs]
100 nargs = nargs + 1
101 varkw = None
102 if co.co_flags & inspect.CO_VARKEYWORDS:
103 varkw = co.co_varnames[nargs]
104
105 return FullArgSpec(
106 args,
107 varargs,
108 varkw,
109 func.__defaults__,
110 kwonlyargs,
111 func.__kwdefaults__ if py3k else None,
112 func.__annotations__ if py3k else {},
113 )
114
115
116if py38:
117 from importlib import metadata as importlib_metadata
118else:
119 import importlib_metadata # noqa
120
121
122def importlib_metadata_get(group):
123 ep = importlib_metadata.entry_points()
124 if hasattr(ep, "select"):
125 return ep.select(group=group)
126 else:
127 return ep.get(group, ())
128
129
130if py3k:
131 import base64
132 import builtins
133 import configparser
134 import itertools
135 import pickle
136
137 from functools import reduce
138 from io import BytesIO as byte_buffer
139 from io import StringIO
140 from itertools import zip_longest
141 from time import perf_counter
142 from urllib.parse import (
143 quote_plus,
144 unquote_plus,
145 parse_qsl,
146 quote,
147 unquote,
148 )
149
150 string_types = (str,)
151 binary_types = (bytes,)
152 binary_type = bytes
153 text_type = str
154 int_types = (int,)
155 iterbytes = iter
156 long_type = int
157
158 itertools_filterfalse = itertools.filterfalse
159 itertools_filter = filter
160 itertools_imap = map
161
162 exec_ = getattr(builtins, "exec")
163 import_ = getattr(builtins, "__import__")
164 print_ = getattr(builtins, "print")
165
166 def b(s):
167 return s.encode("latin-1")
168
169 def b64decode(x):
170 return base64.b64decode(x.encode("ascii"))
171
172 def b64encode(x):
173 return base64.b64encode(x).decode("ascii")
174
175 def decode_backslashreplace(text, encoding):
176 return text.decode(encoding, errors="backslashreplace")
177
178 def cmp(a, b):
179 return (a > b) - (a < b)
180
181 def raise_(
182 exception, with_traceback=None, replace_context=None, from_=False
183 ):
184 r"""implement "raise" with cause support.
185
186 :param exception: exception to raise
187 :param with_traceback: will call exception.with_traceback()
188 :param replace_context: an as-yet-unsupported feature. This is
189 an exception object which we are "replacing", e.g., it's our
190 "cause" but we don't want it printed. Basically just what
191 ``__suppress_context__`` does but we don't want to suppress
192 the enclosing context, if any. So for now we make it the
193 cause.
194 :param from\_: the cause. this actually sets the cause and doesn't
195 hope to hide it someday.
196
197 """
198 if with_traceback is not None:
199 exception = exception.with_traceback(with_traceback)
200
201 if from_ is not False:
202 exception.__cause__ = from_
203 elif replace_context is not None:
204 # no good solution here, we would like to have the exception
205 # have only the context of replace_context.__context__ so that the
206 # intermediary exception does not change, but we can't figure
207 # that out.
208 exception.__cause__ = replace_context
209
210 try:
211 raise exception
212 finally:
213 # credit to
214 # https://cosmicpercolator.com/2016/01/13/exception-leaks-in-python-2-and-3/
215 # as the __traceback__ object creates a cycle
216 del exception, replace_context, from_, with_traceback
217
218 def u(s):
219 return s
220
221 def ue(s):
222 return s
223
224 from typing import TYPE_CHECKING
225
226 # Unused. Kept for backwards compatibility.
227 callable = callable # noqa
228
229 from abc import ABC
230
231 def _qualname(fn):
232 return fn.__qualname__
233
234
235else:
236 import base64
237 import ConfigParser as configparser # noqa
238 import itertools
239
240 from StringIO import StringIO # noqa
241 from cStringIO import StringIO as byte_buffer # noqa
242 from itertools import izip_longest as zip_longest # noqa
243 from time import clock as perf_counter # noqa
244 from urllib import quote # noqa
245 from urllib import quote_plus # noqa
246 from urllib import unquote # noqa
247 from urllib import unquote_plus # noqa
248 from urlparse import parse_qsl # noqa
249
250 from abc import ABCMeta
251
252 class ABC(object):
253 __metaclass__ = ABCMeta
254
255 try:
256 import cPickle as pickle
257 except ImportError:
258 import pickle # noqa
259
260 string_types = (basestring,) # noqa
261 binary_types = (bytes,)
262 binary_type = str
263 text_type = unicode # noqa
264 int_types = int, long # noqa
265 long_type = long # noqa
266
267 callable = callable # noqa
268 cmp = cmp # noqa
269 reduce = reduce # noqa
270
271 b64encode = base64.b64encode
272 b64decode = base64.b64decode
273
274 itertools_filterfalse = itertools.ifilterfalse
275 itertools_filter = itertools.ifilter
276 itertools_imap = itertools.imap
277
278 def b(s):
279 return s
280
281 def exec_(func_text, globals_, lcl=None):
282 if lcl is None:
283 exec("exec func_text in globals_")
284 else:
285 exec("exec func_text in globals_, lcl")
286
287 def iterbytes(buf):
288 return (ord(byte) for byte in buf)
289
290 def import_(*args):
291 if len(args) == 4:
292 args = args[0:3] + ([str(arg) for arg in args[3]],)
293 return __import__(*args)
294
295 def print_(*args, **kwargs):
296 fp = kwargs.pop("file", sys.stdout)
297 if fp is None:
298 return
299 for arg in enumerate(args):
300 if not isinstance(arg, basestring): # noqa
301 arg = str(arg)
302 fp.write(arg)
303
304 def u(s):
305 # this differs from what six does, which doesn't support non-ASCII
306 # strings - we only use u() with
307 # literal source strings, and all our source files with non-ascii
308 # in them (all are tests) are utf-8 encoded.
309 return unicode(s, "utf-8") # noqa
310
311 def ue(s):
312 return unicode(s, "unicode_escape") # noqa
313
314 def decode_backslashreplace(text, encoding):
315 try:
316 return text.decode(encoding)
317 except UnicodeDecodeError:
318 # regular "backslashreplace" for an incompatible encoding raises:
319 # "TypeError: don't know how to handle UnicodeDecodeError in
320 # error callback"
321 return repr(text)[1:-1].decode()
322
323 def safe_bytestring(text):
324 # py2k only
325 if not isinstance(text, string_types):
326 return unicode(text).encode( # noqa: F821
327 "ascii", errors="backslashreplace"
328 )
329 elif isinstance(text, unicode): # noqa: F821
330 return text.encode("ascii", errors="backslashreplace")
331 else:
332 return text
333
334 exec(
335 "def raise_(exception, with_traceback=None, replace_context=None, "
336 "from_=False):\n"
337 " if with_traceback:\n"
338 " raise type(exception), exception, with_traceback\n"
339 " else:\n"
340 " raise exception\n"
341 )
342
343 TYPE_CHECKING = False
344
345 def _qualname(meth):
346 """return __qualname__ equivalent for a method on a class"""
347
348 for cls in meth.im_class.__mro__:
349 if meth.__name__ in cls.__dict__:
350 break
351 else:
352 return meth.__name__
353
354 return "%s.%s" % (cls.__name__, meth.__name__)
355
356
357if py3k:
358
359 def _formatannotation(annotation, base_module=None):
360 """vendored from python 3.7"""
361
362 if getattr(annotation, "__module__", None) == "typing":
363 return repr(annotation).replace("typing.", "")
364 if isinstance(annotation, type):
365 if annotation.__module__ in ("builtins", base_module):
366 return annotation.__qualname__
367 return annotation.__module__ + "." + annotation.__qualname__
368 return repr(annotation)
369
370 def inspect_formatargspec(
371 args,
372 varargs=None,
373 varkw=None,
374 defaults=None,
375 kwonlyargs=(),
376 kwonlydefaults={},
377 annotations={},
378 formatarg=str,
379 formatvarargs=lambda name: "*" + name,
380 formatvarkw=lambda name: "**" + name,
381 formatvalue=lambda value: "=" + repr(value),
382 formatreturns=lambda text: " -> " + text,
383 formatannotation=_formatannotation,
384 ):
385 """Copy formatargspec from python 3.7 standard library.
386
387 Python 3 has deprecated formatargspec and requested that Signature
388 be used instead, however this requires a full reimplementation
389 of formatargspec() in terms of creating Parameter objects and such.
390 Instead of introducing all the object-creation overhead and having
391 to reinvent from scratch, just copy their compatibility routine.
392
393 Ultimately we would need to rewrite our "decorator" routine completely
394 which is not really worth it right now, until all Python 2.x support
395 is dropped.
396
397 """
398
399 kwonlydefaults = kwonlydefaults or {}
400 annotations = annotations or {}
401
402 def formatargandannotation(arg):
403 result = formatarg(arg)
404 if arg in annotations:
405 result += ": " + formatannotation(annotations[arg])
406 return result
407
408 specs = []
409 if defaults:
410 firstdefault = len(args) - len(defaults)
411 for i, arg in enumerate(args):
412 spec = formatargandannotation(arg)
413 if defaults and i >= firstdefault:
414 spec = spec + formatvalue(defaults[i - firstdefault])
415 specs.append(spec)
416
417 if varargs is not None:
418 specs.append(formatvarargs(formatargandannotation(varargs)))
419 else:
420 if kwonlyargs:
421 specs.append("*")
422
423 if kwonlyargs:
424 for kwonlyarg in kwonlyargs:
425 spec = formatargandannotation(kwonlyarg)
426 if kwonlydefaults and kwonlyarg in kwonlydefaults:
427 spec += formatvalue(kwonlydefaults[kwonlyarg])
428 specs.append(spec)
429
430 if varkw is not None:
431 specs.append(formatvarkw(formatargandannotation(varkw)))
432
433 result = "(" + ", ".join(specs) + ")"
434 if "return" in annotations:
435 result += formatreturns(formatannotation(annotations["return"]))
436 return result
437
438
439else:
440 from inspect import formatargspec as _inspect_formatargspec
441
442 def inspect_formatargspec(*spec, **kw):
443 # convert for a potential FullArgSpec from compat.getfullargspec()
444 return _inspect_formatargspec(*spec[0:4], **kw) # noqa
445
446
447# Fix deprecation of accessing ABCs straight from collections module
448# (which will stop working in 3.8).
449if py3k:
450 import collections.abc as collections_abc
451else:
452 import collections as collections_abc # noqa
453
454
455if py37:
456 import dataclasses
457
458 def dataclass_fields(cls):
459 """Return a sequence of all dataclasses.Field objects associated
460 with a class."""
461
462 if dataclasses.is_dataclass(cls):
463 return dataclasses.fields(cls)
464 else:
465 return []
466
467 def local_dataclass_fields(cls):
468 """Return a sequence of all dataclasses.Field objects associated with
469 a class, excluding those that originate from a superclass."""
470
471 if dataclasses.is_dataclass(cls):
472 super_fields = set()
473 for sup in cls.__bases__:
474 super_fields.update(dataclass_fields(sup))
475 return [
476 f for f in dataclasses.fields(cls) if f not in super_fields
477 ]
478 else:
479 return []
480
481
482else:
483
484 def dataclass_fields(cls):
485 return []
486
487 def local_dataclass_fields(cls):
488 return []
489
490
491def raise_from_cause(exception, exc_info=None):
492 r"""legacy. use raise\_()"""
493
494 if exc_info is None:
495 exc_info = sys.exc_info()
496 exc_type, exc_value, exc_tb = exc_info
497 cause = exc_value if exc_value is not exception else None
498 reraise(type(exception), exception, tb=exc_tb, cause=cause)
499
500
501def reraise(tp, value, tb=None, cause=None):
502 r"""legacy. use raise\_()"""
503
504 raise_(value, with_traceback=tb, from_=cause)
505
506
507def with_metaclass(meta, *bases, **kw):
508 """Create a base class with a metaclass.
509
510 Drops the middle class upon creation.
511
512 Source: https://lucumr.pocoo.org/2013/5/21/porting-to-python-3-redux/
513
514 """
515
516 class metaclass(meta):
517 __call__ = type.__call__
518 __init__ = type.__init__
519
520 def __new__(cls, name, this_bases, d):
521 if this_bases is None:
522 cls = type.__new__(cls, name, (), d)
523 else:
524 cls = meta(name, bases, d)
525
526 if hasattr(cls, "__init_subclass__") and hasattr(
527 cls.__init_subclass__, "__func__"
528 ):
529 cls.__init_subclass__.__func__(cls, **kw)
530 return cls
531
532 return metaclass("temporary_class", None, {})
533
534
535if py3k:
536 from datetime import timezone
537else:
538 from datetime import datetime
539 from datetime import timedelta
540 from datetime import tzinfo
541
542 class timezone(tzinfo):
543 """Minimal port of python 3 timezone object"""
544
545 __slots__ = "_offset"
546
547 def __init__(self, offset):
548 if not isinstance(offset, timedelta):
549 raise TypeError("offset must be a timedelta")
550 if not self._minoffset <= offset <= self._maxoffset:
551 raise ValueError(
552 "offset must be a timedelta "
553 "strictly between -timedelta(hours=24) and "
554 "timedelta(hours=24)."
555 )
556 self._offset = offset
557
558 def __eq__(self, other):
559 if type(other) != timezone:
560 return False
561 return self._offset == other._offset
562
563 def __hash__(self):
564 return hash(self._offset)
565
566 def __repr__(self):
567 return "sqlalchemy.util.%s(%r)" % (
568 self.__class__.__name__,
569 self._offset,
570 )
571
572 def __str__(self):
573 return self.tzname(None)
574
575 def utcoffset(self, dt):
576 return self._offset
577
578 def tzname(self, dt):
579 return self._name_from_offset(self._offset)
580
581 def dst(self, dt):
582 return None
583
584 def fromutc(self, dt):
585 if isinstance(dt, datetime):
586 if dt.tzinfo is not self:
587 raise ValueError("fromutc: dt.tzinfo " "is not self")
588 return dt + self._offset
589 raise TypeError(
590 "fromutc() argument must be a datetime instance" " or None"
591 )
592
593 @staticmethod
594 def _timedelta_to_microseconds(timedelta):
595 """backport of timedelta._to_microseconds()"""
596 return (
597 timedelta.days * (24 * 3600) + timedelta.seconds
598 ) * 1000000 + timedelta.microseconds
599
600 @staticmethod
601 def _divmod_timedeltas(a, b):
602 """backport of timedelta.__divmod__"""
603
604 q, r = divmod(
605 timezone._timedelta_to_microseconds(a),
606 timezone._timedelta_to_microseconds(b),
607 )
608 return q, timedelta(0, 0, r)
609
610 @staticmethod
611 def _name_from_offset(delta):
612 if not delta:
613 return "UTC"
614 if delta < timedelta(0):
615 sign = "-"
616 delta = -delta
617 else:
618 sign = "+"
619 hours, rest = timezone._divmod_timedeltas(
620 delta, timedelta(hours=1)
621 )
622 minutes, rest = timezone._divmod_timedeltas(
623 rest, timedelta(minutes=1)
624 )
625 result = "UTC%s%02d:%02d" % (sign, hours, minutes)
626 if rest.seconds:
627 result += ":%02d" % (rest.seconds,)
628 if rest.microseconds:
629 result += ".%06d" % (rest.microseconds,)
630 return result
631
632 _maxoffset = timedelta(hours=23, minutes=59)
633 _minoffset = -_maxoffset
634
635 timezone.utc = timezone(timedelta(0))