1# dialects/sqlite/aiosqlite.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9r"""
10
11.. dialect:: sqlite+aiosqlite
12 :name: aiosqlite
13 :dbapi: aiosqlite
14 :connectstring: sqlite+aiosqlite:///file_path
15 :url: https://pypi.org/project/aiosqlite/
16
17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
18running on top of pysqlite.
19
20aiosqlite is a wrapper around pysqlite that uses a background thread for
21each connection. It does not actually use non-blocking IO, as SQLite
22databases are not socket-based. However it does provide a working asyncio
23interface that's useful for testing and prototyping purposes.
24
25Using a special asyncio mediation layer, the aiosqlite dialect is usable
26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
27extension package.
28
29This dialect should normally be used only with the
30:func:`_asyncio.create_async_engine` engine creation function::
31
32 from sqlalchemy.ext.asyncio import create_async_engine
33
34 engine = create_async_engine("sqlite+aiosqlite:///filename")
35
36The URL passes through all arguments to the ``pysqlite`` driver, so all
37connection arguments are the same as they are for that of :ref:`pysqlite`.
38
39.. _aiosqlite_udfs:
40
41User-Defined Functions
42----------------------
43
44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs)
45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`.
46
47.. _aiosqlite_serializable:
48
49Serializable isolation / Savepoints / Transactional DDL (asyncio version)
50-------------------------------------------------------------------------
51
52A newly revised version of this important section is now available
53at the top level of the SQLAlchemy SQLite documentation, in the section
54:ref:`sqlite_transactions`.
55
56
57.. _aiosqlite_pooling:
58
59Pooling Behavior
60----------------
61
62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently
63based on the kind of SQLite database that's requested:
64
65* When a ``:memory:`` SQLite database is specified, the dialect by default
66 will use :class:`.StaticPool`. This pool maintains a single
67 connection, so that all access to the engine
68 use the same ``:memory:`` database.
69* When a file-based database is specified, the dialect will use
70 :class:`.AsyncAdaptedQueuePool` as the source of connections.
71
72 .. versionchanged:: 2.0.38
73
74 SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default.
75 Previously, :class:`.NullPool` were used. The :class:`.NullPool` class
76 may be used by specifying it via the
77 :paramref:`_sa.create_engine.poolclass` parameter.
78
79""" # noqa
80from __future__ import annotations
81
82import asyncio
83from collections import deque
84from functools import partial
85from types import ModuleType
86from typing import Any
87from typing import cast
88from typing import Deque
89from typing import Iterator
90from typing import NoReturn
91from typing import Optional
92from typing import Sequence
93from typing import TYPE_CHECKING
94from typing import Union
95
96from .base import SQLiteExecutionContext
97from .pysqlite import SQLiteDialect_pysqlite
98from ... import pool
99from ... import util
100from ...connectors.asyncio import AsyncAdapt_dbapi_module
101from ...engine import AdaptedConnection
102from ...util.concurrency import await_fallback
103from ...util.concurrency import await_only
104
105if TYPE_CHECKING:
106 from ...connectors.asyncio import AsyncIODBAPIConnection
107 from ...connectors.asyncio import AsyncIODBAPICursor
108 from ...engine.interfaces import _DBAPICursorDescription
109 from ...engine.interfaces import _DBAPIMultiExecuteParams
110 from ...engine.interfaces import _DBAPISingleExecuteParams
111 from ...engine.interfaces import DBAPIConnection
112 from ...engine.interfaces import DBAPICursor
113 from ...engine.interfaces import DBAPIModule
114 from ...engine.url import URL
115 from ...pool.base import PoolProxiedConnection
116
117
118class AsyncAdapt_aiosqlite_cursor:
119 # TODO: base on connectors/asyncio.py
120 # see #10415
121
122 __slots__ = (
123 "_adapt_connection",
124 "_connection",
125 "description",
126 "await_",
127 "_rows",
128 "arraysize",
129 "rowcount",
130 "lastrowid",
131 )
132
133 server_side = False
134
135 def __init__(self, adapt_connection: AsyncAdapt_aiosqlite_connection):
136 self._adapt_connection = adapt_connection
137 self._connection = adapt_connection._connection
138 self.await_ = adapt_connection.await_
139 self.arraysize = 1
140 self.rowcount = -1
141 self.description: Optional[_DBAPICursorDescription] = None
142 self._rows: Deque[Any] = deque()
143
144 async def _async_soft_close(self) -> None:
145 return
146
147 def close(self) -> None:
148 self._rows.clear()
149
150 def execute(
151 self,
152 operation: Any,
153 parameters: Optional[_DBAPISingleExecuteParams] = None,
154 ) -> Any:
155
156 try:
157 _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor()) # type: ignore[arg-type] # noqa: E501
158
159 if parameters is None:
160 self.await_(_cursor.execute(operation))
161 else:
162 self.await_(_cursor.execute(operation, parameters))
163
164 if _cursor.description:
165 self.description = _cursor.description
166 self.lastrowid = self.rowcount = -1
167
168 if not self.server_side:
169 self._rows = deque(self.await_(_cursor.fetchall()))
170 else:
171 self.description = None
172 self.lastrowid = _cursor.lastrowid
173 self.rowcount = _cursor.rowcount
174
175 if not self.server_side:
176 self.await_(_cursor.close())
177 else:
178 self._cursor = _cursor # type: ignore[misc]
179 except Exception as error:
180 self._adapt_connection._handle_exception(error)
181
182 def executemany(
183 self,
184 operation: Any,
185 seq_of_parameters: _DBAPIMultiExecuteParams,
186 ) -> Any:
187 try:
188 _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor()) # type: ignore[arg-type] # noqa: E501
189 self.await_(_cursor.executemany(operation, seq_of_parameters))
190 self.description = None
191 self.lastrowid = _cursor.lastrowid
192 self.rowcount = _cursor.rowcount
193 self.await_(_cursor.close())
194 except Exception as error:
195 self._adapt_connection._handle_exception(error)
196
197 def setinputsizes(self, *inputsizes: Any) -> None:
198 pass
199
200 def __iter__(self) -> Iterator[Any]:
201 while self._rows:
202 yield self._rows.popleft()
203
204 def fetchone(self) -> Optional[Any]:
205 if self._rows:
206 return self._rows.popleft()
207 else:
208 return None
209
210 def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
211 if size is None:
212 size = self.arraysize
213
214 rr = self._rows
215 return [rr.popleft() for _ in range(min(size, len(rr)))]
216
217 def fetchall(self) -> Sequence[Any]:
218 retval = list(self._rows)
219 self._rows.clear()
220 return retval
221
222
223class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
224 # TODO: base on connectors/asyncio.py
225 # see #10415
226 __slots__ = "_cursor"
227
228 server_side = True
229
230 def __init__(self, *arg: Any, **kw: Any) -> None:
231 super().__init__(*arg, **kw)
232 self._cursor: Optional[AsyncIODBAPICursor] = None
233
234 def close(self) -> None:
235 if self._cursor is not None:
236 self.await_(self._cursor.close())
237 self._cursor = None
238
239 def fetchone(self) -> Optional[Any]:
240 assert self._cursor is not None
241 return self.await_(self._cursor.fetchone())
242
243 def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
244 assert self._cursor is not None
245 if size is None:
246 size = self.arraysize
247 return self.await_(self._cursor.fetchmany(size=size))
248
249 def fetchall(self) -> Sequence[Any]:
250 assert self._cursor is not None
251 return self.await_(self._cursor.fetchall())
252
253
254class AsyncAdapt_aiosqlite_connection(AdaptedConnection):
255 await_ = staticmethod(await_only)
256 __slots__ = ("dbapi",)
257
258 def __init__(self, dbapi: Any, connection: AsyncIODBAPIConnection) -> None:
259 self.dbapi = dbapi
260 self._connection = connection
261
262 @property
263 def isolation_level(self) -> Optional[str]:
264 return cast(str, self._connection.isolation_level)
265
266 @isolation_level.setter
267 def isolation_level(self, value: Optional[str]) -> None:
268 # aiosqlite's isolation_level setter works outside the Thread
269 # that it's supposed to, necessitating setting check_same_thread=False.
270 # for improved stability, we instead invent our own awaitable version
271 # using aiosqlite's async queue directly.
272
273 def set_iso(
274 connection: AsyncAdapt_aiosqlite_connection, value: Optional[str]
275 ) -> None:
276 connection.isolation_level = value
277
278 function = partial(set_iso, self._connection._conn, value)
279 future = asyncio.get_event_loop().create_future()
280
281 self._connection._tx.put_nowait((future, function))
282
283 try:
284 self.await_(future)
285 except Exception as error:
286 self._handle_exception(error)
287
288 def create_function(self, *args: Any, **kw: Any) -> None:
289 try:
290 self.await_(self._connection.create_function(*args, **kw))
291 except Exception as error:
292 self._handle_exception(error)
293
294 def cursor(self, server_side: bool = False) -> AsyncAdapt_aiosqlite_cursor:
295 if server_side:
296 return AsyncAdapt_aiosqlite_ss_cursor(self)
297 else:
298 return AsyncAdapt_aiosqlite_cursor(self)
299
300 def execute(self, *args: Any, **kw: Any) -> Any:
301 return self.await_(self._connection.execute(*args, **kw))
302
303 def rollback(self) -> None:
304 try:
305 self.await_(self._connection.rollback())
306 except Exception as error:
307 self._handle_exception(error)
308
309 def commit(self) -> None:
310 try:
311 self.await_(self._connection.commit())
312 except Exception as error:
313 self._handle_exception(error)
314
315 def close(self) -> None:
316 try:
317 self.await_(self._connection.close())
318 except ValueError:
319 # this is undocumented for aiosqlite, that ValueError
320 # was raised if .close() was called more than once, which is
321 # both not customary for DBAPI and is also not a DBAPI.Error
322 # exception. This is now fixed in aiosqlite via my PR
323 # https://github.com/omnilib/aiosqlite/pull/238, so we can be
324 # assured this will not become some other kind of exception,
325 # since it doesn't raise anymore.
326
327 pass
328 except Exception as error:
329 self._handle_exception(error)
330
331 def _handle_exception(self, error: Exception) -> NoReturn:
332 if (
333 isinstance(error, ValueError)
334 and error.args[0] == "no active connection"
335 ):
336 raise self.dbapi.sqlite.OperationalError(
337 "no active connection"
338 ) from error
339 else:
340 raise error
341
342
343class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection):
344 __slots__ = ()
345
346 await_ = staticmethod(await_fallback)
347
348
349class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
350 def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType):
351 self.aiosqlite = aiosqlite
352 self.sqlite = sqlite
353 self.paramstyle = "qmark"
354 self._init_dbapi_attributes()
355
356 def _init_dbapi_attributes(self) -> None:
357 for name in (
358 "DatabaseError",
359 "Error",
360 "IntegrityError",
361 "NotSupportedError",
362 "OperationalError",
363 "ProgrammingError",
364 "sqlite_version",
365 "sqlite_version_info",
366 ):
367 setattr(self, name, getattr(self.aiosqlite, name))
368
369 for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"):
370 setattr(self, name, getattr(self.sqlite, name))
371
372 for name in ("Binary",):
373 setattr(self, name, getattr(self.sqlite, name))
374
375 def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection:
376 async_fallback = kw.pop("async_fallback", False)
377
378 creator_fn = kw.pop("async_creator_fn", None)
379 if creator_fn:
380 connection = creator_fn(*arg, **kw)
381 else:
382 connection = self.aiosqlite.connect(*arg, **kw)
383 # it's a Thread. you'll thank us later
384 connection.daemon = True
385
386 if util.asbool(async_fallback):
387 return AsyncAdaptFallback_aiosqlite_connection(
388 self,
389 await_fallback(connection),
390 )
391 else:
392 return AsyncAdapt_aiosqlite_connection(
393 self,
394 await_only(connection),
395 )
396
397
398class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
399 def create_server_side_cursor(self) -> DBAPICursor:
400 return self._dbapi_connection.cursor(server_side=True)
401
402
403class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
404 driver = "aiosqlite"
405 supports_statement_cache = True
406
407 is_async = True
408
409 supports_server_side_cursors = True
410
411 execution_ctx_cls = SQLiteExecutionContext_aiosqlite
412
413 @classmethod
414 def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi:
415 return AsyncAdapt_aiosqlite_dbapi(
416 __import__("aiosqlite"), __import__("sqlite3")
417 )
418
419 @classmethod
420 def get_pool_class(cls, url: URL) -> type[pool.Pool]:
421 if cls._is_url_file_db(url):
422 return pool.AsyncAdaptedQueuePool
423 else:
424 return pool.StaticPool
425
426 def is_disconnect(
427 self,
428 e: DBAPIModule.Error,
429 connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
430 cursor: Optional[DBAPICursor],
431 ) -> bool:
432 self.dbapi = cast("DBAPIModule", self.dbapi)
433 if isinstance(
434 e, self.dbapi.OperationalError
435 ) and "no active connection" in str(e):
436 return True
437
438 return super().is_disconnect(e, connection, cursor)
439
440 def get_driver_connection(
441 self, connection: DBAPIConnection
442 ) -> AsyncIODBAPIConnection:
443 return connection._connection # type: ignore[no-any-return]
444
445
446dialect = SQLiteDialect_aiosqlite