1# util/compat.py
2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: http://www.opensource.org/licenses/mit-license.php
7
8"""Handle Python version/platform incompatibilities."""
9
10import collections
11import contextlib
12import inspect
13import operator
14import platform
15import sys
16
17
# Interpreter version flags, oldest release first.
py265 = sys.version_info >= (2, 6, 5)
py3k = sys.version_info >= (3, 0)
py2k = not py3k
py32 = sys.version_info >= (3, 2)
py33 = sys.version_info >= (3, 3)
py35 = sys.version_info >= (3, 5)
py36 = sys.version_info >= (3, 6)

# Interpreter implementation flags.
jython = sys.platform.startswith("java")
pypy = hasattr(sys, "pypy_version_info")
cpython = not (pypy or jython)  # TODO: something better for this ?

# Operating system / hardware flags.
win32 = sys.platform.startswith("win")
osx = sys.platform.startswith("darwin")
arm = "aarch" in platform.machine().lower()
32
33
# Standard-library callables re-bound as attributes of this module.
contextmanager = contextlib.contextmanager
dottedgetter = operator.attrgetter
namedtuple = collections.namedtuple
# binds the builtin as a module-level name
next = next  # noqa
38
# Result type of inspect_getfullargspec(); the field names match those of
# the standard library's inspect.FullArgSpec.
FullArgSpec = collections.namedtuple(
    "FullArgSpec",
    "args varargs varkw defaults kwonlyargs kwonlydefaults annotations",
)
51
52try:
53 import threading
54except ImportError:
55 import dummy_threading as threading # noqa
56
57
# work around http://bugs.python.org/issue2646
# When py265 is set keyword names are passed through untouched; otherwise
# they are coerced with str().
safe_kwarg = (lambda arg: arg) if py265 else str  # noqa
63
64
def inspect_getfullargspec(func):
    """Return a :class:`FullArgSpec` for a Python function or method.

    Fully vendored version of getfullargspec from Python 3.3.

    :param func: a function, or a bound/unbound method whose ``__func__``
     is inspected instead.
    :return: ``FullArgSpec`` built from the function's code object.
    :raises TypeError: if ``func`` is not a Python function, or its
     ``__code__`` is not a code object.

    """
    target = func.__func__ if inspect.ismethod(func) else func
    if not inspect.isfunction(target):
        raise TypeError("{!r} is not a Python function".format(target))

    code = target.__code__
    if not inspect.iscode(code):
        raise TypeError("{!r} is not a code object".format(code))

    varnames = code.co_varnames
    n_positional = code.co_argcount
    n_kwonly = code.co_kwonlyargcount if py3k else 0
    n_named = n_positional + n_kwonly

    # co_varnames lists positional names, then keyword-only names, then
    # the *args name and the **kwargs name when present.
    cursor = n_named
    star_args = None
    if code.co_flags & inspect.CO_VARARGS:
        star_args = varnames[cursor]
        cursor += 1
    star_kwargs = None
    if code.co_flags & inspect.CO_VARKEYWORDS:
        star_kwargs = varnames[cursor]

    if py3k:
        kwonly_defaults = target.__kwdefaults__
        annotations = target.__annotations__
    else:
        kwonly_defaults = None
        annotations = {}

    return FullArgSpec(
        list(varnames[:n_positional]),
        star_args,
        star_kwargs,
        target.__defaults__,
        list(varnames[n_positional:n_named]),
        kwonly_defaults,
        annotations,
    )
101
102
103if py3k:
104 import base64
105 import builtins
106 import configparser
107 import itertools
108 import pickle
109
110 from functools import reduce
111 from io import BytesIO as byte_buffer
112 from io import StringIO
113 from itertools import zip_longest
114 from urllib.parse import (
115 quote_plus,
116 unquote_plus,
117 parse_qsl,
118 quote,
119 unquote,
120 )
121
# Python 3 spellings of the string / bytes / integer type groups.
string_types = (str,)
binary_types = (bytes,)
binary_type = bytes
text_type = str
int_types = (int,)
# iterating a bytes object on Python 3 already yields integers
iterbytes = iter

