1# util/compat.py 
    2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: https://www.opensource.org/licenses/mit-license.php 
    7# mypy: allow-untyped-defs, allow-untyped-calls 
    8 
    9"""Handle Python version/platform incompatibilities.""" 
    10 
    11from __future__ import annotations 
    12 
    13import base64 
    14import dataclasses 
    15import hashlib 
    16from importlib import metadata as importlib_metadata 
    17import inspect 
    18import operator 
    19import platform 
    20import sys 
    21import sysconfig 
    22import typing 
    23from typing import Any 
    24from typing import Callable 
    25from typing import Dict 
    26from typing import Iterable 
    27from typing import List 
    28from typing import Mapping 
    29from typing import Optional 
    30from typing import Sequence 
    31from typing import Set 
    32from typing import Tuple 
    33from typing import Type 
    34 
# Interpreter version flags: each is True when the running interpreter's
# ``sys.version_info`` is at least the release named by the flag.
py314b1 = sys.version_info >= (3, 14, 0, "beta", 1)
py314 = sys.version_info >= (3, 14)
py313 = sys.version_info >= (3, 13)
py312 = sys.version_info >= (3, 12)
py311 = sys.version_info >= (3, 11)
# Implementation flags, compared against platform.python_implementation().
pypy = platform.python_implementation() == "PyPy"
cpython = platform.python_implementation() == "CPython"
# True when the ``Py_GIL_DISABLED`` build configuration variable is truthy.
freethreading = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))

# Platform flags, derived from ``sys.platform`` prefixes.
win32 = sys.platform.startswith("win")
osx = sys.platform.startswith("darwin")
# NOTE: only matches machine names that contain "aarch" (e.g. "aarch64").
arm = "aarch" in platform.machine().lower()
# True when ``sys.maxsize`` is larger than 2**32.
is64bit = sys.maxsize > 2**32

# Mirrors the ``cpython`` flag above; presumably signals that objects are
# reclaimed by reference counting -- confirm against callers.
has_refcount_gc = bool(cpython)

# Alias for ``operator.attrgetter``.
dottedgetter = operator.attrgetter
    52 
    53 
