1# dialects/sqlite/aiosqlite.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9r"""
10
11.. dialect:: sqlite+aiosqlite
12 :name: aiosqlite
13 :dbapi: aiosqlite
14 :connectstring: sqlite+aiosqlite:///file_path
15 :url: https://pypi.org/project/aiosqlite/
16
17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
18running on top of pysqlite.
19
20aiosqlite is a wrapper around pysqlite that uses a background thread for
21each connection. It does not actually use non-blocking IO, as SQLite
22databases are not socket-based. However it does provide a working asyncio
23interface that's useful for testing and prototyping purposes.
24
25Using a special asyncio mediation layer, the aiosqlite dialect is usable
26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
27extension package.
28
29This dialect should normally be used only with the
30:func:`_asyncio.create_async_engine` engine creation function::
31
32 from sqlalchemy.ext.asyncio import create_async_engine
33
34 engine = create_async_engine("sqlite+aiosqlite:///filename")
35
36The URL passes through all arguments to the ``pysqlite`` driver, so all
37connection arguments are the same as they are for that of :ref:`pysqlite`.
38
39.. _aiosqlite_udfs:
40
41User-Defined Functions
42----------------------
43
44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs)
45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`.
46
47.. _aiosqlite_serializable:
48
49Serializable isolation / Savepoints / Transactional DDL (asyncio version)
50-------------------------------------------------------------------------
51
52A newly revised version of this important section is now available
53at the top level of the SQLAlchemy SQLite documentation, in the section
54:ref:`sqlite_transactions`.
55
56
57.. _aiosqlite_pooling:
58
59Pooling Behavior
60----------------
61
62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently
63based on the kind of SQLite database that's requested:
64
65* When a ``:memory:`` SQLite database is specified, the dialect by default
66 will use :class:`.StaticPool`. This pool maintains a single
 connection, so that all access to the engine
 uses the same ``:memory:`` database.
69* When a file-based database is specified, the dialect will use
70 :class:`.AsyncAdaptedQueuePool` as the source of connections.
71
72 .. versionchanged:: 2.0.38
73
74 SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default.
 Previously, :class:`.NullPool` was used. The :class:`.NullPool` class
76 may be used by specifying it via the
77 :paramref:`_sa.create_engine.poolclass` parameter.
78
79""" # noqa
80
81from __future__ import annotations
82
83import asyncio
84from functools import partial
85from threading import Thread
86from types import ModuleType
87from typing import Any
88from typing import cast
89from typing import NoReturn
90from typing import Optional
91from typing import TYPE_CHECKING
92from typing import Union
93
94from .base import SQLiteExecutionContext
95from .pysqlite import SQLiteDialect_pysqlite
96from ... import pool
97from ...connectors.asyncio import AsyncAdapt_dbapi_connection
98from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
99from ...connectors.asyncio import AsyncAdapt_dbapi_module
100from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
101from ...connectors.asyncio import AsyncAdapt_terminate
102from ...util.concurrency import await_
103
104if TYPE_CHECKING:
105 from ...connectors.asyncio import AsyncIODBAPIConnection
106 from ...engine.interfaces import DBAPIConnection
107 from ...engine.interfaces import DBAPICursor
108 from ...engine.interfaces import DBAPIModule
109 from ...engine.url import URL
110 from ...pool.base import PoolProxiedConnection
111
112
class AsyncAdapt_aiosqlite_cursor(AsyncAdapt_dbapi_cursor):
    """Cursor adapter used for aiosqlite connections.

    Defines nothing beyond what ``AsyncAdapt_dbapi_cursor`` provides; the
    subclass exists so the aiosqlite connection adapter has a dedicated
    cursor class to reference.
    """

    # empty __slots__: this subclass declares no additional instance
    # attributes of its own.
    __slots__ = ()
115
116
class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_dbapi_ss_cursor):
    """Server-side cursor adapter used for aiosqlite connections.

    Defines nothing beyond what ``AsyncAdapt_dbapi_ss_cursor`` provides.
    """

    # empty __slots__: this subclass declares no additional instance
    # attributes of its own.
    __slots__ = ()
119
120
class AsyncAdapt_aiosqlite_connection(
    AsyncAdapt_terminate, AsyncAdapt_dbapi_connection
):
    """Blocking-style adapter around an aiosqlite connection.

    The wrapped aiosqlite connection is available as ``self._connection``.
    Methods defined here drive its awaitables through ``await_()`` and hand
    any raised exception to ``self._handle_exception()``.
    """

    __slots__ = ()

    _cursor_cls = AsyncAdapt_aiosqlite_cursor
    _ss_cursor_cls = AsyncAdapt_aiosqlite_ss_cursor

    @property
    def isolation_level(self) -> Optional[str]:
        """Return the ``isolation_level`` of the wrapped connection."""
        return cast(Optional[str], self._connection.isolation_level)

    @isolation_level.setter
    def isolation_level(self, value: Optional[str]) -> None:
        # aiosqlite's isolation_level setter works outside the Thread
        # that it's supposed to, necessitating setting check_same_thread=False.
        # for improved stability, we instead invent our own awaitable version
        # using aiosqlite's async queue directly.

        def set_iso(connection: Any, value: Optional[str]) -> None:
            # ``connection`` is the object stored at
            # ``self._connection._conn``, not this adapter class.
            connection.isolation_level = value

        function = partial(set_iso, self._connection._conn, value)
        future = asyncio.get_event_loop().create_future()

        # enqueue the (future, callable) pair on aiosqlite's work queue and
        # wait for the future to be resolved.
        self._connection._tx.put_nowait((future, function))

        try:
            await_(future)
        except Exception as error:
            self._handle_exception(error)

    def create_function(self, *args: Any, **kw: Any) -> None:
        """Register a user-defined function on the wrapped connection.

        All arguments are passed through unchanged to the wrapped
        connection's ``create_function()``.
        """
        try:
            await_(self._connection.create_function(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def rollback(self) -> None:
        """Roll back, but only while the wrapped connection still holds a
        truthy ``_connection``; otherwise do nothing."""
        if self._connection._connection:
            super().rollback()

    def commit(self) -> None:
        """Commit, but only while the wrapped connection still holds a
        truthy ``_connection``; otherwise do nothing."""
        if self._connection._connection:
            super().commit()

    def close(self) -> None:
        """Close the wrapped connection, ignoring ``ValueError``."""
        try:
            await_(self._connection.close())
        except ValueError:
            # this is undocumented for aiosqlite, that ValueError
            # was raised if .close() was called more than once, which is
            # both not customary for DBAPI and is also not a DBAPI.Error
            # exception. This is now fixed in aiosqlite via my PR
            # https://github.com/omnilib/aiosqlite/pull/238, so we can be
            # assured this will not become some other kind of exception,
            # since it doesn't raise anymore.

            pass
        except Exception as error:
            self._handle_exception(error)

    @classmethod
    def _handle_exception_no_connection(
        cls, dbapi: Any, error: Exception
    ) -> NoReturn:
        """Re-raise ``error``, translating "connection gone" ValueErrors.

        A ``ValueError`` whose first argument is the string
        ``"no active connection"`` or ``"connection closed"`` (compared
        case-insensitively) is re-raised as
        ``dbapi.sqlite.OperationalError`` chained to the original error.
        Every other exception is delegated to the superclass handler.
        """
        if isinstance(error, ValueError):
            # a ValueError may carry no args at all, or a non-string first
            # arg; guard so that inspecting it cannot itself raise
            # IndexError / AttributeError and hide the original error.
            message = error.args[0] if error.args else None
            if isinstance(message, str) and message.lower() in (
                "no active connection",
                "connection closed",
            ):
                raise dbapi.sqlite.OperationalError(message) from error

        super()._handle_exception_no_connection(dbapi, error)

    async def _terminate_graceful_close(self) -> None:
        """Try to close connection gracefully"""
        await self._connection.close()

    def _terminate_force_close(self) -> None:
        """Terminate the connection"""

        # this was added in aiosqlite 0.22.1. if stop() is not present,
        # the dialect should indicate has_terminate=False
        try:
            meth = self._connection.stop
        except AttributeError as ae:
            raise NotImplementedError(
                "terminate_force_close() not implemented by this DBAPI shim"
            ) from ae
        else:
            meth()
214
215
class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
    """DBAPI-module facade built from the aiosqlite and sqlite modules.

    Selected exception classes and constants of the two modules are copied
    onto the instance, and ``connect()`` returns an
    ``AsyncAdapt_aiosqlite_connection``.
    """

    def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType):
        super().__init__(aiosqlite, dbapi_module=sqlite)
        self.aiosqlite = aiosqlite
        self.sqlite = sqlite
        self.paramstyle = "qmark"
        # records whether this aiosqlite's Connection class has ``stop``
        self.has_stop = hasattr(aiosqlite.Connection, "stop")
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self) -> None:
        """Copy module-level names from both modules onto this object."""
        from_aiosqlite = (
            "DatabaseError",
            "Error",
            "IntegrityError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "sqlite_version",
            "sqlite_version_info",
        )
        from_sqlite = ("PARSE_COLNAMES", "PARSE_DECLTYPES", "Binary")

        # aiosqlite names are copied first, then the sqlite ones
        for source, attr_names in (
            (self.aiosqlite, from_aiosqlite),
            (self.sqlite, from_sqlite),
        ):
            for attr in attr_names:
                setattr(self, attr, getattr(source, attr))

    def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection:
        """Create a connection and return it wrapped in the adapter.

        If an ``async_creator_fn`` keyword is supplied and truthy, it is
        removed from ``kw`` and called with the remaining arguments in
        place of ``aiosqlite.connect()``.
        """
        creator_fn = kw.pop("async_creator_fn", None)
        connection = (
            creator_fn(*arg, **kw)
            if creator_fn
            else self.aiosqlite.connect(*arg, **kw)
        )

        # aiosqlite uses a Thread. you'll thank us later
        # Connection itself was a thread in version prior to 0.22;
        # in 0.22+ instead it contains a thread.
        worker = (
            connection
            if isinstance(connection, Thread)
            else connection._thread
        )
        worker.daemon = True

        return AsyncAdapt_aiosqlite_connection(self, await_(connection))
260
261
class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
    """Execution context used by the aiosqlite dialect."""

    def create_server_side_cursor(self) -> DBAPICursor:
        """Return a cursor requested with ``server_side=True``."""
        dbapi_connection = self._dbapi_connection
        return dbapi_connection.cursor(server_side=True)
265
266
class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
    """SQLite dialect variant for the ``aiosqlite`` driver."""

    driver = "aiosqlite"
    supports_statement_cache = True

    is_async = True
    has_terminate = True

    supports_server_side_cursors = True

    execution_ctx_cls = SQLiteExecutionContext_aiosqlite

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        # turn off terminate support on this instance when the loaded
        # DBAPI wrapper reports that ``stop`` is unavailable
        dbapi = self.dbapi
        if dbapi and not dbapi.has_stop:
            self.has_terminate = False

    @classmethod
    def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi:
        """Import ``aiosqlite`` and ``sqlite3`` and wrap them."""
        aiosqlite_module = __import__("aiosqlite")
        sqlite_module = __import__("sqlite3")
        return AsyncAdapt_aiosqlite_dbapi(aiosqlite_module, sqlite_module)

    @classmethod
    def get_pool_class(cls, url: URL) -> type[pool.Pool]:
        """Return ``AsyncAdaptedQueuePool`` for file databases, otherwise
        ``StaticPool``."""
        return (
            pool.AsyncAdaptedQueuePool
            if cls._is_url_file_db(url)
            else pool.StaticPool
        )

    def is_disconnect(
        self,
        e: DBAPIModule.Error,
        connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
        cursor: Optional[DBAPICursor],
    ) -> bool:
        """Return True if ``e`` indicates a disconnected connection.

        An ``OperationalError`` whose lower-cased message contains
        ``"no active connection"`` or ``"connection closed"`` counts as a
        disconnect; anything else is decided by the superclass.
        """
        self.dbapi = cast("DBAPIModule", self.dbapi)
        if isinstance(e, self.dbapi.OperationalError):
            message = str(e).lower()
            if any(
                marker in message
                for marker in ("no active connection", "connection closed")
            ):
                return True

        return super().is_disconnect(e, connection, cursor)

    def get_driver_connection(
        self, connection: DBAPIConnection
    ) -> AsyncIODBAPIConnection:
        """Return the ``_connection`` attribute of the given connection."""
        return connection._connection  # type: ignore[no-any-return]

    def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
        """Terminate the given connection via its ``terminate()`` method."""
        dbapi_connection.terminate()
320
321
# module-level alias for the dialect class defined above
# NOTE(review): presumably the name the dialect loader looks up in this
# module -- confirm against the dialect registry.
dialect = SQLiteDialect_aiosqlite