# uniform names for the filter/map family
itertools_filterfalse = itertools.filterfalse
itertools_filter = filter
itertools_imap = map

# Fetched with getattr() rather than attribute syntax; presumably because
# ``exec`` and ``print`` are statements on Python 2, where
# ``builtins.exec`` would not parse.
exec_ = getattr(builtins, "exec")
import_ = getattr(builtins, "__import__")
print_ = getattr(builtins, "print")
136
def b(s):
    """Encode the text ``s`` to bytes using the latin-1 codec."""
    encoded = s.encode("latin-1")
    return encoded
139
def b64decode(x):
    """Decode the base64 text ``x`` (ASCII only) and return the raw bytes."""
    ascii_payload = x.encode("ascii")
    return base64.b64decode(ascii_payload)
142
def b64encode(x):
    """Base64-encode the bytes ``x`` and return the result as text."""
    encoded = base64.b64encode(x)
    return encoded.decode("ascii")
145
def decode_backslashreplace(text, encoding):
    """Decode ``text`` with ``encoding``, using the "backslashreplace"
    error handler for bytes that cannot be decoded."""
    return text.decode(encoding, "backslashreplace")
148
def cmp(a, b):
    """Three-way comparison: ``(a > b) - (a < b)``.

    Gives 1, -1 or 0 when the comparisons return booleans.
    """
    is_greater = a > b
    is_less = a < b
    return is_greater - is_less
151
def raise_(
    exception, with_traceback=None, replace_context=None, from_=False
):
    r"""Raise ``exception``, optionally with a traceback and a cause.

    :param exception: exception to raise
    :param with_traceback: when not None, ``exception.with_traceback()``
     is called with it and the result is what gets raised
    :param replace_context: an as-yet-unsupported feature.  This is
     an exception object which we are "replacing", e.g., it's our
     "cause" but we don't want it printed.  Basically just what
     ``__suppress_context__`` does but we don't want to suppress
     the enclosing context, if any.  So for now we make it the
     cause.  Only consulted when ``from_`` is left at ``False``.
    :param from\_: the cause.  Any value other than ``False``, including
     ``None``, is assigned to ``__cause__``.

    """
    if with_traceback is not None:
        exception = exception.with_traceback(with_traceback)

    if from_ is False:
        if replace_context is not None:
            # no good solution here, we would like to have the exception
            # have only the context of replace_context.__context__ so that
            # the intermediary exception does not change, but we can't
            # figure that out.
            exception.__cause__ = replace_context
    else:
        exception.__cause__ = from_

    try:
        raise exception
    finally:
        # credit to
        # https://cosmicpercolator.com/2016/01/13/exception-leaks-in-python-2-and-3/
        # as the __traceback__ object creates a cycle
        del exception, replace_context, from_, with_traceback
188
# Take the flag from ``typing`` when py35 is set; otherwise define it as a
# plain False constant.
if py35:
    from typing import TYPE_CHECKING
else:
    TYPE_CHECKING = False
193
def u(s):
    """Return ``s`` unchanged."""
    return s
196
def ue(s):
    """Return ``s`` unchanged."""
    return s
199
# Use the builtin callable() when py32 is set; otherwise supply a
# hasattr()-based replacement under the same name.
if not py32:

    def callable(fn):  # noqa
        """Return True if ``fn`` has a ``__call__`` attribute."""
        return hasattr(fn, "__call__")

else:
    callable = callable  # noqa
