Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

140 statements  

1# dialects/sqlite/aiosqlite.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9r""" 

10 

11.. dialect:: sqlite+aiosqlite 

12 :name: aiosqlite 

13 :dbapi: aiosqlite 

14 :connectstring: sqlite+aiosqlite:///file_path 

15 :url: https://pypi.org/project/aiosqlite/ 

16 

17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface 

18running on top of pysqlite. 

19 

20aiosqlite is a wrapper around pysqlite that uses a background thread for 

21each connection. It does not actually use non-blocking IO, as SQLite 

22databases are not socket-based. However it does provide a working asyncio 

23interface that's useful for testing and prototyping purposes. 

24 

25Using a special asyncio mediation layer, the aiosqlite dialect is usable 

26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

27extension package. 

28 

29This dialect should normally be used only with the 

30:func:`_asyncio.create_async_engine` engine creation function:: 

31 

32 from sqlalchemy.ext.asyncio import create_async_engine 

33 

34 engine = create_async_engine("sqlite+aiosqlite:///filename") 

35 

36The URL passes through all arguments to the ``pysqlite`` driver, so all 

37connection arguments are the same as they are for that of :ref:`pysqlite`. 

38 

39.. _aiosqlite_udfs: 

40 

41User-Defined Functions 

42---------------------- 

43 

44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs) 

