Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

241 statements  

1# dialects/sqlite/aiosqlite.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9r""" 

10 

11.. dialect:: sqlite+aiosqlite 

12 :name: aiosqlite 

13 :dbapi: aiosqlite 

14 :connectstring: sqlite+aiosqlite:///file_path 

15 :url: https://pypi.org/project/aiosqlite/ 

16 

17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface 

18running on top of pysqlite. 

19 

20aiosqlite is a wrapper around pysqlite that uses a background thread for 

21each connection. It does not actually use non-blocking IO, as SQLite 

22databases are not socket-based. However it does provide a working asyncio 

23interface that's useful for testing and prototyping purposes. 

24 

25Using a special asyncio mediation layer, the aiosqlite dialect is usable 

26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

27extension package. 

28 

29This dialect should normally be used only with the 

30:func:`_asyncio.create_async_engine` engine creation function:: 

31 

32 from sqlalchemy.ext.asyncio import create_async_engine 

33 

34 engine = create_async_engine("sqlite+aiosqlite:///filename") 

35 

36The URL passes through all arguments to the ``pysqlite`` driver, so all 

37connection arguments are the same as they are for that of :ref:`pysqlite`. 

38 

39.. _aiosqlite_udfs: 

40 

41User-Defined Functions 

42---------------------- 

43 

44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs) 