206
207
208else:
209 import base64
210 import ConfigParser as configparser # noqa
211 import itertools
212
213 from StringIO import StringIO # noqa
214 from cStringIO import StringIO as byte_buffer # noqa
215 from itertools import izip_longest as zip_longest # noqa
216 from urllib import quote # noqa
217 from urllib import quote_plus # noqa
218 from urllib import unquote # noqa
219 from urllib import unquote_plus # noqa
220 from urlparse import parse_qsl # noqa
221
222 try:
223 import cPickle as pickle
224 except ImportError:
225 import pickle # noqa
226
# Python 2 spellings of the string / bytes / integer type groups.
string_types = (basestring,)  # noqa
binary_types = (bytes,)
binary_type = str
text_type = unicode  # noqa
int_types = int, long  # noqa

# bind the Python 2 builtins as module-level names
callable = callable  # noqa
cmp = cmp  # noqa
reduce = reduce  # noqa

# the base64 functions are used as-is here
b64encode = base64.b64encode
b64decode = base64.b64decode

# uniform names for the itertools ``i``-prefixed filter/map family
itertools_filterfalse = itertools.ifilterfalse
itertools_filter = itertools.ifilter
itertools_imap = itertools.imap
243
def b(s):
    """Return ``s`` unchanged."""
    return s
246
def exec_(func_text, globals_, lcl=None):
    """Execute the code in ``func_text`` against ``globals_`` and, when
    given, the locals mapping ``lcl``.

    The Python 2 ``exec ... in ...`` statement is kept inside string
    literals, presumably so that this file still parses on Python 3.
    The inner statement refers to this function's own local names.
    """
    if lcl is None:
        exec("exec func_text in globals_")
    else:
        exec("exec func_text in globals_, lcl")
252
def iterbytes(buf):
    """Return a generator of ``ord()`` values, one per item of ``buf``."""
    return (ord(char) for char in buf)
255
def import_(*args):
    """Call ``__import__`` with ``args``.

    When exactly four arguments are given, the fourth (the "fromlist")
    is replaced by a list of its items converted with ``str()``.
    """
    if len(args) == 4:
        name, globals_, locals_, fromlist = args
        args = (name, globals_, locals_, [str(item) for item in fromlist])
    return __import__(*args)
260
def print_(*args, **kwargs):
    """Write each positional argument to a file-like object.

    Positional arguments that are not ``basestring`` instances are
    converted with ``str()`` before being written.  No separator and no
    trailing newline are written.

    :param file: keyword argument; the object whose ``write()`` method
     receives the output.  Defaults to ``sys.stdout``.  Passing ``None``
     makes the call a no-op.

    """
    fp = kwargs.pop("file", sys.stdout)
    if fp is None:
        return
    # Iterate over the values themselves: wrapping them in enumerate()
    # would yield (index, value) tuples and write the tuple's str().
    for arg in args:
        if not isinstance(arg, basestring):  # noqa
            arg = str(arg)
        fp.write(arg)
269
def u(s):
    """Decode the byte string ``s`` as UTF-8 and return the unicode text."""
    # this differs from what six does, which doesn't support non-ASCII
    # strings - we only use u() with
    # literal source strings, and all our source files with non-ascii
    # in them (all are tests) are utf-8 encoded.
    decoded = unicode(s, "utf-8")  # noqa
    return decoded
276
def ue(s):
    """Decode the byte string ``s`` with the "unicode_escape" codec."""
    decoded = unicode(s, "unicode_escape")  # noqa
    return decoded
279
def decode_backslashreplace(text, encoding):
    """Decode ``text`` with ``encoding``; on ``UnicodeDecodeError`` fall
    back to the ``repr()`` of ``text`` without its first and last
    character, decoded with the default codec."""
    try:
        decoded = text.decode(encoding)
    except UnicodeDecodeError:
        # regular "backslashreplace" for an incompatible encoding raises:
        # "TypeError: don't know how to handle UnicodeDecodeError in
        # error callback"
        escaped = repr(text)[1:-1]
        return escaped.decode()
    return decoded
