1# dialects/sqlite/aiosqlite.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9r"""
10
11.. dialect:: sqlite+aiosqlite
12 :name: aiosqlite
13 :dbapi: aiosqlite
14 :connectstring: sqlite+aiosqlite:///file_path
15 :url: https://pypi.org/project/aiosqlite/
16
17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
18running on top of pysqlite.
19
20aiosqlite is a wrapper around pysqlite that uses a background thread for
21each connection. It does not actually use non-blocking IO, as SQLite
22databases are not socket-based. However it does provide a working asyncio
23interface that's useful for testing and prototyping purposes.
24
25Using a special asyncio mediation layer, the aiosqlite dialect is usable
26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
27extension package.
28
29This dialect should normally be used only with the
30:func:`_asyncio.create_async_engine` engine creation function::
31
32 from sqlalchemy.ext.asyncio import create_async_engine
33
34 engine = create_async_engine("sqlite+aiosqlite:///filename")
35
36The URL passes through all arguments to the ``pysqlite`` driver, so all
37connection arguments are the same as they are for that of :ref:`pysqlite`.
38
39.. _aiosqlite_udfs:
40
41User-Defined Functions
42----------------------
43
44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs)
45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`.
46
47.. _aiosqlite_serializable:
48
49Serializable isolation / Savepoints / Transactional DDL (asyncio version)
50-------------------------------------------------------------------------
51
52A newly revised version of this important section is now available
53at the top level of the SQLAlchemy SQLite documentation, in the section
54:ref:`sqlite_transactions`.
55
56
57.. _aiosqlite_pooling:
58
59Pooling Behavior
60----------------
61
62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently
63based on the kind of SQLite database that's requested:
64
65* When a ``:memory:`` SQLite database is specified, the dialect by default
66 will use :class:`.StaticPool`. This pool maintains a single
67 connection, so that all access to the engine
68 use the same ``:memory:`` database.
69* When a file-based database is specified, the dialect will use
70 :class:`.AsyncAdaptedQueuePool` as the source of connections.
71
72 .. versionchanged:: 2.0.38
73
74 SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default.
75 Previously, :class:`.NullPool` were used. The :class:`.NullPool` class
76 may be used by specifying it via the
77 :paramref:`_sa.create_engine.poolclass` parameter.
78
79""" # noqa
80from __future__ import annotations
81
82import asyncio
83from functools import partial
84from types import ModuleType
85from typing import Any
86from typing import cast
87from typing import NoReturn
88from typing import Optional
89from typing import TYPE_CHECKING
90from typing import Union
91
92from .base import SQLiteExecutionContext
93from .pysqlite import SQLiteDialect_pysqlite
94from ... import pool
95from ...connectors.asyncio import AsyncAdapt_dbapi_connection
96from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
97from ...connectors.asyncio import AsyncAdapt_dbapi_module
98from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
99from ...util.concurrency import await_
100
101if TYPE_CHECKING:
102 from ...connectors.asyncio import AsyncIODBAPIConnection
103 from ...engine.interfaces import DBAPIConnection
104 from ...engine.interfaces import DBAPICursor
105 from ...engine.interfaces import DBAPIModule
106 from ...engine.url import URL
107 from ...pool.base import PoolProxiedConnection
108
109
110class AsyncAdapt_aiosqlite_cursor(AsyncAdapt_dbapi_cursor):
111 __slots__ = ()
112
113
114class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_dbapi_ss_cursor):
115 __slots__ = ()
116
117
118class AsyncAdapt_aiosqlite_connection(AsyncAdapt_dbapi_connection):
119 __slots__ = ()
120
121 _cursor_cls = AsyncAdapt_aiosqlite_cursor
122 _ss_cursor_cls = AsyncAdapt_aiosqlite_ss_cursor
123
124 @property
125 def isolation_level(self) -> Optional[str]:
126 return cast(str, self._connection.isolation_level)
127
128 @isolation_level.setter
129 def isolation_level(self, value: Optional[str]) -> None:
130 # aiosqlite's isolation_level setter works outside the Thread
131 # that it's supposed to, necessitating setting check_same_thread=False.
132 # for improved stability, we instead invent our own awaitable version
133 # using aiosqlite's async queue directly.
134
135 def set_iso(
136 connection: AsyncAdapt_aiosqlite_connection, value: Optional[str]
137 ) -> None:
138 connection.isolation_level = value
139
140 function = partial(set_iso, self._connection._conn, value)
141 future = asyncio.get_event_loop().create_future()
142
143 self._connection._tx.put_nowait((future, function))
144
145 try:
146 await_(future)
147 except Exception as error:
148 self._handle_exception(error)
149
150 def create_function(self, *args: Any, **kw: Any) -> None:
151 try:
152 await_(self._connection.create_function(*args, **kw))
153 except Exception as error:
154 self._handle_exception(error)
155
156 def rollback(self) -> None:
157 if self._connection._connection:
158 super().rollback()
159
160 def commit(self) -> None:
161 if self._connection._connection:
162 super().commit()
163
164 def close(self) -> None:
165 try:
166 await_(self._connection.close())
167 except ValueError:
168 # this is undocumented for aiosqlite, that ValueError
169 # was raised if .close() was called more than once, which is
170 # both not customary for DBAPI and is also not a DBAPI.Error
171 # exception. This is now fixed in aiosqlite via my PR
172 # https://github.com/omnilib/aiosqlite/pull/238, so we can be
173 # assured this will not become some other kind of exception,
174 # since it doesn't raise anymore.
175
176 pass
177 except Exception as error:
178 self._handle_exception(error)
179
180 def _handle_exception(self, error: Exception) -> NoReturn:
181 if isinstance(error, ValueError) and error.args[0].lower() in (
182 "no active connection",
183 "connection closed",
184 ):
185 raise self.dbapi.sqlite.OperationalError(error.args[0]) from error
186 else:
187 super()._handle_exception(error)
188
189
190class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
191 def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType):
192 self.aiosqlite = aiosqlite
193 self.sqlite = sqlite
194 self.paramstyle = "qmark"
195 self._init_dbapi_attributes()
196
197 def _init_dbapi_attributes(self) -> None:
198 for name in (
199 "DatabaseError",
200 "Error",
201 "IntegrityError",
202 "NotSupportedError",
203 "OperationalError",
204 "ProgrammingError",
205 "sqlite_version",
206 "sqlite_version_info",
207 ):
208 setattr(self, name, getattr(self.aiosqlite, name))
209
210 for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"):
211 setattr(self, name, getattr(self.sqlite, name))
212
213 for name in ("Binary",):
214 setattr(self, name, getattr(self.sqlite, name))
215
216 def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection:
217 creator_fn = kw.pop("async_creator_fn", None)
218 if creator_fn:
219 connection = creator_fn(*arg, **kw)
220 else:
221 connection = self.aiosqlite.connect(*arg, **kw)
222 # it's a Thread. you'll thank us later
223 connection.daemon = True
224
225 return AsyncAdapt_aiosqlite_connection(
226 self,
227 await_(connection),
228 )
229
230
231class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
232 def create_server_side_cursor(self) -> DBAPICursor:
233 return self._dbapi_connection.cursor(server_side=True)
234
235
236class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
237 driver = "aiosqlite"
238 supports_statement_cache = True
239
240 is_async = True
241
242 supports_server_side_cursors = True
243
244 execution_ctx_cls = SQLiteExecutionContext_aiosqlite
245
246 @classmethod
247 def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi:
248 return AsyncAdapt_aiosqlite_dbapi(
249 __import__("aiosqlite"), __import__("sqlite3")
250 )
251
252 @classmethod
253 def get_pool_class(cls, url: URL) -> type[pool.Pool]:
254 if cls._is_url_file_db(url):
255 return pool.AsyncAdaptedQueuePool
256 else:
257 return pool.StaticPool
258
259 def is_disconnect(
260 self,
261 e: DBAPIModule.Error,
262 connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
263 cursor: Optional[DBAPICursor],
264 ) -> bool:
265 self.dbapi = cast("DBAPIModule", self.dbapi)
266 if isinstance(e, self.dbapi.OperationalError):
267 err_lower = str(e).lower()
268 if (
269 "no active connection" in err_lower
270 or "connection closed" in err_lower
271 ):
272 return True
273
274 return super().is_disconnect(e, connection, cursor)
275
276 def get_driver_connection(
277 self, connection: DBAPIConnection
278 ) -> AsyncIODBAPIConnection:
279 return connection._connection # type: ignore[no-any-return]
280
281
282dialect = SQLiteDialect_aiosqlite