45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`. 

46 

47.. _aiosqlite_serializable: 

48 

49Serializable isolation / Savepoints / Transactional DDL (asyncio version) 

50------------------------------------------------------------------------- 

51 

52A newly revised version of this important section is now available 

53at the top level of the SQLAlchemy SQLite documentation, in the section 

54:ref:`sqlite_transactions`. 

55 

56 

57.. _aiosqlite_pooling: 

58 

59Pooling Behavior 

60---------------- 

61 

62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently 

63based on the kind of SQLite database that's requested: 

64 

65* When a ``:memory:`` SQLite database is specified, the dialect by default 

66 will use :class:`.StaticPool`. This pool maintains a single 

67 connection, so that all access to the engine 

68 use the same ``:memory:`` database. 

69* When a file-based database is specified, the dialect will use 

70 :class:`.AsyncAdaptedQueuePool` as the source of connections. 

71 

72 .. versionchanged:: 2.0.38 

73 

74 SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default. 

75 Previously, :class:`.NullPool` were used. The :class:`.NullPool` class 

76 may be used by specifying it via the 

77 :paramref:`_sa.create_engine.poolclass` parameter. 

78 

79""" # noqa 

80from __future__ import annotations 

81 

82import asyncio 

83from functools import partial 

84from threading import Thread 

85from types import ModuleType 

86from typing import Any 

87from typing import cast 

88from typing import NoReturn 

89from typing import Optional 

90from typing import TYPE_CHECKING 

91from typing import Union 

92 

93from .base import SQLiteExecutionContext 

94from .pysqlite import SQLiteDialect_pysqlite 

95from ... import pool 

96from ...connectors.asyncio import AsyncAdapt_dbapi_connection 

97from ...connectors.asyncio import AsyncAdapt_dbapi_cursor 

98from ...connectors.asyncio import AsyncAdapt_dbapi_module 

99from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor 

100from ...connectors.asyncio import AsyncAdapt_terminate 

101from ...util.concurrency import await_ 

102 

103if TYPE_CHECKING: 

104 from ...connectors.asyncio import AsyncIODBAPIConnection 

105 from ...engine.interfaces import DBAPIConnection 

106 from ...engine.interfaces import DBAPICursor 

107 from ...engine.interfaces import DBAPIModule 

108 from ...engine.url import URL 

109 from ...pool.base import PoolProxiedConnection 

110 

111 

112class AsyncAdapt_aiosqlite_cursor(AsyncAdapt_dbapi_cursor): 

113 __slots__ = () 

114 

115 

116class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_dbapi_ss_cursor): 

117 __slots__ = () 

118 

119 

120class AsyncAdapt_aiosqlite_connection( 

121 AsyncAdapt_terminate, AsyncAdapt_dbapi_connection 

122): 

123 __slots__ = () 

124 

125 _cursor_cls = AsyncAdapt_aiosqlite_cursor 

126 _ss_cursor_cls = AsyncAdapt_aiosqlite_ss_cursor 

127 

128 @property 

129 def isolation_level(self) -> Optional[str]: 

130 return cast(str, self._connection.isolation_level) 

131 

132 @isolation_level.setter 

133 def isolation_level(self, value: Optional[str]) -> None: 

134 # aiosqlite's isolation_level setter works outside the Thread 

135 # that it's supposed to, necessitating setting check_same_thread=False. 

136 # for improved stability, we instead invent our own awaitable version 

137 # using aiosqlite's async queue directly. 

138 

139 def set_iso( 

140 connection: AsyncAdapt_aiosqlite_connection, value: Optional[str] 

141 ) -> None: 

142 connection.isolation_level = value 

143 

144 function = partial(set_iso, self._connection._conn, value) 

145 future = asyncio.get_event_loop().create_future() 

146 

147 self._connection._tx.put_nowait((future, function)) 

148 

149 try: 

150 await_(future) 

151 except Exception as error: 

152 self._handle_exception(error) 

153 

154 def create_function(self, *args: Any, **kw: Any) -> None: 

155 try: 

156 await_(self._connection.create_function(*args, **kw)) 

157 except Exception as error: 

158 self._handle_exception(error) 

159 

160 def rollback(self) -> None: 

161 if self._connection._connection: 

162 super().rollback() 

163 

164 def commit(self) -> None: 

165 if self._connection._connection: 

166 super().commit() 

167 

168 def close(self) -> None: 

169 try: 

170 await_(self._connection.close()) 

171 except ValueError: 

172 # this is undocumented for aiosqlite, that ValueError 

173 # was raised if .close() was called more than once, which is 

174 # both not customary for DBAPI and is also not a DBAPI.Error 

175 # exception. This is now fixed in aiosqlite via my PR 

176 # https://github.com/omnilib/aiosqlite/pull/238, so we can be 

177 # assured this will not become some other kind of exception, 

178 # since it doesn't raise anymore. 

179 

180 pass 

181 except Exception as error: 

182 self._handle_exception(error) 

183 

184 @classmethod 

185 def _handle_exception_no_connection( 

186 cls, dbapi: Any, error: Exception 

187 ) -> NoReturn: 

188 if isinstance(error, ValueError) and error.args[0].lower() in ( 

189 "no active connection", 

190 "connection closed", 

191 ): 

192 raise dbapi.sqlite.OperationalError(error.args[0]) from error 

193 else: 

194 super()._handle_exception_no_connection(dbapi, error) 

195 

196 async def _terminate_graceful_close(self) -> None: 

197 """Try to close connection gracefully""" 

198 await self._connection.close() 

199 

200 def _terminate_force_close(self) -> None: 

201 """Terminate the connection""" 

202 

203 # this was added in aiosqlite 0.22.1. if stop() is not present, 

204 # the dialect should indicate has_terminate=False 

205 try: 

206 meth = self._connection.stop 

207 except AttributeError as ae: 

208 raise NotImplementedError( 

209 "terminate_force_close() not implemented by this DBAPI shim" 

210 ) from ae 

211 else: 

212 meth() 

213 

214 

215class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module): 

216 def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType): 

217 super().__init__(aiosqlite, dbapi_module=sqlite) 

218 self.aiosqlite = aiosqlite 

219 self.sqlite = sqlite 

220 self.paramstyle = "qmark" 

221 self.has_stop = hasattr(aiosqlite.Connection, "stop") 

222 self._init_dbapi_attributes() 

223 

224 def _init_dbapi_attributes(self) -> None: 

225 for name in ( 

226 "DatabaseError", 

227 "Error", 

228 "IntegrityError", 

229 "NotSupportedError", 

230 "OperationalError", 

231 "ProgrammingError", 

232 "sqlite_version", 

233 "sqlite_version_info", 

234 ): 

235 setattr(self, name, getattr(self.aiosqlite, name)) 

236 

237 for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"): 

238 setattr(self, name, getattr(self.sqlite, name)) 

239 

240 for name in ("Binary",): 

241 setattr(self, name, getattr(self.sqlite, name)) 

242 

243 def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection: 

244 creator_fn = kw.pop("async_creator_fn", None) 

245 if creator_fn: 

246 connection = creator_fn(*arg, **kw) 

247 else: 

248 connection = self.aiosqlite.connect(*arg, **kw) 

249 

250 # aiosqlite uses a Thread. you'll thank us later 

251 if isinstance(connection, Thread): 

252 # Connection itself was a thread in version prior to 0.22 

253 connection.daemon = True 

254 else: 

255 # in 0.22+ instead it contains a thread. 

256 connection._thread.daemon = True 

257 

258 return AsyncAdapt_aiosqlite_connection(self, await_(connection)) 

259 

260 

261class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext): 

262 def create_server_side_cursor(self) -> DBAPICursor: 

263 return self._dbapi_connection.cursor(server_side=True) 

264 

265 

266class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite): 

267 driver = "aiosqlite" 

268 supports_statement_cache = True 

269 

270 is_async = True 

271 has_terminate = True 

272 

273 supports_server_side_cursors = True 

274 

275 execution_ctx_cls = SQLiteExecutionContext_aiosqlite 

276 

277 def __init__(self, **kwargs: Any): 

278 super().__init__(**kwargs) 

279 if self.dbapi and not self.dbapi.has_stop: 

280 self.has_terminate = False 

281 

282 @classmethod 

283 def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi: 

284 return AsyncAdapt_aiosqlite_dbapi( 

285 __import__("aiosqlite"), __import__("sqlite3") 

286 ) 

287 

288 @classmethod 

289 def get_pool_class(cls, url: URL) -> type[pool.Pool]: 

290 if cls._is_url_file_db(url): 

291 return pool.AsyncAdaptedQueuePool 

292 else: 

293 return pool.StaticPool 

294 

295 def is_disconnect( 

296 self, 

297 e: DBAPIModule.Error, 

298 connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]], 

299 cursor: Optional[DBAPICursor], 

300 ) -> bool: 

301 self.dbapi = cast("DBAPIModule", self.dbapi) 

302 if isinstance(e, self.dbapi.OperationalError): 

303 err_lower = str(e).lower() 

304 if ( 

305 "no active connection" in err_lower 

306 or "connection closed" in err_lower 

307 ): 

308 return True 

309 

310 return super().is_disconnect(e, connection, cursor) 

311 

312 def get_driver_connection( 

313 self, connection: DBAPIConnection 

314 ) -> AsyncIODBAPIConnection: 

315 return connection._connection # type: ignore[no-any-return] 

316 

317 def do_terminate(self, dbapi_connection: DBAPIConnection) -> None: 

318 dbapi_connection.terminate() 

319 

320 

321dialect = SQLiteDialect_aiosqlite