288
def safe_bytestring(text):
    """Return ``text`` as a byte string.

    Byte strings are returned as-is.  Unicode strings, and any other
    object after conversion with ``unicode()``, are encoded to ASCII with
    the "backslashreplace" error handler.
    """
    # py2k only
    if isinstance(text, string_types):
        if not isinstance(text, unicode):  # noqa
            return text
        as_unicode = text
    else:
        as_unicode = unicode(text)  # noqa
    return as_unicode.encode("ascii", errors="backslashreplace")
297
# raise_() for Python 2.  The three-expression ``raise`` statement is
# Python 2-only syntax, so the function is compiled from a string
# (presumably to keep this file parseable on Python 3).  The body lines
# inside the string are indented one level per nesting depth; without
# that nesting the source cannot compile.
#
# This variant only uses ``exception`` and ``with_traceback``;
# ``replace_context`` and ``from_`` are accepted and ignored.
exec(
    "def raise_(exception, with_traceback=None, replace_context=None, "
    "from_=False):\n"
    "    if with_traceback:\n"
    "        raise type(exception), exception, with_traceback\n"
    "    else:\n"
    "        raise exception\n"
)

TYPE_CHECKING = False
308
309if py35:
310
311 def _formatannotation(annotation, base_module=None):
312 """vendored from python 3.7"""
313
314 if getattr(annotation, "__module__", None) == "typing":
315 return repr(annotation).replace("typing.", "")
316 if isinstance(annotation, type):
317 if annotation.__module__ in ("builtins", base_module):
318 return annotation.__qualname__
319 return annotation.__module__ + "." + annotation.__qualname__
320 return repr(annotation)
321
322 def inspect_formatargspec(
323 args,
324 varargs=None,
325 varkw=None,
326 defaults=None,
327 kwonlyargs=(),
328 kwonlydefaults={},
329 annotations={},
330 formatarg=str,
331 formatvarargs=lambda name: "*" + name,
332 formatvarkw=lambda name: "**" + name,
333 formatvalue=lambda value: "=" + repr(value),
334 formatreturns=lambda text: " -> " + text,
335 formatannotation=_formatannotation,
336 ):
337 """Copy formatargspec from python 3.7 standard library.
338
339 Python 3 has deprecated formatargspec and requested that Signature
340 be used instead, however this requires a full reimplementation
341 of formatargspec() in terms of creating Parameter objects and such.
342 Instead of introducing all the object-creation overhead and having
343 to reinvent from scratch, just copy their compatibility routine.
344
345 Utimately we would need to rewrite our "decorator" routine completely
346 which is not really worth it right now, until all Python 2.x support
347 is dropped.
348
349 """
350
351 def formatargandannotation(arg):
352 result = formatarg(arg)
353 if arg in annotations:
354 result += ": " + formatannotation(annotations[arg])
355 return result
356
357 specs = []
358 if defaults:
359 firstdefault = len(args) - len(defaults)
360 for i, arg in enumerate(args):
361 spec = formatargandannotation(arg)
362 if defaults and i >= firstdefault:
363 spec = spec + formatvalue(defaults[i - firstdefault])
364 specs.append(spec)
365
366 if varargs is not None:
367 specs.append(formatvarargs(formatargandannotation(varargs)))
368 else:
369 if kwonlyargs:
370 specs.append("*")
371
372 if kwonlyargs:
373 for kwonlyarg in kwonlyargs:
374 spec = formatargandannotation(kwonlyarg)
375 if kwonlydefaults and kwonlyarg in kwonlydefaults:
376 spec += formatvalue(kwonlydefaults[kwonlyarg])
377 specs.append(spec)
378
379 if varkw is not None:
380 specs.append(formatvarkw(formatargandannotation(varkw)))
381
382 result = "(" + ", ".join(specs) + ")"
383 if "return" in annotations:
384 result += formatreturns(formatannotation(annotations["return"]))
385 return result
386
387
388elif py2k:
389 from inspect import formatargspec as _inspect_formatargspec
390
def inspect_formatargspec(*spec, **kw):
    """Format an argument spec with the standard library's
    ``formatargspec``, passing on only the first four positional values."""
    # convert for a potential FullArgSpec from compat.getfullargspec()
    legacy_spec = spec[:4]
    return _inspect_formatargspec(*legacy_spec, **kw)  # noqa
