1# dialects/sqlite/aiosqlite.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9r"""
10
11.. dialect:: sqlite+aiosqlite
12 :name: aiosqlite
13 :dbapi: aiosqlite
14 :connectstring: sqlite+aiosqlite:///file_path
15 :url: https://pypi.org/project/aiosqlite/
16
17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
18running on top of pysqlite.
19
20aiosqlite is a wrapper around pysqlite that uses a background thread for
21each connection. It does not actually use non-blocking IO, as SQLite
22databases are not socket-based. However it does provide a working asyncio
23interface that's useful for testing and prototyping purposes.
24
25Using a special asyncio mediation layer, the aiosqlite dialect is usable
26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
27extension package.
28
29This dialect should normally be used only with the
30:func:`_asyncio.create_async_engine` engine creation function::
31
32 from sqlalchemy.ext.asyncio import create_async_engine
33
34 engine = create_async_engine("sqlite+aiosqlite:///filename")
35
36The URL passes through all arguments to the ``pysqlite`` driver, so all
37connection arguments are the same as they are for that of :ref:`pysqlite`.
38
39.. _aiosqlite_udfs:
40
41User-Defined Functions
42----------------------
43
44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs)
45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`.
46
47.. _aiosqlite_serializable:
48
49Serializable isolation / Savepoints / Transactional DDL (asyncio version)
50-------------------------------------------------------------------------
51
52A newly revised version of this important section is now available
53at the top level of the SQLAlchemy SQLite documentation, in the section
54:ref:`sqlite_transactions`.
55
56
57.. _aiosqlite_pooling:
58
59Pooling Behavior
60----------------
61
62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently
63based on the kind of SQLite database that's requested:
64
65* When a ``:memory:`` SQLite database is specified, the dialect by default
66 will use :class:`.StaticPool`. This pool maintains a single
67 connection, so that all access to the engine
  uses the same ``:memory:`` database.
69* When a file-based database is specified, the dialect will use
70 :class:`.AsyncAdaptedQueuePool` as the source of connections.
71
72 .. versionchanged:: 2.0.38
73
74 SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default.
    Previously, :class:`.NullPool` was used.  The :class:`.NullPool` class
76 may be used by specifying it via the
77 :paramref:`_sa.create_engine.poolclass` parameter.
78
79""" # noqa
80from __future__ import annotations
81
82import asyncio
83from collections import deque
84from functools import partial
85from threading import Thread
86from types import ModuleType
87from typing import Any
88from typing import cast
89from typing import Deque
90from typing import Iterator
91from typing import NoReturn
92from typing import Optional
93from typing import Sequence
94from typing import TYPE_CHECKING
95from typing import Union
96
97from .base import SQLiteExecutionContext
98from .pysqlite import SQLiteDialect_pysqlite
99from ... import pool
100from ... import util
101from ...connectors.asyncio import AsyncAdapt_dbapi_module
102from ...connectors.asyncio import AsyncAdapt_terminate
103from ...engine import AdaptedConnection
104from ...util.concurrency import await_fallback
105from ...util.concurrency import await_only
106
107if TYPE_CHECKING:
108 from ...connectors.asyncio import AsyncIODBAPIConnection
109 from ...connectors.asyncio import AsyncIODBAPICursor
110 from ...engine.interfaces import _DBAPICursorDescription
111 from ...engine.interfaces import _DBAPIMultiExecuteParams
112 from ...engine.interfaces import _DBAPISingleExecuteParams
113 from ...engine.interfaces import DBAPIConnection
114 from ...engine.interfaces import DBAPICursor
115 from ...engine.interfaces import DBAPIModule
116 from ...engine.url import URL
117 from ...pool.base import PoolProxiedConnection
118
119
class AsyncAdapt_aiosqlite_cursor:
    """Blocking-style DBAPI cursor facade over an aiosqlite connection.

    Every awaitable produced by the driver is resolved through the
    ``await_`` callable borrowed from the owning adapted connection.  For
    this (non server-side) flavor the complete result set is fetched inside
    :meth:`execute` and buffered in a deque, so the ``fetch*`` methods and
    iteration only consume that buffer.
    """

    # TODO: base on connectors/asyncio.py
    # see #10415

    __slots__ = (
        "_adapt_connection",
        "_connection",
        "description",
        "await_",
        "_rows",
        "arraysize",
        "rowcount",
        "lastrowid",
    )

    server_side = False

    def __init__(self, adapt_connection: AsyncAdapt_aiosqlite_connection):
        """Bind the cursor to ``adapt_connection`` and reset its state.

        :param adapt_connection: adapted connection supplying the driver
         connection (``_connection``), the ``await_`` callable and the
         ``_handle_exception()`` error hook.
        """
        self._adapt_connection = adapt_connection
        self._connection = adapt_connection._connection
        self.await_ = adapt_connection.await_
        self.arraysize = 1
        self.rowcount = -1
        # ``lastrowid`` is a declared slot; without an initial value reading
        # it before the first execute raised AttributeError.  -1 is the same
        # "not applicable" marker execute() assigns for row-returning
        # statements.
        self.lastrowid = -1
        self.description: Optional[_DBAPICursorDescription] = None
        self._rows: Deque[Any] = deque()

    async def _async_soft_close(self) -> None:
        """No-op; nothing is released here."""
        return

    def close(self) -> None:
        """Discard any rows still held in the local buffer."""
        self._rows.clear()

    def _discard_driver_cursor(self, driver_cursor: Any) -> None:
        """Best-effort close of a driver cursor after a failed statement."""
        try:
            self.await_(driver_cursor.close())
        except Exception:
            # the statement error currently propagating is the one the
            # caller needs to see; a secondary failure while cleaning up
            # must not replace it.
            pass

    def execute(
        self,
        operation: Any,
        parameters: Optional[_DBAPISingleExecuteParams] = None,
    ) -> Any:
        """Run a single statement on a fresh driver cursor.

        When the driver reports a ``description`` the statement returns
        rows: ``lastrowid`` and ``rowcount`` are set to -1 and, unless this
        is a server-side cursor, all rows are buffered locally.  Otherwise
        ``description`` is cleared and ``lastrowid`` / ``rowcount`` are
        copied from the driver cursor.

        The driver cursor is closed before returning for buffered cursors
        and kept as ``self._cursor`` for server-side ones.  If the statement
        fails, the driver cursor is closed rather than left dangling.

        Any ``Exception`` is routed through the owning connection's
        ``_handle_exception()``, which re-raises it.
        """

        try:
            _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor())  # type: ignore[arg-type]  # noqa: E501

            try:
                if parameters is None:
                    self.await_(_cursor.execute(operation))
                else:
                    self.await_(_cursor.execute(operation, parameters))

                if _cursor.description:
                    self.description = _cursor.description
                    self.lastrowid = self.rowcount = -1

                    if not self.server_side:
                        self._rows = deque(self.await_(_cursor.fetchall()))
                else:
                    self.description = None
                    self.lastrowid = _cursor.lastrowid
                    self.rowcount = _cursor.rowcount
            except Exception:
                # don't leak the driver cursor when the statement fails
                self._discard_driver_cursor(_cursor)
                raise

            if not self.server_side:
                self.await_(_cursor.close())
            else:
                # only the server-side subclass declares the ``_cursor`` slot
                self._cursor = _cursor  # type: ignore[misc]
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def executemany(
        self,
        operation: Any,
        seq_of_parameters: _DBAPIMultiExecuteParams,
    ) -> Any:
        """Run ``operation`` once per parameter set on a fresh driver cursor.

        ``description`` is cleared, ``lastrowid`` / ``rowcount`` are copied
        from the driver cursor, and the driver cursor is always closed, on
        failure as well as on success.  Any ``Exception`` is routed through
        the owning connection's ``_handle_exception()``.
        """
        try:
            _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor())  # type: ignore[arg-type]  # noqa: E501
            try:
                self.await_(_cursor.executemany(operation, seq_of_parameters))
                self.description = None
                self.lastrowid = _cursor.lastrowid
                self.rowcount = _cursor.rowcount
            except Exception:
                # don't leak the driver cursor when the statement fails
                self._discard_driver_cursor(_cursor)
                raise
            self.await_(_cursor.close())
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def setinputsizes(self, *inputsizes: Any) -> None:
        """Accepted and ignored."""
        pass

    def __iter__(self) -> Iterator[Any]:
        """Yield buffered rows, removing each from the buffer."""
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self) -> Optional[Any]:
        """Pop and return the next buffered row, or ``None`` when empty."""
        if self._rows:
            return self._rows.popleft()
        else:
            return None

    def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
        """Pop up to ``size`` buffered rows (default: ``arraysize``)."""
        if size is None:
            size = self.arraysize

        rr = self._rows
        return [rr.popleft() for _ in range(min(size, len(rr)))]

    def fetchall(self) -> Sequence[Any]:
        """Return all remaining buffered rows and empty the buffer."""
        retval = list(self._rows)
        self._rows.clear()
        return retval
223
224
class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
    """Server-side variant that keeps the driver cursor open.

    Rows are not buffered at execute time; each ``fetch*`` call is forwarded
    to the retained driver cursor and resolved through ``await_``.
    """

    # TODO: base on connectors/asyncio.py
    # see #10415
    __slots__ = "_cursor"

    server_side = True

    def __init__(self, *arg: Any, **kw: Any) -> None:
        """Initialize the base cursor; no driver cursor is held yet."""
        super().__init__(*arg, **kw)
        self._cursor: Optional[AsyncIODBAPICursor] = None

    def close(self) -> None:
        """Close and drop the retained driver cursor, if there is one."""
        if self._cursor is not None:
            self.await_(self._cursor.close())
            self._cursor = None

    def __iter__(self) -> Iterator[Any]:
        """Yield rows one at a time from the driver cursor.

        The inherited ``__iter__`` only drains the local row buffer, which
        is never filled for a server-side cursor, so without this override
        iteration silently produced no rows.
        """
        while True:
            row = self.fetchone()
            if row is None:
                return
            yield row

    def fetchone(self) -> Optional[Any]:
        """Fetch the next row from the driver cursor."""
        assert self._cursor is not None
        return self.await_(self._cursor.fetchone())

    def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
        """Fetch up to ``size`` rows (default: ``arraysize``) from the driver."""
        assert self._cursor is not None
        if size is None:
            size = self.arraysize
        return self.await_(self._cursor.fetchmany(size=size))

    def fetchall(self) -> Sequence[Any]:
        """Fetch all remaining rows from the driver cursor."""
        assert self._cursor is not None
        return self.await_(self._cursor.fetchall())
254
255
class AsyncAdapt_aiosqlite_connection(AsyncAdapt_terminate, AdaptedConnection):
    """Blocking-style DBAPI connection facade over an aiosqlite connection.

    Awaitables returned by the wrapped driver connection are resolved with
    ``await_`` (``await_only`` for this class).  Driver errors are passed
    through :meth:`_handle_exception`, which converts aiosqlite's
    ``ValueError("no active connection")`` into the DBAPI's
    ``OperationalError``.
    """

    await_ = staticmethod(await_only)
    __slots__ = ("dbapi",)

    def __init__(self, dbapi: Any, connection: AsyncIODBAPIConnection) -> None:
        """Store the adapted DBAPI module and the driver connection."""
        self.dbapi = dbapi
        self._connection = connection

    @property
    def isolation_level(self) -> Optional[str]:
        """The ``isolation_level`` reported by the driver connection."""
        return cast(str, self._connection.isolation_level)

    @isolation_level.setter
    def isolation_level(self, value: Optional[str]) -> None:
        # aiosqlite's isolation_level setter works outside the Thread
        # that it's supposed to, necessitating setting check_same_thread=False.
        # for improved stability, we instead invent our own awaitable version
        # using aiosqlite's async queue directly.

        def set_iso(connection: Any, value: Optional[str]) -> None:
            # ``connection`` is ``self._connection._conn`` (see the partial
            # below), not this adapter class.
            connection.isolation_level = value

        function = partial(set_iso, self._connection._conn, value)
        future = asyncio.get_event_loop().create_future()

        # hand the (future, callable) pair to the driver's queue and wait
        # for the future to be resolved
        self._connection._tx.put_nowait((future, function))

        try:
            self.await_(future)
        except Exception as error:
            self._handle_exception(error)

    def create_function(self, *args: Any, **kw: Any) -> None:
        """Forward to the driver's ``create_function()``."""
        try:
            self.await_(self._connection.create_function(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def cursor(self, server_side: bool = False) -> AsyncAdapt_aiosqlite_cursor:
        """Return a buffered cursor, or a server-side one when requested."""
        if server_side:
            return AsyncAdapt_aiosqlite_ss_cursor(self)
        else:
            return AsyncAdapt_aiosqlite_cursor(self)

    def execute(self, *args: Any, **kw: Any) -> Any:
        """Forward to the driver connection's ``execute()``.

        Errors go through :meth:`_handle_exception`, matching the other
        driver-facing methods on this class, so a dropped connection
        surfaces as ``OperationalError`` rather than a bare ``ValueError``.
        """
        try:
            return self.await_(self._connection.execute(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def rollback(self) -> None:
        """Roll back via the driver connection."""
        try:
            self.await_(self._connection.rollback())
        except Exception as error:
            self._handle_exception(error)

    def commit(self) -> None:
        """Commit via the driver connection."""
        try:
            self.await_(self._connection.commit())
        except Exception as error:
            self._handle_exception(error)

    def close(self) -> None:
        """Close the driver connection, tolerating a repeated close."""
        try:
            self.await_(self._connection.close())
        except ValueError:
            # this is undocumented for aiosqlite, that ValueError
            # was raised if .close() was called more than once, which is
            # both not customary for DBAPI and is also not a DBAPI.Error
            # exception. This is now fixed in aiosqlite via my PR
            # https://github.com/omnilib/aiosqlite/pull/238, so we can be
            # assured this will not become some other kind of exception,
            # since it doesn't raise anymore.

            pass
        except Exception as error:
            self._handle_exception(error)

    def _handle_exception(self, error: Exception) -> NoReturn:
        """Re-raise ``error``, translating the "no active connection" case.

        A ``ValueError`` whose first argument is ``"no active connection"``
        is re-raised as ``self.dbapi.sqlite.OperationalError`` chained to the
        original; every other exception is re-raised unchanged.
        """
        if (
            isinstance(error, ValueError)
            # guard: a ValueError raised without arguments would otherwise
            # fail here with IndexError and hide the real error
            and error.args
            and error.args[0] == "no active connection"
        ):
            raise self.dbapi.sqlite.OperationalError(
                "no active connection"
            ) from error
        else:
            raise error

    async def _terminate_graceful_close(self) -> None:
        """Try to close connection gracefully"""
        await self._connection.close()

    def _terminate_force_close(self) -> None:
        """Terminate the connection"""

        # this was added in aiosqlite 0.22.1. if stop() is not present,
        # the dialect should indicate has_terminate=False
        try:
            meth = self._connection.stop
        except AttributeError as ae:
            raise NotImplementedError(
                "terminate_force_close() not implemented by this DBAPI shim"
            ) from ae
        else:
            meth()
361
362
class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection):
    """Adapted connection variant that resolves awaitables with
    ``await_fallback`` instead of ``await_only``."""

    await_ = staticmethod(await_fallback)

    # no per-instance attributes beyond those of the parent class
    __slots__ = ()
367
368
class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
    """DBAPI-module-like object built from ``aiosqlite`` and ``sqlite3``.

    Exposes the exception classes and version constants of ``aiosqlite``,
    a handful of constants from the stdlib ``sqlite3`` module, and a
    :meth:`connect` that returns an adapted, blocking-style connection.
    """

    def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType):
        """Keep both modules and publish the DBAPI-level attributes."""
        self.aiosqlite = aiosqlite
        self.sqlite = sqlite
        self.paramstyle = "qmark"
        # whether the installed aiosqlite provides ``Connection.stop``
        self.has_stop = hasattr(aiosqlite.Connection, "stop")
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self) -> None:
        """Copy selected names from the two wrapped modules onto ``self``."""
        exported = (
            (
                self.aiosqlite,
                (
                    "DatabaseError",
                    "Error",
                    "IntegrityError",
                    "NotSupportedError",
                    "OperationalError",
                    "ProgrammingError",
                    "sqlite_version",
                    "sqlite_version_info",
                ),
            ),
            (self.sqlite, ("PARSE_COLNAMES", "PARSE_DECLTYPES", "Binary")),
        )
        for source_module, attribute_names in exported:
            for attribute_name in attribute_names:
                setattr(
                    self,
                    attribute_name,
                    getattr(source_module, attribute_name),
                )

    def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection:
        """Open a driver connection and wrap it in an adapted connection.

        Two keyword arguments are consumed here and not passed on:

        * ``async_fallback`` - when truthy per ``util.asbool``, the
          connection is awaited with ``await_fallback`` and wrapped in
          :class:`AsyncAdaptFallback_aiosqlite_connection`.
        * ``async_creator_fn`` - optional callable used in place of
          ``aiosqlite.connect`` to produce the driver connection.

        All remaining arguments are forwarded to whichever callable creates
        the connection.
        """
        use_fallback = kw.pop("async_fallback", False)
        custom_creator = kw.pop("async_creator_fn", None)

        connect_fn = custom_creator if custom_creator else self.aiosqlite.connect
        connection = connect_fn(*arg, **kw)

        # aiosqlite runs each connection on a thread; flag it as a daemon
        # thread.  Before 0.22 the connection object itself was the Thread,
        # from 0.22 on it holds one in ``_thread``.
        worker = (
            connection if isinstance(connection, Thread) else connection._thread
        )
        worker.daemon = True

        if not util.asbool(use_fallback):
            return AsyncAdapt_aiosqlite_connection(
                self,
                await_only(connection),
            )

        return AsyncAdaptFallback_aiosqlite_connection(
            self,
            await_fallback(connection),
        )
423
424
class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
    """Execution context that can request server-side cursors."""

    def create_server_side_cursor(self) -> DBAPICursor:
        """Return a cursor obtained with ``server_side=True``."""
        dbapi_connection = self._dbapi_connection
        return dbapi_connection.cursor(server_side=True)
428
429
class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
    """SQLite dialect variant that talks to the database through aiosqlite."""

    driver = "aiosqlite"
    supports_statement_cache = True

    is_async = True
    has_terminate = True

    supports_server_side_cursors = True

    execution_ctx_cls = SQLiteExecutionContext_aiosqlite

    def __init__(self, **kwargs: Any):
        """Initialize the parent dialect, then switch ``has_terminate`` off
        when the loaded DBAPI reports no ``stop()`` support."""
        super().__init__(**kwargs)
        dbapi = self.dbapi
        if dbapi and not dbapi.has_stop:
            self.has_terminate = False

    @classmethod
    def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi:
        """Import ``aiosqlite`` and ``sqlite3`` and wrap them together."""
        aiosqlite_module = __import__("aiosqlite")
        sqlite3_module = __import__("sqlite3")
        return AsyncAdapt_aiosqlite_dbapi(aiosqlite_module, sqlite3_module)

    @classmethod
    def get_pool_class(cls, url: URL) -> type[pool.Pool]:
        """Choose ``AsyncAdaptedQueuePool`` for file databases and
        ``StaticPool`` otherwise."""
        return (
            pool.AsyncAdaptedQueuePool
            if cls._is_url_file_db(url)
            else pool.StaticPool
        )

    def is_disconnect(
        self,
        e: DBAPIModule.Error,
        connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
        cursor: Optional[DBAPICursor],
    ) -> bool:
        """Report whether ``e`` means the connection is gone.

        An ``OperationalError`` whose text contains "no active connection"
        counts as a disconnect; anything else is decided by the parent
        dialect.
        """
        self.dbapi = cast("DBAPIModule", self.dbapi)
        connection_lost = isinstance(
            e, self.dbapi.OperationalError
        ) and "no active connection" in str(e)
        if not connection_lost:
            return super().is_disconnect(e, connection, cursor)

        return True

    def get_driver_connection(
        self, connection: DBAPIConnection
    ) -> AsyncIODBAPIConnection:
        """Return the ``_connection`` attribute of the adapted connection."""
        driver_connection = connection._connection
        return driver_connection  # type: ignore[no-any-return]

    def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
        """Delegate to the connection's own ``terminate()``."""
        dbapi_connection.terminate()
480
481
# Module-level alias for the dialect class defined above.
# NOTE(review): presumably the name under which the dialect loader looks up
# this module's dialect - confirm against the registry code.
dialect = SQLiteDialect_aiosqlite