1# dialects/sqlite/aiosqlite.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9r"""
10
11.. dialect:: sqlite+aiosqlite
12 :name: aiosqlite
13 :dbapi: aiosqlite
14 :connectstring: sqlite+aiosqlite:///file_path
15 :url: https://pypi.org/project/aiosqlite/
16
17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
18running on top of pysqlite.
19
20aiosqlite is a wrapper around pysqlite that uses a background thread for
21each connection. It does not actually use non-blocking IO, as SQLite
22databases are not socket-based. However it does provide a working asyncio
23interface that's useful for testing and prototyping purposes.
24
25Using a special asyncio mediation layer, the aiosqlite dialect is usable
26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
27extension package.
28
29This dialect should normally be used only with the
30:func:`_asyncio.create_async_engine` engine creation function::
31
32 from sqlalchemy.ext.asyncio import create_async_engine
33
34 engine = create_async_engine("sqlite+aiosqlite:///filename")
35
36The URL passes through all arguments to the ``pysqlite`` driver, so all
37connection arguments are the same as they are for that of :ref:`pysqlite`.
38
39.. _aiosqlite_udfs:
40
41User-Defined Functions
42----------------------
43
44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs)
45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`.
46
47.. _aiosqlite_serializable:
48
49Serializable isolation / Savepoints / Transactional DDL (asyncio version)
50-------------------------------------------------------------------------
51
52A newly revised version of this important section is now available
53at the top level of the SQLAlchemy SQLite documentation, in the section
54:ref:`sqlite_transactions`.
55
56
57.. _aiosqlite_pooling:
58
59Pooling Behavior
60----------------
61
62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently
63based on the kind of SQLite database that's requested:
64
* When a ``:memory:`` SQLite database is specified, the dialect by default
  will use :class:`.StaticPool`. This pool maintains a single
  connection, so that all access to the engine
  uses the same ``:memory:`` database.
69* When a file-based database is specified, the dialect will use
70 :class:`.AsyncAdaptedQueuePool` as the source of connections.
71
72 .. versionchanged:: 2.0.38
73
     SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default.
     Previously, :class:`.NullPool` was used. The :class:`.NullPool` class
     may be used by specifying it via the
     :paramref:`_sa.create_engine.poolclass` parameter.
78
79""" # noqa
80from __future__ import annotations
81
82import asyncio
83from functools import partial
84from types import ModuleType
85from typing import Any
86from typing import cast
87from typing import NoReturn
88from typing import Optional
89from typing import TYPE_CHECKING
90from typing import Union
91
92from .base import SQLiteExecutionContext
93from .pysqlite import SQLiteDialect_pysqlite
94from ... import pool
95from ...connectors.asyncio import AsyncAdapt_dbapi_connection
96from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
97from ...connectors.asyncio import AsyncAdapt_dbapi_module
98from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
99from ...util.concurrency import await_
100
101if TYPE_CHECKING:
102 from ...connectors.asyncio import AsyncIODBAPIConnection
103 from ...engine.interfaces import DBAPIConnection
104 from ...engine.interfaces import DBAPICursor
105 from ...engine.interfaces import DBAPIModule
106 from ...engine.url import URL
107 from ...pool.base import PoolProxiedConnection
108
109
class AsyncAdapt_aiosqlite_cursor(AsyncAdapt_dbapi_cursor):
    """Cursor adapter class used by the aiosqlite connection adapter.

    Adds no attributes or methods of its own on top of
    ``AsyncAdapt_dbapi_cursor``.
    """

    __slots__ = ()
112
113
class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_dbapi_ss_cursor):
    """Server-side cursor adapter class used by the aiosqlite adapter.

    Adds no attributes or methods of its own on top of
    ``AsyncAdapt_dbapi_ss_cursor``.
    """

    __slots__ = ()
116
117
class AsyncAdapt_aiosqlite_connection(AsyncAdapt_dbapi_connection):
    """Blocking-style adapter around an aiosqlite connection.

    Each method drives the corresponding awaitable of the wrapped
    connection (``self._connection``) through ``await_()`` and routes
    failures to ``self._handle_exception()``.
    """

    __slots__ = ()

    _cursor_cls = AsyncAdapt_aiosqlite_cursor
    _ss_cursor_cls = AsyncAdapt_aiosqlite_ss_cursor

    @property
    def isolation_level(self) -> Optional[str]:
        """Return ``isolation_level`` as reported by the wrapped connection."""
        return cast("Optional[str]", self._connection.isolation_level)

    @isolation_level.setter
    def isolation_level(self, value: Optional[str]) -> None:
        # aiosqlite's isolation_level setter works outside the Thread
        # that it's supposed to, necessitating setting check_same_thread=False.
        # for improved stability, we instead invent our own awaitable version
        # using aiosqlite's async queue directly.

        def set_iso(connection: Any, value: Optional[str]) -> None:
            # ``connection`` here is the wrapped connection's ``_conn``
            # object, not this adapter.
            connection.isolation_level = value

        function = partial(set_iso, self._connection._conn, value)
        future = asyncio.get_event_loop().create_future()

        # hand the (future, callable) pair to aiosqlite's queue; the
        # future is awaited below to obtain the outcome.
        self._connection._tx.put_nowait((future, function))

        try:
            await_(future)
        except Exception as error:
            self._handle_exception(error)

    def create_function(self, *args: Any, **kw: Any) -> None:
        """Call the wrapped connection's ``create_function()``.

        All positional and keyword arguments are passed through unchanged.
        """
        try:
            await_(self._connection.create_function(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def rollback(self) -> None:
        """Roll back, unless the wrapped object's ``_connection`` is falsy."""
        if self._connection._connection:
            super().rollback()

    def commit(self) -> None:
        """Commit, unless the wrapped object's ``_connection`` is falsy."""
        if self._connection._connection:
            super().commit()

    def close(self) -> None:
        """Close the wrapped connection, ignoring ``ValueError``."""
        try:
            await_(self._connection.close())
        except ValueError:
            # this is undocumented for aiosqlite, that ValueError
            # was raised if .close() was called more than once, which is
            # both not customary for DBAPI and is also not a DBAPI.Error
            # exception. This is now fixed in aiosqlite via my PR
            # https://github.com/omnilib/aiosqlite/pull/238, so we can be
            # assured this will not become some other kind of exception,
            # since it doesn't raise anymore.

            pass
        except Exception as error:
            self._handle_exception(error)

    @classmethod
    def _handle_exception_no_connection(
        cls, dbapi: Any, error: Exception
    ) -> NoReturn:
        """Re-raise ``error``, translating "connection gone" ValueErrors.

        A ``ValueError`` whose first argument is the string
        "no active connection" or "connection closed" (compared
        case-insensitively) is re-raised as
        ``dbapi.sqlite.OperationalError`` chained to the original.  Any
        other error is delegated to the superclass implementation.

        :param dbapi: object exposing the ``sqlite`` module attribute.
        :param error: the exception being handled.
        """
        if isinstance(error, ValueError):
            # a ValueError may carry no args, or a non-string first arg;
            # guard so that such errors are delegated rather than being
            # masked by an IndexError / AttributeError raised here.
            message = error.args[0] if error.args else None
            if isinstance(message, str) and message.lower() in (
                "no active connection",
                "connection closed",
            ):
                raise dbapi.sqlite.OperationalError(message) from error

        super()._handle_exception_no_connection(dbapi, error)
191
192
class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
    """DBAPI-module style facade built from ``aiosqlite`` and ``sqlite3``.

    Exception classes and version information are taken from the
    ``aiosqlite`` module; ``PARSE_*`` constants and ``Binary`` are taken
    from the ``sqlite`` module passed to the constructor.
    """

    def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType):
        """Store both modules and populate the DBAPI attributes.

        :param aiosqlite: the imported ``aiosqlite`` module.
        :param sqlite: the imported ``sqlite3`` module.
        """
        super().__init__(aiosqlite, dbapi_module=sqlite)
        self.aiosqlite = aiosqlite
        self.sqlite = sqlite
        self.paramstyle = "qmark"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self) -> None:
        """Copy selected module attributes onto this object."""
        for name in (
            "DatabaseError",
            "Error",
            "IntegrityError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "sqlite_version",
            "sqlite_version_info",
        ):
            setattr(self, name, getattr(self.aiosqlite, name))

        for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES", "Binary"):
            setattr(self, name, getattr(self.sqlite, name))

    def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection:
        """Create a connection and wrap it in the aiosqlite adapter.

        If an ``async_creator_fn`` keyword is given it is removed from
        ``kw`` and called with the remaining arguments to produce the
        awaitable; otherwise ``aiosqlite.connect()`` is used and its
        worker thread is flagged as a daemon.

        :return: an ``AsyncAdapt_aiosqlite_connection`` wrapping the
         awaited connection.
        """
        creator_fn = kw.pop("async_creator_fn", None)
        if creator_fn:
            connection = creator_fn(*arg, **kw)
        else:
            connection = self.aiosqlite.connect(*arg, **kw)

            # the connection is backed by a Thread; flag it as a daemon.
            # you'll thank us later.
            # NOTE(review): newer aiosqlite releases (0.22+, per its
            # changelog - confirm) keep the worker in ``_thread`` rather
            # than having the Connection subclass Thread.
            worker = getattr(connection, "_thread", None)
            if worker is not None:
                worker.daemon = True
            else:
                connection.daemon = True

        return AsyncAdapt_aiosqlite_connection(
            self,
            await_(connection),
        )
233
234
class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
    """Execution context that requests server-side cursors by flag."""

    def create_server_side_cursor(self) -> DBAPICursor:
        """Return a cursor obtained with ``server_side=True``."""
        dbapi_conn = self._dbapi_connection
        return dbapi_conn.cursor(server_side=True)
238
239
class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
    """SQLite dialect variant that uses the ``aiosqlite`` driver."""

    driver = "aiosqlite"
    supports_statement_cache = True

    is_async = True

    supports_server_side_cursors = True

    execution_ctx_cls = SQLiteExecutionContext_aiosqlite

    @classmethod
    def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi:
        """Import ``aiosqlite`` and ``sqlite3`` and wrap them together."""
        aiosqlite_module = __import__("aiosqlite")
        sqlite_module = __import__("sqlite3")
        return AsyncAdapt_aiosqlite_dbapi(aiosqlite_module, sqlite_module)

    @classmethod
    def get_pool_class(cls, url: URL) -> type[pool.Pool]:
        """Pick the pool class: queue pool for file URLs, else static."""
        file_based = cls._is_url_file_db(url)
        return pool.AsyncAdaptedQueuePool if file_based else pool.StaticPool

    def is_disconnect(
        self,
        e: DBAPIModule.Error,
        connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
        cursor: Optional[DBAPICursor],
    ) -> bool:
        """Report whether ``e`` indicates a lost connection.

        An ``OperationalError`` whose lower-cased text contains
        "no active connection" or "connection closed" counts as a
        disconnect; anything else is decided by the superclass.
        """
        self.dbapi = cast("DBAPIModule", self.dbapi)
        if isinstance(e, self.dbapi.OperationalError):
            message = str(e).lower()
            markers = ("no active connection", "connection closed")
            if any(marker in message for marker in markers):
                return True

        return super().is_disconnect(e, connection, cursor)

    def get_driver_connection(
        self, connection: DBAPIConnection
    ) -> AsyncIODBAPIConnection:
        """Return the ``_connection`` attribute of the given connection."""
        driver_connection = connection._connection
        return driver_connection  # type: ignore[no-any-return]
284
285
# module-level ``dialect`` name bound to the dialect class; presumably the
# attribute SQLAlchemy's dialect loading looks up - confirm against loader.
dialect = SQLiteDialect_aiosqlite