1# dialects/sqlite/aiosqlite.py 
    2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: https://www.opensource.org/licenses/mit-license.php 
    7 
    8 
    9r""" 
    10 
    11.. dialect:: sqlite+aiosqlite 
    12    :name: aiosqlite 
    13    :dbapi: aiosqlite 
    14    :connectstring: sqlite+aiosqlite:///file_path 
    15    :url: https://pypi.org/project/aiosqlite/ 
    16 
    17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface 
    18running on top of pysqlite. 
    19 
    20aiosqlite is a wrapper around pysqlite that uses a background thread for 
    21each connection.   It does not actually use non-blocking IO, as SQLite 
    22databases are not socket-based.  However it does provide a working asyncio 
    23interface that's useful for testing and prototyping purposes. 
    24 
    25Using a special asyncio mediation layer, the aiosqlite dialect is usable 
    26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 
    27extension package. 
    28 
    29This dialect should normally be used only with the 
    30:func:`_asyncio.create_async_engine` engine creation function:: 
    31 
    32    from sqlalchemy.ext.asyncio import create_async_engine 
    33 
    34    engine = create_async_engine("sqlite+aiosqlite:///filename") 
    35 
    36The URL passes through all arguments to the ``pysqlite`` driver, so all 
    37connection arguments are the same as they are for that of :ref:`pysqlite`. 
    38 
    39.. _aiosqlite_udfs: 
    40 
    41User-Defined Functions 
    42---------------------- 
    43 
    44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs) 
    45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`. 
    46 
    47.. _aiosqlite_serializable: 
    48 
    49Serializable isolation / Savepoints / Transactional DDL (asyncio version) 
    50------------------------------------------------------------------------- 
    51 
    52A newly revised version of this important section is now available 
    53at the top level of the SQLAlchemy SQLite documentation, in the section 
    54:ref:`sqlite_transactions`. 
    55 
    56 
    57.. _aiosqlite_pooling: 
    58 
    59Pooling Behavior 
    60---------------- 
    61 
    62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently 
    63based on the kind of SQLite database that's requested: 
    64 
    65* When a ``:memory:`` SQLite database is specified, the dialect by default 
    66  will use :class:`.StaticPool`. This pool maintains a single 
    67  connection, so that all access to the engine 
    68  use the same ``:memory:`` database. 
    69* When a file-based database is specified, the dialect will use 
    70  :class:`.AsyncAdaptedQueuePool` as the source of connections. 
    71 
    72  .. versionchanged:: 2.0.38 
    73 
    74    SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default. 
    75    Previously, :class:`.NullPool` were used.  The :class:`.NullPool` class 
    76    may be used by specifying it via the 
    77    :paramref:`_sa.create_engine.poolclass` parameter. 
    78 
    79"""  # noqa 
    80from __future__ import annotations 
    81 
    82import asyncio 
    83from functools import partial 
    84from types import ModuleType 
    85from typing import Any 
    86from typing import cast 
    87from typing import NoReturn 
    88from typing import Optional 
    89from typing import TYPE_CHECKING 
    90from typing import Union 
    91 
    92from .base import SQLiteExecutionContext 
    93from .pysqlite import SQLiteDialect_pysqlite 
    94from ... import pool 
    95from ...connectors.asyncio import AsyncAdapt_dbapi_connection 
    96from ...connectors.asyncio import AsyncAdapt_dbapi_cursor 
    97from ...connectors.asyncio import AsyncAdapt_dbapi_module 
    98from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor 
    99from ...util.concurrency import await_ 
    100 
    101if TYPE_CHECKING: 
    102    from ...connectors.asyncio import AsyncIODBAPIConnection 
    103    from ...engine.interfaces import DBAPIConnection 
    104    from ...engine.interfaces import DBAPICursor 
    105    from ...engine.interfaces import DBAPIModule 
    106    from ...engine.url import URL 
    107    from ...pool.base import PoolProxiedConnection 
    108 
    109 
    110class AsyncAdapt_aiosqlite_cursor(AsyncAdapt_dbapi_cursor): 
    111    __slots__ = () 
    112 
    113 
    114class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_dbapi_ss_cursor): 
    115    __slots__ = () 
    116 
    117 
    118class AsyncAdapt_aiosqlite_connection(AsyncAdapt_dbapi_connection): 
    119    __slots__ = () 
    120 
    121    _cursor_cls = AsyncAdapt_aiosqlite_cursor 
    122    _ss_cursor_cls = AsyncAdapt_aiosqlite_ss_cursor 
    123 
    124    @property 
    125    def isolation_level(self) -> Optional[str]: 
    126        return cast(str, self._connection.isolation_level) 
    127 
    128    @isolation_level.setter 
    129    def isolation_level(self, value: Optional[str]) -> None: 
    130        # aiosqlite's isolation_level setter works outside the Thread 
    131        # that it's supposed to, necessitating setting check_same_thread=False. 
    132        # for improved stability, we instead invent our own awaitable version 
    133        # using aiosqlite's async queue directly. 
    134 
    135        def set_iso( 
    136            connection: AsyncAdapt_aiosqlite_connection, value: Optional[str] 
    137        ) -> None: 
    138            connection.isolation_level = value 
    139 
    140        function = partial(set_iso, self._connection._conn, value) 
    141        future = asyncio.get_event_loop().create_future() 
    142 
    143        self._connection._tx.put_nowait((future, function)) 
    144 
    145        try: 
    146            await_(future) 
    147        except Exception as error: 
    148            self._handle_exception(error) 
    149 
    150    def create_function(self, *args: Any, **kw: Any) -> None: 
    151        try: 
    152            await_(self._connection.create_function(*args, **kw)) 
    153        except Exception as error: 
    154            self._handle_exception(error) 
    155 
    156    def rollback(self) -> None: 
    157        if self._connection._connection: 
    158            super().rollback() 
    159 
    160    def commit(self) -> None: 
    161        if self._connection._connection: 
    162            super().commit() 
    163 
    164    def close(self) -> None: 
    165        try: 
    166            await_(self._connection.close()) 
    167        except ValueError: 
    168            # this is undocumented for aiosqlite, that ValueError 
    169            # was raised if .close() was called more than once, which is 
    170            # both not customary for DBAPI and is also not a DBAPI.Error 
    171            # exception. This is now fixed in aiosqlite via my PR 
    172            # https://github.com/omnilib/aiosqlite/pull/238, so we can be 
    173            # assured this will not become some other kind of exception, 
    174            # since it doesn't raise anymore. 
    175 
    176            pass 
    177        except Exception as error: 
    178            self._handle_exception(error) 
    179 
    180    @classmethod 
    181    def _handle_exception_no_connection( 
    182        cls, dbapi: Any, error: Exception 
    183    ) -> NoReturn: 
    184        if isinstance(error, ValueError) and error.args[0].lower() in ( 
    185            "no active connection", 
    186            "connection closed", 
    187        ): 
    188            raise dbapi.sqlite.OperationalError(error.args[0]) from error 
    189        else: 
    190            super()._handle_exception_no_connection(dbapi, error) 
    191 
    192 
    193class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module): 
    194    def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType): 
    195        super().__init__(aiosqlite, dbapi_module=sqlite) 
    196        self.aiosqlite = aiosqlite 
    197        self.sqlite = sqlite 
    198        self.paramstyle = "qmark" 
    199        self._init_dbapi_attributes() 
    200 
    201    def _init_dbapi_attributes(self) -> None: 
    202        for name in ( 
    203            "DatabaseError", 
    204            "Error", 
    205            "IntegrityError", 
    206            "NotSupportedError", 
    207            "OperationalError", 
    208            "ProgrammingError", 
    209            "sqlite_version", 
    210            "sqlite_version_info", 
    211        ): 
    212            setattr(self, name, getattr(self.aiosqlite, name)) 
    213 
    214        for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"): 
    215            setattr(self, name, getattr(self.sqlite, name)) 
    216 
    217        for name in ("Binary",): 
    218            setattr(self, name, getattr(self.sqlite, name)) 
    219 
    220    def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection: 
    221        creator_fn = kw.pop("async_creator_fn", None) 
    222        if creator_fn: 
    223            connection = creator_fn(*arg, **kw) 
    224        else: 
    225            connection = self.aiosqlite.connect(*arg, **kw) 
    226            # it's a Thread.   you'll thank us later 
    227            connection.daemon = True 
    228 
    229        return AsyncAdapt_aiosqlite_connection( 
    230            self, 
    231            await_(connection), 
    232        ) 
    233 
    234 
    235class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext): 
    236    def create_server_side_cursor(self) -> DBAPICursor: 
    237        return self._dbapi_connection.cursor(server_side=True) 
    238 
    239 
    240class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite): 
    241    driver = "aiosqlite" 
    242    supports_statement_cache = True 
    243 
    244    is_async = True 
    245 
    246    supports_server_side_cursors = True 
    247 
    248    execution_ctx_cls = SQLiteExecutionContext_aiosqlite 
    249 
    250    @classmethod 
    251    def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi: 
    252        return AsyncAdapt_aiosqlite_dbapi( 
    253            __import__("aiosqlite"), __import__("sqlite3") 
    254        ) 
    255 
    256    @classmethod 
    257    def get_pool_class(cls, url: URL) -> type[pool.Pool]: 
    258        if cls._is_url_file_db(url): 
    259            return pool.AsyncAdaptedQueuePool 
    260        else: 
    261            return pool.StaticPool 
    262 
    263    def is_disconnect( 
    264        self, 
    265        e: DBAPIModule.Error, 
    266        connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]], 
    267        cursor: Optional[DBAPICursor], 
    268    ) -> bool: 
    269        self.dbapi = cast("DBAPIModule", self.dbapi) 
    270        if isinstance(e, self.dbapi.OperationalError): 
    271            err_lower = str(e).lower() 
    272            if ( 
    273                "no active connection" in err_lower 
    274                or "connection closed" in err_lower 
    275            ): 
    276                return True 
    277 
    278        return super().is_disconnect(e, connection, cursor) 
    279 
    280    def get_driver_connection( 
    281        self, connection: DBAPIConnection 
    282    ) -> AsyncIODBAPIConnection: 
    283        return connection._connection  # type: ignore[no-any-return] 
    284 
    285 
    286dialect = SQLiteDialect_aiosqlite