Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

140 statements  

1# dialects/sqlite/aiosqlite.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9r""" 

10 

11.. dialect:: sqlite+aiosqlite 

12 :name: aiosqlite 

13 :dbapi: aiosqlite 

14 :connectstring: sqlite+aiosqlite:///file_path 

15 :url: https://pypi.org/project/aiosqlite/ 

16 

17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface 

18running on top of pysqlite. 

19 

20aiosqlite is a wrapper around pysqlite that uses a background thread for 

21each connection. It does not actually use non-blocking IO, as SQLite 

22databases are not socket-based. However it does provide a working asyncio 

23interface that's useful for testing and prototyping purposes. 

24 

25Using a special asyncio mediation layer, the aiosqlite dialect is usable 

26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

27extension package. 

28 

29This dialect should normally be used only with the 

30:func:`_asyncio.create_async_engine` engine creation function:: 

31 

32 from sqlalchemy.ext.asyncio import create_async_engine 

33 

34 engine = create_async_engine("sqlite+aiosqlite:///filename") 

35 

36The URL passes through all arguments to the ``pysqlite`` driver, so all 

37connection arguments are the same as they are for that of :ref:`pysqlite`. 

38 

39.. _aiosqlite_udfs: 

40 

41User-Defined Functions 

42---------------------- 

43 

44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs) 