45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`. 

46 

47.. _aiosqlite_serializable: 

48 

49Serializable isolation / Savepoints / Transactional DDL (asyncio version) 

50------------------------------------------------------------------------- 

51 

52A newly revised version of this important section is now available 

53at the top level of the SQLAlchemy SQLite documentation, in the section 

54:ref:`sqlite_transactions`. 

55 

56 

57.. _aiosqlite_pooling: 

58 

59Pooling Behavior 

60---------------- 

61 

62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently 

63based on the kind of SQLite database that's requested: 

64 

65* When a ``:memory:`` SQLite database is specified, the dialect by default 

66 will use :class:`.StaticPool`. This pool maintains a single 

67 connection, so that all access to the engine 

68 use the same ``:memory:`` database. 

69* When a file-based database is specified, the dialect will use 

70 :class:`.AsyncAdaptedQueuePool` as the source of connections. 

71 

72 .. versionchanged:: 2.0.38 

73 

74 SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default. 

75 Previously, :class:`.NullPool` were used. The :class:`.NullPool` class 

76 may be used by specifying it via the 

77 :paramref:`_sa.create_engine.poolclass` parameter. 

78 

79""" # noqa 

80 

81from __future__ import annotations 

82 

83import asyncio 

84from collections import deque 

85from functools import partial 

86from threading import Thread 

87from types import ModuleType 

88from typing import Any 

89from typing import cast 

90from typing import Deque 

91from typing import Iterator 

92from typing import NoReturn 

93from typing import Optional 

94from typing import Sequence 

95from typing import TYPE_CHECKING 

96from typing import Union 

97 

98from .base import SQLiteExecutionContext 

99from .pysqlite import SQLiteDialect_pysqlite 

100from ... import pool 

101from ... import util 

102from ...connectors.asyncio import AsyncAdapt_dbapi_module 

103from ...connectors.asyncio import AsyncAdapt_terminate 

104from ...engine import AdaptedConnection 

105from ...util.concurrency import await_fallback 

106from ...util.concurrency import await_only 

107 

108if TYPE_CHECKING: 

109 from ...connectors.asyncio import AsyncIODBAPIConnection 

110 from ...connectors.asyncio import AsyncIODBAPICursor 

111 from ...engine.interfaces import _DBAPICursorDescription 

112 from ...engine.interfaces import _DBAPIMultiExecuteParams 

113 from ...engine.interfaces import _DBAPISingleExecuteParams 

114 from ...engine.interfaces import DBAPIConnection 

115 from ...engine.interfaces import DBAPICursor 

116 from ...engine.interfaces import DBAPIModule 

117 from ...engine.url import URL 

118 from ...pool.base import PoolProxiedConnection 

119 

120 

121class AsyncAdapt_aiosqlite_cursor: 

122 # TODO: base on connectors/asyncio.py 

123 # see #10415 

124 

125 __slots__ = ( 

126 "_adapt_connection", 

127 "_connection", 

128 "description", 

129 "await_", 

130 "_rows", 

131 "arraysize", 

132 "rowcount", 

133 "lastrowid", 

134 ) 

135 

136 server_side = False 

137 

138 def __init__(self, adapt_connection: AsyncAdapt_aiosqlite_connection): 

139 self._adapt_connection = adapt_connection 

140 self._connection = adapt_connection._connection 

141 self.await_ = adapt_connection.await_ 

142 self.arraysize = 1 

143 self.rowcount = -1 

144 self.description: Optional[_DBAPICursorDescription] = None 

145 self._rows: Deque[Any] = deque() 

146 

147 async def _async_soft_close(self) -> None: 

148 return 

149 

150 def close(self) -> None: 

151 self._rows.clear() 

152 

153 def execute( 

154 self, 

155 operation: Any, 

156 parameters: Optional[_DBAPISingleExecuteParams] = None, 

157 ) -> Any: 

158 

159 try: 

160 _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor()) # type: ignore[arg-type] # noqa: E501 

161 

162 if parameters is None: 

163 self.await_(_cursor.execute(operation)) 

164 else: 

165 self.await_(_cursor.execute(operation, parameters)) 

166 

167 if _cursor.description: 

168 self.description = _cursor.description 

169 self.lastrowid = self.rowcount = -1 

170 

171 if not self.server_side: 

172 self._rows = deque(self.await_(_cursor.fetchall())) 

173 else: 

174 self.description = None 

175 self.lastrowid = _cursor.lastrowid 

176 self.rowcount = _cursor.rowcount 

177 

178 if not self.server_side: 

179 self.await_(_cursor.close()) 

180 else: 

181 self._cursor = _cursor # type: ignore[misc] 

182 except Exception as error: 

183 self._adapt_connection._handle_exception(error) 

184 

185 def executemany( 

186 self, 

187 operation: Any, 

188 seq_of_parameters: _DBAPIMultiExecuteParams, 

189 ) -> Any: 

190 try: 

191 _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor()) # type: ignore[arg-type] # noqa: E501 

192 self.await_(_cursor.executemany(operation, seq_of_parameters)) 

193 self.description = None 

194 self.lastrowid = _cursor.lastrowid 

195 self.rowcount = _cursor.rowcount 

196 self.await_(_cursor.close()) 

197 except Exception as error: 

198 self._adapt_connection._handle_exception(error) 

199 

200 def setinputsizes(self, *inputsizes: Any) -> None: 

201 pass 

202 

203 def __iter__(self) -> Iterator[Any]: 

204 while self._rows: 

205 yield self._rows.popleft() 

206 

207 def fetchone(self) -> Optional[Any]: 

208 if self._rows: 

209 return self._rows.popleft() 

210 else: 

211 return None 

212 

213 def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]: 

214 if size is None: 

215 size = self.arraysize 

216 

217 rr = self._rows 

218 return [rr.popleft() for _ in range(min(size, len(rr)))] 

219 

220 def fetchall(self) -> Sequence[Any]: 

221 retval = list(self._rows) 

222 self._rows.clear() 

223 return retval 

224 

225 

226class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor): 

227 # TODO: base on connectors/asyncio.py 

228 # see #10415 

229 __slots__ = "_cursor" 

230 

231 server_side = True 

232 

233 def __init__(self, *arg: Any, **kw: Any) -> None: 

234 super().__init__(*arg, **kw) 

235 self._cursor: Optional[AsyncIODBAPICursor] = None 

236 

237 def close(self) -> None: 

238 if self._cursor is not None: 

239 self.await_(self._cursor.close()) 

240 self._cursor = None 

241 

242 def fetchone(self) -> Optional[Any]: 

243 assert self._cursor is not None 

244 return self.await_(self._cursor.fetchone()) 

245 

246 def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]: 

247 assert self._cursor is not None 

248 if size is None: 

249 size = self.arraysize 

250 return self.await_(self._cursor.fetchmany(size=size)) 

251 

252 def fetchall(self) -> Sequence[Any]: 

253 assert self._cursor is not None 

254 return self.await_(self._cursor.fetchall()) 

255 

256 

257class AsyncAdapt_aiosqlite_connection(AsyncAdapt_terminate, AdaptedConnection): 

258 await_ = staticmethod(await_only) 

259 __slots__ = ("dbapi",) 

260 

261 def __init__(self, dbapi: Any, connection: AsyncIODBAPIConnection) -> None: 

262 self.dbapi = dbapi 

263 self._connection = connection 

264 

265 @property 

266 def isolation_level(self) -> Optional[str]: 

267 return cast(str, self._connection.isolation_level) 

268 

269 @isolation_level.setter 

270 def isolation_level(self, value: Optional[str]) -> None: 

271 # aiosqlite's isolation_level setter works outside the Thread 

272 # that it's supposed to, necessitating setting check_same_thread=False. 

273 # for improved stability, we instead invent our own awaitable version 

274 # using aiosqlite's async queue directly. 

275 

276 def set_iso( 

277 connection: AsyncAdapt_aiosqlite_connection, value: Optional[str] 

278 ) -> None: 

279 connection.isolation_level = value 

280 

281 function = partial(set_iso, self._connection._conn, value) 

282 future = asyncio.get_event_loop().create_future() 

283 

284 self._connection._tx.put_nowait((future, function)) 

285 

286 try: 

287 self.await_(future) 

288 except Exception as error: 

289 self._handle_exception(error) 

290 

291 def create_function(self, *args: Any, **kw: Any) -> None: 

292 try: 

293 self.await_(self._connection.create_function(*args, **kw)) 

294 except Exception as error: 

295 self._handle_exception(error) 

296 

297 def cursor(self, server_side: bool = False) -> AsyncAdapt_aiosqlite_cursor: 

298 if server_side: 

299 return AsyncAdapt_aiosqlite_ss_cursor(self) 

300 else: 

301 return AsyncAdapt_aiosqlite_cursor(self) 

302 

303 def execute(self, *args: Any, **kw: Any) -> Any: 

304 return self.await_(self._connection.execute(*args, **kw)) 

305 

306 def rollback(self) -> None: 

307 try: 

308 self.await_(self._connection.rollback()) 

309 except Exception as error: 

310 self._handle_exception(error) 

311 

312 def commit(self) -> None: 

313 try: 

314 self.await_(self._connection.commit()) 

315 except Exception as error: 

316 self._handle_exception(error) 

317 

318 def close(self) -> None: 

319 try: 

320 self.await_(self._connection.close()) 

321 except ValueError: 

322 # this is undocumented for aiosqlite, that ValueError 

323 # was raised if .close() was called more than once, which is 

324 # both not customary for DBAPI and is also not a DBAPI.Error 

325 # exception. This is now fixed in aiosqlite via my PR 

326 # https://github.com/omnilib/aiosqlite/pull/238, so we can be 

327 # assured this will not become some other kind of exception, 

328 # since it doesn't raise anymore. 

329 

330 pass 

331 except Exception as error: 

332 self._handle_exception(error) 

333 

334 def _handle_exception(self, error: Exception) -> NoReturn: 

335 if ( 

336 isinstance(error, ValueError) 

337 and error.args[0] == "no active connection" 

338 ): 

339 raise self.dbapi.sqlite.OperationalError( 

340 "no active connection" 

341 ) from error 

342 else: 

343 raise error 

344 

345 async def _terminate_graceful_close(self) -> None: 

346 """Try to close connection gracefully""" 

347 await self._connection.close() 

348 

349 def _terminate_force_close(self) -> None: 

350 """Terminate the connection""" 

351 

352 # this was added in aiosqlite 0.22.1. if stop() is not present, 

353 # the dialect should indicate has_terminate=False 

354 try: 

355 meth = self._connection.stop 

356 except AttributeError as ae: 

357 raise NotImplementedError( 

358 "terminate_force_close() not implemented by this DBAPI shim" 

359 ) from ae 

360 else: 

361 meth() 

362 

363 

364class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection): 

365 __slots__ = () 

366 

367 await_ = staticmethod(await_fallback) 

368 

369 

370class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module): 

371 def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType): 

372 self.aiosqlite = aiosqlite 

373 self.sqlite = sqlite 

374 self.paramstyle = "qmark" 

375 self.has_stop = hasattr(aiosqlite.Connection, "stop") 

376 self._init_dbapi_attributes() 

377 

378 def _init_dbapi_attributes(self) -> None: 

379 for name in ( 

380 "DatabaseError", 

381 "Error", 

382 "IntegrityError", 

383 "NotSupportedError", 

384 "OperationalError", 

385 "ProgrammingError", 

386 "sqlite_version", 

387 "sqlite_version_info", 

388 ): 

389 setattr(self, name, getattr(self.aiosqlite, name)) 

390 

391 for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"): 

392 setattr(self, name, getattr(self.sqlite, name)) 

393 

394 for name in ("Binary",): 

395 setattr(self, name, getattr(self.sqlite, name)) 

396 

397 def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection: 

398 async_fallback = kw.pop("async_fallback", False) 

399 

400 creator_fn = kw.pop("async_creator_fn", None) 

401 if creator_fn: 

402 connection = creator_fn(*arg, **kw) 

403 else: 

404 connection = self.aiosqlite.connect(*arg, **kw) 

405 

406 # aiosqlite uses a Thread. you'll thank us later 

407 if isinstance(connection, Thread): 

408 # Connection itself was a thread in version prior to 0.22 

409 connection.daemon = True 

410 else: 

411 # in 0.22+ instead it contains a thread. 

412 connection._thread.daemon = True 

413 

414 if util.asbool(async_fallback): 

415 return AsyncAdaptFallback_aiosqlite_connection( 

416 self, 

417 await_fallback(connection), 

418 ) 

419 else: 

420 return AsyncAdapt_aiosqlite_connection( 

421 self, 

422 await_only(connection), 

423 ) 

424 

425 

426class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext): 

427 def create_server_side_cursor(self) -> DBAPICursor: 

428 return self._dbapi_connection.cursor(server_side=True) 

429 

430 

431class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite): 

432 driver = "aiosqlite" 

433 supports_statement_cache = True 

434 

435 is_async = True 

436 has_terminate = True 

437 

438 supports_server_side_cursors = True 

439 

440 execution_ctx_cls = SQLiteExecutionContext_aiosqlite 

441 

442 def __init__(self, **kwargs: Any): 

443 super().__init__(**kwargs) 

444 if self.dbapi and not self.dbapi.has_stop: 

445 self.has_terminate = False 

446 

447 @classmethod 

448 def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi: 

449 return AsyncAdapt_aiosqlite_dbapi( 

450 __import__("aiosqlite"), __import__("sqlite3") 

451 ) 

452 

453 @classmethod 

454 def get_pool_class(cls, url: URL) -> type[pool.Pool]: 

455 if cls._is_url_file_db(url): 

456 return pool.AsyncAdaptedQueuePool 

457 else: 

458 return pool.StaticPool 

459 

460 def is_disconnect( 

461 self, 

462 e: DBAPIModule.Error, 

463 connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]], 

464 cursor: Optional[DBAPICursor], 

465 ) -> bool: 

466 self.dbapi = cast("DBAPIModule", self.dbapi) 

467 if isinstance( 

468 e, self.dbapi.OperationalError 

469 ) and "no active connection" in str(e): 

470 return True 

471 

472 return super().is_disconnect(e, connection, cursor) 

473 

474 def get_driver_connection( 

475 self, connection: DBAPIConnection 

476 ) -> AsyncIODBAPIConnection: 

477 return connection._connection # type: ignore[no-any-return] 

478 

479 def do_terminate(self, dbapi_connection: DBAPIConnection) -> None: 

480 dbapi_connection.terminate() 

481 

482 

483dialect = SQLiteDialect_aiosqlite