class FullArgSpec(typing.NamedTuple):
    """Named tuple describing the parameters of a Python function.

    This is the value returned by :func:`.inspect_getfullargspec`.

    """

    # names of the positional parameters, in declaration order
    args: List[str]
    # name of the ``*args`` parameter, or None if there is none
    varargs: Optional[str]
    # name of the ``**kwargs`` parameter, or None if there is none
    varkw: Optional[str]
    # the function's ``__defaults__`` (defaults of trailing positionals)
    defaults: Optional[Tuple[Any, ...]]
    # names of the keyword-only parameters, in declaration order
    kwonlyargs: List[str]
    # the function's ``__kwdefaults__`` (defaults of keyword-only params)
    kwonlydefaults: Optional[Dict[str, Any]]
    # the function's ``__annotations__``
    annotations: Dict[str, Any]
    62 
    63 
    64def inspect_getfullargspec(func: Callable[..., Any]) -> FullArgSpec: 
    65    """Fully vendored version of getfullargspec from Python 3.3.""" 
    66 
    67    if inspect.ismethod(func): 
    68        func = func.__func__ 
    69    if not inspect.isfunction(func): 
    70        raise TypeError(f"{func!r} is not a Python function") 
    71 
    72    co = func.__code__ 
    73    if not inspect.iscode(co): 
    74        raise TypeError(f"{co!r} is not a code object") 
    75 
    76    nargs = co.co_argcount 
    77    names = co.co_varnames 
    78    nkwargs = co.co_kwonlyargcount 
    79    args = list(names[:nargs]) 
    80    kwonlyargs = list(names[nargs : nargs + nkwargs]) 
    81 
    82    nargs += nkwargs 
    83    varargs = None 
    84    if co.co_flags & inspect.CO_VARARGS: 
    85        varargs = co.co_varnames[nargs] 
    86        nargs = nargs + 1 
    87    varkw = None 
    88    if co.co_flags & inspect.CO_VARKEYWORDS: 
    89        varkw = co.co_varnames[nargs] 
    90 
    91    return FullArgSpec( 
    92        args, 
    93        varargs, 
    94        varkw, 
    95        func.__defaults__, 
    96        kwonlyargs, 
    97        func.__kwdefaults__, 
    98        func.__annotations__, 
    99    ) 
    100 
    101 
    102# python stubs don't have a public type for this. not worth 
    103# making a protocol 
    104def md5_not_for_security() -> Any: 
    105    return hashlib.md5(usedforsecurity=False) 
    106 
    107 
    108def importlib_metadata_get(group): 
    109    ep = importlib_metadata.entry_points() 
    110    if typing.TYPE_CHECKING or hasattr(ep, "select"): 
    111        return ep.select(group=group) 
    112    else: 
    113        return ep.get(group, ()) 
    114 
    115 
    116def b(s): 
    117    return s.encode("latin-1") 
    118 
    119 
    120def b64decode(x: str) -> bytes: 
    121    return base64.b64decode(x.encode("ascii")) 
    122 
    123 
    124def b64encode(x: bytes) -> str: 
    125    return base64.b64encode(x).decode("ascii") 
    126 
    127 
    128def decode_backslashreplace(text: bytes, encoding: str) -> str: 
    129    return text.decode(encoding, errors="backslashreplace") 
    130 
    131 
    132def cmp(a, b): 
    133    return (a > b) - (a < b) 
    134 
    135 
    136def _formatannotation(annotation, base_module=None): 
    137    """vendored from python 3.7""" 
    138 
    139    if isinstance(annotation, str): 
    140        return annotation 
    141 
    142    if getattr(annotation, "__module__", None) == "typing": 
    143        return repr(annotation).replace("typing.", "").replace("~", "") 
    144    if isinstance(annotation, type): 
    145        if annotation.__module__ in ("builtins", base_module): 
    146            return repr(annotation.__qualname__) 
    147        return annotation.__module__ + "." + annotation.__qualname__ 
    148    elif isinstance(annotation, typing.TypeVar): 
    149        return repr(annotation).replace("~", "") 
    150    return repr(annotation).replace("~", "") 
    151 
    152 
    153def inspect_formatargspec( 
    154    args: List[str], 
    155    varargs: Optional[str] = None, 
    156    varkw: Optional[str] = None, 
    157    defaults: Optional[Sequence[Any]] = None, 
    158    kwonlyargs: Optional[Sequence[str]] = (), 
    159    kwonlydefaults: Optional[Mapping[str, Any]] = {}, 
    160    annotations: Mapping[str, Any] = {}, 
    161    formatarg: Callable[[str], str] = str, 
    162    formatvarargs: Callable[[str], str] = lambda name: "*" + name, 
    163    formatvarkw: Callable[[str], str] = lambda name: "**" + name, 
    164    formatvalue: Callable[[Any], str] = lambda value: "=" + repr(value), 
    165    formatreturns: Callable[[Any], str] = lambda text: " -> " + str(text), 
    166    formatannotation: Callable[[Any], str] = _formatannotation, 
    167) -> str: 
    168    """Copy formatargspec from python 3.7 standard library. 
    169 
    170    Python 3 has deprecated formatargspec and requested that Signature 
    171    be used instead, however this requires a full reimplementation 
    172    of formatargspec() in terms of creating Parameter objects and such. 
    173    Instead of introducing all the object-creation overhead and having 
    174    to reinvent from scratch, just copy their compatibility routine. 
    175 
    176    Ultimately we would need to rewrite our "decorator" routine completely 
    177    which is not really worth it right now, until all Python 2.x support 
    178    is dropped. 
    179 
    180    """ 
    181 
    182    kwonlydefaults = kwonlydefaults or {} 
    183    annotations = annotations or {} 
    184 
    185    def formatargandannotation(arg): 
    186        result = formatarg(arg) 
    187        if arg in annotations: 
    188            result += ": " + formatannotation(annotations[arg]) 
    189        return result 
    190 
    191    specs = [] 
    192    if defaults: 
    193        firstdefault = len(args) - len(defaults) 
    194    else: 
    195        firstdefault = -1 
    196 
    197    for i, arg in enumerate(args): 
    198        spec = formatargandannotation(arg) 
    199        if defaults and i >= firstdefault: 
    200            spec = spec + formatvalue(defaults[i - firstdefault]) 
    201        specs.append(spec) 
    202 
    203    if varargs is not None: 
    204        specs.append(formatvarargs(formatargandannotation(varargs))) 
    205    else: 
    206        if kwonlyargs: 
    207            specs.append("*") 
    208 
    209    if kwonlyargs: 
    210        for kwonlyarg in kwonlyargs: 
    211            spec = formatargandannotation(kwonlyarg) 
    212            if kwonlydefaults and kwonlyarg in kwonlydefaults: 
    213                spec += formatvalue(kwonlydefaults[kwonlyarg]) 
    214            specs.append(spec) 
    215 
    216    if varkw is not None: 
    217        specs.append(formatvarkw(formatargandannotation(varkw))) 
    218 
    219    result = "(" + ", ".join(specs) + ")" 
    220    if "return" in annotations: 
    221        result += formatreturns(formatannotation(annotations["return"])) 
    222    return result 
    223 
    224 
    225def dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]: 
    226    """Return a sequence of all dataclasses.Field objects associated 
    227    with a class as an already processed dataclass. 
    228 
    229    The class must **already be a dataclass** for Field objects to be returned. 
    230 
    231    """ 
    232 
    233    if dataclasses.is_dataclass(cls): 
    234        return dataclasses.fields(cls) 
    235    else: 
    236        return [] 
    237 
    238 
    239def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]: 
    240    """Return a sequence of all dataclasses.Field objects associated with 
    241    an already processed dataclass, excluding those that originate from a 
    242    superclass. 
    243 
    244    The class must **already be a dataclass** for Field objects to be returned. 
    245 
    246    """ 
    247 
    248    if dataclasses.is_dataclass(cls): 
    249        super_fields: Set[dataclasses.Field[Any]] = set() 
    250        for sup in cls.__bases__: 
    251            super_fields.update(dataclass_fields(sup)) 
    252        return [f for f in dataclasses.fields(cls) if f not in super_fields] 
    253    else: 
    254        return [] 
    255 
    256 
# ``mini_gil`` is a context manager: a re-entrant lock when the
# ``freethreading`` flag is set, and a do-nothing context manager otherwise.
if freethreading:
    import threading

    mini_gil = threading.RLock()
    """provide a threading.RLock() under python freethreading only"""
else:
    import contextlib

    # nullcontext() performs no work on enter or exit
    mini_gil = contextlib.nullcontext()  # type: ignore[assignment]