45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`. 

46 

47.. _aiosqlite_serializable: 

48 

49Serializable isolation / Savepoints / Transactional DDL (asyncio version) 

50------------------------------------------------------------------------- 

51 

52A newly revised version of this important section is now available 

53at the top level of the SQLAlchemy SQLite documentation, in the section 

54:ref:`sqlite_transactions`. 

55 

56 

57.. _aiosqlite_pooling: 

58 

59Pooling Behavior 

60---------------- 

61 

62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently 

63based on the kind of SQLite database that's requested: 

64 

65* When a ``:memory:`` SQLite database is specified, the dialect by default 

66 will use :class:`.StaticPool`. This pool maintains a single 

67 connection, so that all access to the engine 

68 use the same ``:memory:`` database. 

69* When a file-based database is specified, the dialect will use 

70 :class:`.AsyncAdaptedQueuePool` as the source of connections. 

71 

72 .. versionchanged:: 2.0.38 

73 

74 SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default. 

75 Previously, :class:`.NullPool` were used. The :class:`.NullPool` class 

76 may be used by specifying it via the 

77 :paramref:`_sa.create_engine.poolclass` parameter. 

78 

79""" # noqa 

80 

81from __future__ import annotations 

82 

83import asyncio 

84from functools import partial 

85from threading import Thread 

86from types import ModuleType 

87from typing import Any 

88from typing import cast 

89from typing import NoReturn 

90from typing import Optional 

91from typing import TYPE_CHECKING 

92from typing import Union 

93 

94from .base import SQLiteExecutionContext 

95from .pysqlite import SQLiteDialect_pysqlite 

96from ... import pool 

97from ...connectors.asyncio import AsyncAdapt_dbapi_connection 

98from ...connectors.asyncio import AsyncAdapt_dbapi_cursor 

99from ...connectors.asyncio import AsyncAdapt_dbapi_module 

100from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor 

101from ...connectors.asyncio import AsyncAdapt_terminate 

102from ...util.concurrency import await_ 

103 

104if TYPE_CHECKING: 

105 from ...connectors.asyncio import AsyncIODBAPIConnection 

106 from ...engine.interfaces import DBAPIConnection 

107 from ...engine.interfaces import DBAPICursor 

108 from ...engine.interfaces import DBAPIModule 

109 from ...engine.url import URL 

110 from ...pool.base import PoolProxiedConnection 

111 

112 

113class AsyncAdapt_aiosqlite_cursor(AsyncAdapt_dbapi_cursor): 

114 __slots__ = () 

115 

116 

117class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_dbapi_ss_cursor): 

118 __slots__ = () 

119 

120 

121class AsyncAdapt_aiosqlite_connection( 

122 AsyncAdapt_terminate, AsyncAdapt_dbapi_connection 

123): 

124 __slots__ = () 

125 

126 _cursor_cls = AsyncAdapt_aiosqlite_cursor 

127 _ss_cursor_cls = AsyncAdapt_aiosqlite_ss_cursor 

128 

129 @property 

130 def isolation_level(self) -> Optional[str]: 

131 return cast(str, self._connection.isolation_level) 

132 

133 @isolation_level.setter 

134 def isolation_level(self, value: Optional[str]) -> None: 

135 # aiosqlite's isolation_level setter works outside the Thread 

136 # that it's supposed to, necessitating setting check_same_thread=False. 

137 # for improved stability, we instead invent our own awaitable version 

138 # using aiosqlite's async queue directly. 

139 

140 def set_iso( 

141 connection: AsyncAdapt_aiosqlite_connection, value: Optional[str] 

142 ) -> None: 

143 connection.isolation_level = value 

144 

145 function = partial(set_iso, self._connection._conn, value) 

146 future = asyncio.get_event_loop().create_future() 

147 

148 self._connection._tx.put_nowait((future, function)) 

149 

150 try: 

151 await_(future) 

152 except Exception as error: 

153 self._handle_exception(error) 

154 

155 def create_function(self, *args: Any, **kw: Any) -> None: 

156 try: 

157 await_(self._connection.create_function(*args, **kw)) 

158 except Exception as error: 

159 self._handle_exception(error) 

160 

161 def rollback(self) -> None: 

162 if self._connection._connection: 

163 super().rollback() 

164 

165 def commit(self) -> None: 

166 if self._connection._connection: 

167 super().commit() 

168 

169 def close(self) -> None: 

170 try: 

171 await_(self._connection.close()) 

172 except ValueError: 

173 # this is undocumented for aiosqlite, that ValueError 

174 # was raised if .close() was called more than once, which is 

175 # both not customary for DBAPI and is also not a DBAPI.Error 

176 # exception. This is now fixed in aiosqlite via my PR 

177 # https://github.com/omnilib/aiosqlite/pull/238, so we can be 

178 # assured this will not become some other kind of exception, 

179 # since it doesn't raise anymore. 

180 

181 pass 

182 except Exception as error: 

183 self._handle_exception(error) 

184 

185 @classmethod 

186 def _handle_exception_no_connection( 

187 cls, dbapi: Any, error: Exception 

188 ) -> NoReturn: 

189 if isinstance(error, ValueError) and error.args[0].lower() in ( 

190 "no active connection", 

191 "connection closed", 

192 ): 

193 raise dbapi.sqlite.OperationalError(error.args[0]) from error 

194 else: 

195 super()._handle_exception_no_connection(dbapi, error) 

196 

197 async def _terminate_graceful_close(self) -> None: 

198 """Try to close connection gracefully""" 

199 await self._connection.close() 

200 

201 def _terminate_force_close(self) -> None: 

202 """Terminate the connection""" 

203 

204 # this was added in aiosqlite 0.22.1. if stop() is not present, 

205 # the dialect should indicate has_terminate=False 

206 try: 

207 meth = self._connection.stop 

208 except AttributeError as ae: 

209 raise NotImplementedError( 

210 "terminate_force_close() not implemented by this DBAPI shim" 

211 ) from ae 

212 else: 

213 meth() 

214 

215 

216class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module): 

217 def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType): 

218 super().__init__(aiosqlite, dbapi_module=sqlite) 

219 self.aiosqlite = aiosqlite 

220 self.sqlite = sqlite 

221 self.paramstyle = "qmark" 

222 self.has_stop = hasattr(aiosqlite.Connection, "stop") 

223 self._init_dbapi_attributes() 

224 

225 def _init_dbapi_attributes(self) -> None: 

226 for name in ( 

227 "DatabaseError", 

228 "Error", 

229 "IntegrityError", 

230 "NotSupportedError", 

231 "OperationalError", 

232 "ProgrammingError", 

233 "sqlite_version", 

234 "sqlite_version_info", 

235 ): 

236 setattr(self, name, getattr(self.aiosqlite, name)) 

237 

238 for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"): 

239 setattr(self, name, getattr(self.sqlite, name)) 

240 

241 for name in ("Binary",): 

242 setattr(self, name, getattr(self.sqlite, name)) 

243 

244 def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection: 

245 creator_fn = kw.pop("async_creator_fn", None) 

246 if creator_fn: 

247 connection = creator_fn(*arg, **kw) 

248 else: 

249 connection = self.aiosqlite.connect(*arg, **kw) 

250 

251 # aiosqlite uses a Thread. you'll thank us later 

252 if isinstance(connection, Thread): 

253 # Connection itself was a thread in version prior to 0.22 

254 connection.daemon = True 

255 else: 

256 # in 0.22+ instead it contains a thread. 

257 connection._thread.daemon = True 

258 

259 return AsyncAdapt_aiosqlite_connection(self, await_(connection)) 

260 

261 

262class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext): 

263 def create_server_side_cursor(self) -> DBAPICursor: 

264 return self._dbapi_connection.cursor(server_side=True) 

265 

266 

267class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite): 

268 driver = "aiosqlite" 

269 supports_statement_cache = True 

270 

271 is_async = True 

272 has_terminate = True 

273 

274 supports_server_side_cursors = True 

275 

276 execution_ctx_cls = SQLiteExecutionContext_aiosqlite 

277 

278 def __init__(self, **kwargs: Any): 

279 super().__init__(**kwargs) 

280 if self.dbapi and not self.dbapi.has_stop: 

281 self.has_terminate = False 

282 

283 @classmethod 

284 def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi: 

285 return AsyncAdapt_aiosqlite_dbapi( 

286 __import__("aiosqlite"), __import__("sqlite3") 

287 ) 

288 

289 @classmethod 

290 def get_pool_class(cls, url: URL) -> type[pool.Pool]: 

291 if cls._is_url_file_db(url): 

292 return pool.AsyncAdaptedQueuePool 

293 else: 

294 return pool.StaticPool 

295 

296 def is_disconnect( 

297 self, 

298 e: DBAPIModule.Error, 

299 connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]], 

300 cursor: Optional[DBAPICursor], 

301 ) -> bool: 

302 self.dbapi = cast("DBAPIModule", self.dbapi) 

303 if isinstance(e, self.dbapi.OperationalError): 

304 err_lower = str(e).lower() 

305 if ( 

306 "no active connection" in err_lower 

307 or "connection closed" in err_lower 

308 ): 

309 return True 

310 

311 return super().is_disconnect(e, connection, cursor) 

312 

313 def get_driver_connection( 

314 self, connection: DBAPIConnection 

315 ) -> AsyncIODBAPIConnection: 

316 return connection._connection # type: ignore[no-any-return] 

317 

318 def do_terminate(self, dbapi_connection: DBAPIConnection) -> None: 

319 dbapi_connection.terminate() 

320 

321 

322dialect = SQLiteDialect_aiosqlite