1# dialects/sqlite/aiosqlite.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9r"""
10
11.. dialect:: sqlite+aiosqlite
12 :name: aiosqlite
13 :dbapi: aiosqlite
14 :connectstring: sqlite+aiosqlite:///file_path
15 :url: https://pypi.org/project/aiosqlite/
16
17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
18running on top of pysqlite.
19
20aiosqlite is a wrapper around pysqlite that uses a background thread for
21each connection. It does not actually use non-blocking IO, as SQLite
22databases are not socket-based. However it does provide a working asyncio
23interface that's useful for testing and prototyping purposes.
24
25Using a special asyncio mediation layer, the aiosqlite dialect is usable
26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
27extension package.
28
29This dialect should normally be used only with the
30:func:`_asyncio.create_async_engine` engine creation function::
31
32 from sqlalchemy.ext.asyncio import create_async_engine
33
34 engine = create_async_engine("sqlite+aiosqlite:///filename")
35
36The URL passes through all arguments to the ``pysqlite`` driver, so all
37connection arguments are the same as they are for that of :ref:`pysqlite`.
38
39.. _aiosqlite_udfs:
40
41User-Defined Functions
42----------------------
43
44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs)
45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`.
46
47.. _aiosqlite_serializable:
48
49Serializable isolation / Savepoints / Transactional DDL (asyncio version)
50-------------------------------------------------------------------------
51
52A newly revised version of this important section is now available
53at the top level of the SQLAlchemy SQLite documentation, in the section
54:ref:`sqlite_transactions`.
55
56
57.. _aiosqlite_pooling:
58
59Pooling Behavior
60----------------
61
62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently
63based on the kind of SQLite database that's requested:
64
65* When a ``:memory:`` SQLite database is specified, the dialect by default
66 will use :class:`.StaticPool`. This pool maintains a single
67 connection, so that all access to the engine
  uses the same ``:memory:`` database.
69* When a file-based database is specified, the dialect will use
70 :class:`.AsyncAdaptedQueuePool` as the source of connections.
71
72 .. versionchanged:: 2.0.38
73
74 SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default.
      Previously, :class:`.NullPool` was used.  The :class:`.NullPool` class
76 may be used by specifying it via the
77 :paramref:`_sa.create_engine.poolclass` parameter.
78
79""" # noqa
80from __future__ import annotations
81
82import asyncio
83from functools import partial
84from threading import Thread
85from types import ModuleType
86from typing import Any
87from typing import cast
88from typing import NoReturn
89from typing import Optional
90from typing import TYPE_CHECKING
91from typing import Union
92
93from .base import SQLiteExecutionContext
94from .pysqlite import SQLiteDialect_pysqlite
95from ... import pool
96from ...connectors.asyncio import AsyncAdapt_dbapi_connection
97from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
98from ...connectors.asyncio import AsyncAdapt_dbapi_module
99from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
100from ...connectors.asyncio import AsyncAdapt_terminate
101from ...util.concurrency import await_
102
103if TYPE_CHECKING:
104 from ...connectors.asyncio import AsyncIODBAPIConnection
105 from ...engine.interfaces import DBAPIConnection
106 from ...engine.interfaces import DBAPICursor
107 from ...engine.interfaces import DBAPIModule
108 from ...engine.url import URL
109 from ...pool.base import PoolProxiedConnection
110
111
class AsyncAdapt_aiosqlite_cursor(AsyncAdapt_dbapi_cursor):
    """Cursor adapter used by the aiosqlite connection adapter.

    Adds no behavior of its own on top of ``AsyncAdapt_dbapi_cursor`` and
    declares no additional instance slots.
    """

    __slots__ = ()
114
115
class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_dbapi_ss_cursor):
    """Server-side cursor adapter used by the aiosqlite connection adapter.

    Adds no behavior of its own on top of ``AsyncAdapt_dbapi_ss_cursor`` and
    declares no additional instance slots.
    """

    __slots__ = ()
118
119
class AsyncAdapt_aiosqlite_connection(
    AsyncAdapt_terminate, AsyncAdapt_dbapi_connection
):
    """Blocking-style adapter around an aiosqlite connection.

    The wrapped aiosqlite connection is held in ``self._connection``.
    Awaitables produced by it are waited on with ``await_`` and any
    exception raised while waiting is passed to ``self._handle_exception``.
    """

    __slots__ = ()

    _cursor_cls = AsyncAdapt_aiosqlite_cursor
    _ss_cursor_cls = AsyncAdapt_aiosqlite_ss_cursor

    @property
    def isolation_level(self) -> Optional[str]:
        """Return ``isolation_level`` as reported by the wrapped connection."""
        return cast("Optional[str]", self._connection.isolation_level)

    @isolation_level.setter
    def isolation_level(self, value: Optional[str]) -> None:
        # aiosqlite's isolation_level setter works outside the Thread
        # that it's supposed to, necessitating setting check_same_thread=False.
        # for improved stability, we instead invent our own awaitable version
        # using aiosqlite's async queue directly.

        def set_iso(connection: Any, value: Optional[str]) -> None:
            # ``connection`` is whatever aiosqlite keeps in ``_conn``, not
            # this adapter class, hence the ``Any`` annotation.
            connection.isolation_level = value

        function = partial(set_iso, self._connection._conn, value)
        future = asyncio.get_event_loop().create_future()

        # hand the (future, callable) pair to aiosqlite's work queue and
        # wait for the future to be resolved.
        self._connection._tx.put_nowait((future, function))

        try:
            await_(future)
        except Exception as error:
            self._handle_exception(error)

    def create_function(self, *args: Any, **kw: Any) -> None:
        """Forward ``create_function()`` to the wrapped connection.

        All positional and keyword arguments are passed through unchanged.
        """
        try:
            await_(self._connection.create_function(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def rollback(self) -> None:
        """Roll back, but only while the wrapped connection's
        ``_connection`` attribute is truthy; otherwise do nothing.

        NOTE(review): ``_connection._connection`` is presumably the raw
        driver connection that aiosqlite clears on close -- confirm.
        """
        if self._connection._connection:
            super().rollback()

    def commit(self) -> None:
        """Commit, but only while the wrapped connection's ``_connection``
        attribute is truthy; otherwise do nothing.
        """
        if self._connection._connection:
            super().commit()

    def close(self) -> None:
        """Close the wrapped connection.

        A ``ValueError`` raised by the close call is ignored; any other
        exception is passed to ``self._handle_exception``.
        """
        try:
            await_(self._connection.close())
        except ValueError:
            # this is undocumented for aiosqlite, that ValueError
            # was raised if .close() was called more than once, which is
            # both not customary for DBAPI and is also not a DBAPI.Error
            # exception. This is now fixed in aiosqlite via my PR
            # https://github.com/omnilib/aiosqlite/pull/238, so we can be
            # assured this will not become some other kind of exception,
            # since it doesn't raise anymore.

            pass
        except Exception as error:
            self._handle_exception(error)

    @classmethod
    def _handle_exception_no_connection(
        cls, dbapi: Any, error: Exception
    ) -> NoReturn:
        """Re-raise ``error``, translating "connection gone" ValueErrors.

        A ``ValueError`` whose first argument is the string
        "no active connection" or "connection closed" (case-insensitive)
        is re-raised as ``dbapi.sqlite.OperationalError`` chained to the
        original.  Every other error is delegated to the superclass
        implementation.

        :param dbapi: DBAPI facade exposing a ``sqlite`` attribute.
        :param error: the exception being handled.
        """
        if isinstance(error, ValueError):
            # a ValueError may carry no arguments, or a non-string first
            # argument; inspect it defensively so that the original error
            # is never masked by an IndexError / AttributeError raised here.
            message = error.args[0] if error.args else None
            if isinstance(message, str) and message.lower() in (
                "no active connection",
                "connection closed",
            ):
                raise dbapi.sqlite.OperationalError(message) from error

        super()._handle_exception_no_connection(dbapi, error)

    async def _terminate_graceful_close(self) -> None:
        """Try to close connection gracefully"""
        await self._connection.close()

    def _terminate_force_close(self) -> None:
        """Terminate the connection

        :raises NotImplementedError: if the wrapped connection has no
         ``stop`` attribute.
        """

        # this was added in aiosqlite 0.22.1. if stop() is not present,
        # the dialect should indicate has_terminate=False
        try:
            meth = self._connection.stop
        except AttributeError as ae:
            raise NotImplementedError(
                "terminate_force_close() not implemented by this DBAPI shim"
            ) from ae
        else:
            meth()
213
214
class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
    """DBAPI-module facade combining ``aiosqlite`` and ``sqlite3``.

    Selected names are copied from the two modules onto the instance, and
    ``connect()`` returns an ``AsyncAdapt_aiosqlite_connection``.
    """

    def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType):
        """Store both modules and populate the DBAPI attributes.

        :param aiosqlite: the imported ``aiosqlite`` module.
        :param sqlite: the imported ``sqlite3`` module.
        """
        super().__init__(aiosqlite, dbapi_module=sqlite)
        self.aiosqlite = aiosqlite
        self.sqlite = sqlite
        self.paramstyle = "qmark"
        # whether this aiosqlite's Connection class provides ``stop``
        self.has_stop = hasattr(aiosqlite.Connection, "stop")
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self) -> None:
        """Copy exception classes, version info and constants onto self."""
        taken_from_aiosqlite = (
            "DatabaseError",
            "Error",
            "IntegrityError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "sqlite_version",
            "sqlite_version_info",
        )
        taken_from_sqlite = ("PARSE_COLNAMES", "PARSE_DECLTYPES", "Binary")

        for attr in taken_from_aiosqlite:
            setattr(self, attr, getattr(self.aiosqlite, attr))

        for attr in taken_from_sqlite:
            setattr(self, attr, getattr(self.sqlite, attr))

    def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection:
        """Create a connection and wrap it in the adapter class.

        If an ``async_creator_fn`` keyword is present it is removed from
        ``kw`` and called with the remaining arguments; otherwise
        ``aiosqlite.connect()`` is called with them.  The result is awaited
        via ``await_`` before being wrapped.
        """
        creator_fn = kw.pop("async_creator_fn", None)
        if creator_fn:
            pending = creator_fn(*arg, **kw)
        else:
            pending = self.aiosqlite.connect(*arg, **kw)

            # aiosqlite uses a Thread.   you'll thank us later
            # Prior to 0.22 the Connection itself was the thread; in 0.22+
            # it instead contains one.
            worker = pending if isinstance(pending, Thread) else pending._thread
            worker.daemon = True

        return AsyncAdapt_aiosqlite_connection(self, await_(pending))
259
260
class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
    """Execution context used by the aiosqlite dialect."""

    def create_server_side_cursor(self) -> DBAPICursor:
        """Return a cursor obtained with ``server_side=True`` from the
        context's DBAPI connection.
        """
        dbapi_conn = self._dbapi_connection
        return dbapi_conn.cursor(server_side=True)
264
265
class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
    """SQLite dialect variant for the ``aiosqlite`` driver.

    Marked as async, with statement caching and server-side cursors
    enabled.  ``has_terminate`` starts out true and is switched off per
    instance when the loaded DBAPI reports ``has_stop`` as false.
    """

    driver = "aiosqlite"
    supports_statement_cache = True

    is_async = True
    has_terminate = True

    supports_server_side_cursors = True

    execution_ctx_cls = SQLiteExecutionContext_aiosqlite

    def __init__(self, **kwargs: Any):
        """Initialize the dialect, then disable ``has_terminate`` if a
        DBAPI is present and its ``has_stop`` flag is false.
        """
        super().__init__(**kwargs)
        loaded_dbapi = self.dbapi
        if loaded_dbapi and not loaded_dbapi.has_stop:
            self.has_terminate = False

    @classmethod
    def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi:
        """Import ``aiosqlite`` and ``sqlite3`` and wrap them in the
        DBAPI facade.
        """
        aiosqlite_module = __import__("aiosqlite")
        sqlite_module = __import__("sqlite3")
        return AsyncAdapt_aiosqlite_dbapi(aiosqlite_module, sqlite_module)

    @classmethod
    def get_pool_class(cls, url: URL) -> type[pool.Pool]:
        """Return ``AsyncAdaptedQueuePool`` when ``_is_url_file_db(url)``
        is true, ``StaticPool`` otherwise.
        """
        if not cls._is_url_file_db(url):
            return pool.StaticPool
        return pool.AsyncAdaptedQueuePool

    def is_disconnect(
        self,
        e: DBAPIModule.Error,
        connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
        cursor: Optional[DBAPICursor],
    ) -> bool:
        """Report whether ``e`` indicates a lost connection.

        An ``OperationalError`` whose lower-cased text contains
        "no active connection" or "connection closed" counts as a
        disconnect; everything else is decided by the superclass.
        """
        self.dbapi = cast("DBAPIModule", self.dbapi)
        if isinstance(e, self.dbapi.OperationalError):
            message = str(e).lower()
            disconnect_markers = ("no active connection", "connection closed")
            if any(marker in message for marker in disconnect_markers):
                return True

        return super().is_disconnect(e, connection, cursor)

    def get_driver_connection(
        self, connection: DBAPIConnection
    ) -> AsyncIODBAPIConnection:
        """Return the ``_connection`` attribute of the given connection."""
        return connection._connection  # type: ignore[no-any-return]

    def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
        """Call ``terminate()`` on the given DBAPI connection."""
        dbapi_connection.terminate()
319
320
# Module-level name ``dialect`` bound to the dialect class defined in this
# module.  NOTE(review): presumably the attribute SQLAlchemy's dialect
# loader looks up when resolving "sqlite+aiosqlite" -- confirm.
dialect = SQLiteDialect_aiosqlite