394
395
396else:
397 from inspect import formatargspec as inspect_formatargspec # noqa
398
399
# Import the ABCs from collections.abc where it exists; accessing them
# straight from the collections module is deprecated (those aliases were
# removed in Python 3.10).
402if py33:
403 import collections.abc as collections_abc
404else:
405 import collections as collections_abc # noqa
406
407
@contextlib.contextmanager
def nested(*managers):
    """Implement contextlib.nested, mostly for unit tests.

    As tests still need to run on py2.6 we can't use multiple-with yet.

    Function is removed in py3k but also emits deprecation warning in 2.7
    so just roll it here for everyone.

    The managers are entered in the order given and the list of their
    ``__enter__()`` results is yielded.  On exit the ``__exit__`` methods
    are called in reverse order; an exception still pending after all of
    them have run is re-raised.

    """

    exits = []
    vars_ = []
    # (type, value, traceback) of the pending exception, if any
    exc = (None, None, None)
    try:
        for mgr in managers:
            # look up both methods before entering, so a manager is only
            # entered if it also provides __exit__
            exit_ = mgr.__exit__
            enter = mgr.__enter__
            vars_.append(enter())
            exits.append(exit_)
        yield vars_
    except:
        # record whatever was raised, including from a partial setup,
        # so the managers entered so far can see it
        exc = sys.exc_info()
    finally:
        while exits:
            exit_ = exits.pop()  # noqa
            try:
                # a true return value from __exit__ suppresses the
                # pending exception for the remaining managers
                if exit_(*exc):
                    exc = (None, None, None)
            except:
                # an exception raised by __exit__ replaces the pending one
                exc = sys.exc_info()
        if exc != (None, None, None):
            reraise(exc[0], exc[1], exc[2])
441
442
def raise_from_cause(exception, exc_info=None):
    r"""legacy. use raise\_()

    Raise ``exception`` with the traceback taken from ``exc_info``
    (default: ``sys.exc_info()``).  The exception value in ``exc_info``
    becomes the cause, unless it is ``exception`` itself, in which case
    the cause is ``None``.
    """

    if exc_info is None:
        exc_info = sys.exc_info()
    _, active, active_tb = exc_info
    if active is exception:
        cause = None
    else:
        cause = active
    reraise(type(exception), exception, tb=active_tb, cause=cause)
451
452
def reraise(tp, value, tb=None, cause=None):
    r"""legacy. use raise\_()

    Delegates to ``raise_()`` with ``value`` as the exception, ``tb`` as
    its traceback and ``cause`` as its cause.  ``tp`` is accepted but not
    used.
    """

    raise_(value, from_=cause, with_traceback=tb)
457
458
def with_metaclass(meta, *bases):
    """Create a base class with a metaclass.

    Drops the middle class upon creation: a class that inherits from the
    returned placeholder is built directly by ``meta`` with ``bases`` as
    its bases.

    Source: http://lucumr.pocoo.org/2013/5/21/porting-to-python-3-redux/

    """

    class metaclass(meta):
        __call__ = type.__call__
        __init__ = type.__init__

        def __new__(cls, name, this_bases, d):
            if this_bases is not None:
                # a real subclass is being defined: hand it over to
                # ``meta`` with the intended bases
                return meta(name, bases, d)
            # building the placeholder itself
            return type.__new__(cls, name, (), d)

    placeholder = metaclass("temporary_class", None, {})
    return placeholder
478
479
480if py3k:
481 from datetime import timezone
482else:
483 from datetime import datetime
484 from datetime import timedelta
485 from datetime import tzinfo
486
class timezone(tzinfo):
    """Minimal port of python 3 timezone object

    A ``tzinfo`` with a fixed UTC offset given as a ``timedelta``.
    """

    __slots__ = "_offset"

    def __init__(self, offset):
        """Create a fixed-offset timezone.

        :param offset: ``timedelta`` giving the difference from UTC.
        :raises TypeError: if ``offset`` is not a ``timedelta``.
        :raises ValueError: if ``offset`` is outside the range
         ``_minoffset`` .. ``_maxoffset``.
        """
        if not isinstance(offset, timedelta):
            raise TypeError("offset must be a timedelta")
        if not self._minoffset <= offset <= self._maxoffset:
            raise ValueError(
                "offset must be a timedelta "
                "strictly between -timedelta(hours=24) and "
                "timedelta(hours=24)."
            )
        self._offset = offset

    def __getinitargs__(self):
        # tzinfo's pickle support asks this hook for the constructor
        # arguments; without it unpickling calls the class with no
        # arguments, which __init__ rejects.
        return (self._offset,)

    def __eq__(self, other):
        # exact type match only; subclasses never compare equal
        if type(other) is not timezone:
            return False
        return self._offset == other._offset

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__, so spell it out to
        # keep the two operators consistent.
        return not self == other

    def __hash__(self):
        return hash(self._offset)

    def __repr__(self):
        return "sqlalchemy.util.%s(%r)" % (
            self.__class__.__name__,
            self._offset,
        )

    def __str__(self):
        return self.tzname(None)

    def utcoffset(self, dt):
        """Return the fixed offset; ``dt`` is not consulted."""
        return self._offset

    def tzname(self, dt):
        """Return the ``UTC`` / ``UTC+HH:MM`` style name; ``dt`` is not
        consulted."""
        return self._name_from_offset(self._offset)

    def dst(self, dt):
        """Always ``None``."""
        return None

    def fromutc(self, dt):
        """Return ``dt`` shifted by the offset.

        :raises ValueError: if ``dt.tzinfo`` is not this object.
        :raises TypeError: if ``dt`` is not a ``datetime``.
        """
        if isinstance(dt, datetime):
            if dt.tzinfo is not self:
                raise ValueError("fromutc: dt.tzinfo is not self")
            return dt + self._offset
        raise TypeError(
            "fromutc() argument must be a datetime instance or None"
        )

    @staticmethod
    def _timedelta_to_microseconds(delta):
        """backport of timedelta._to_microseconds()"""
        return (
            delta.days * (24 * 3600) + delta.seconds
        ) * 1000000 + delta.microseconds

    @staticmethod
    def _divmod_timedeltas(a, b):
        """backport of timedelta.__divmod__

        Returns ``(quotient, remainder)`` with an integer quotient and a
        ``timedelta`` remainder.
        """

        q, r = divmod(
            timezone._timedelta_to_microseconds(a),
            timezone._timedelta_to_microseconds(b),
        )
        return q, timedelta(0, 0, r)

    @staticmethod
    def _name_from_offset(delta):
        """Format ``delta`` as ``UTC``, ``UTC+HH:MM`` or ``UTC-HH:MM``,
        appending ``:SS`` and ``.ffffff`` parts only when non-zero."""
        if not delta:
            return "UTC"
        if delta < timedelta(0):
            sign = "-"
            delta = -delta
        else:
            sign = "+"
        hours, rest = timezone._divmod_timedeltas(
            delta, timedelta(hours=1)
        )
        minutes, rest = timezone._divmod_timedeltas(
            rest, timedelta(minutes=1)
        )
        result = "UTC%s%02d:%02d" % (sign, hours, minutes)
        if rest.seconds:
            result += ":%02d" % (rest.seconds,)
        if rest.microseconds:
            result += ".%06d" % (rest.microseconds,)
        return result

    # inclusive bounds checked by __init__
    _maxoffset = timedelta(hours=23, minutes=59)
    _minoffset = -_maxoffset


# shared instance with a zero offset
timezone.utc = timezone(timedelta(0))