Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

241 statements  

1# dialects/sqlite/aiosqlite.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9r""" 

10 

11.. dialect:: sqlite+aiosqlite 

12 :name: aiosqlite 

13 :dbapi: aiosqlite 

14 :connectstring: sqlite+aiosqlite:///file_path 

15 :url: https://pypi.org/project/aiosqlite/ 

16 

17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface 

18running on top of pysqlite. 

19 

20aiosqlite is a wrapper around pysqlite that uses a background thread for 

21each connection. It does not actually use non-blocking IO, as SQLite 

22databases are not socket-based. However it does provide a working asyncio 

23interface that's useful for testing and prototyping purposes. 

24 

25Using a special asyncio mediation layer, the aiosqlite dialect is usable 

26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

27extension package. 

28 

29This dialect should normally be used only with the 

30:func:`_asyncio.create_async_engine` engine creation function:: 

31 

32 from sqlalchemy.ext.asyncio import create_async_engine 

33 

34 engine = create_async_engine("sqlite+aiosqlite:///filename") 

35 

36The URL passes through all arguments to the ``pysqlite`` driver, so all 

37connection arguments are the same as they are for that of :ref:`pysqlite`. 

38 

39.. _aiosqlite_udfs: 

40 

41User-Defined Functions 

42---------------------- 

43 

44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs) 

45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`. 

46 

47.. _aiosqlite_serializable: 

48 

49Serializable isolation / Savepoints / Transactional DDL (asyncio version) 

50------------------------------------------------------------------------- 

51 

52A newly revised version of this important section is now available 

53at the top level of the SQLAlchemy SQLite documentation, in the section 

54:ref:`sqlite_transactions`. 

55 

56 

57.. _aiosqlite_pooling: 

58 

59Pooling Behavior 

60---------------- 

61 

62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently 

63based on the kind of SQLite database that's requested: 

64 

65* When a ``:memory:`` SQLite database is specified, the dialect by default 

66 will use :class:`.StaticPool`. This pool maintains a single 

67 connection, so that all access to the engine 

68 use the same ``:memory:`` database. 

69* When a file-based database is specified, the dialect will use 

70 :class:`.AsyncAdaptedQueuePool` as the source of connections. 

71 

72 .. versionchanged:: 2.0.38 

73 

74 SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default. 

75 Previously, :class:`.NullPool` were used. The :class:`.NullPool` class 

76 may be used by specifying it via the 

77 :paramref:`_sa.create_engine.poolclass` parameter. 

78 

79""" # noqa 

80from __future__ import annotations 

81 

82import asyncio 

83from collections import deque 

84from functools import partial 

85from threading import Thread 

86from types import ModuleType 

87from typing import Any 

88from typing import cast 

89from typing import Deque 

90from typing import Iterator 

91from typing import NoReturn 

92from typing import Optional 

93from typing import Sequence 

94from typing import TYPE_CHECKING 

95from typing import Union 

96 

97from .base import SQLiteExecutionContext 

98from .pysqlite import SQLiteDialect_pysqlite 

99from ... import pool 

100from ... import util 

101from ...connectors.asyncio import AsyncAdapt_dbapi_module 

102from ...connectors.asyncio import AsyncAdapt_terminate 

103from ...engine import AdaptedConnection 

104from ...util.concurrency import await_fallback 

105from ...util.concurrency import await_only 

106 

107if TYPE_CHECKING: 

108 from ...connectors.asyncio import AsyncIODBAPIConnection 

109 from ...connectors.asyncio import AsyncIODBAPICursor 

110 from ...engine.interfaces import _DBAPICursorDescription 

111 from ...engine.interfaces import _DBAPIMultiExecuteParams 

112 from ...engine.interfaces import _DBAPISingleExecuteParams 

113 from ...engine.interfaces import DBAPIConnection 

114 from ...engine.interfaces import DBAPICursor 

115 from ...engine.interfaces import DBAPIModule 

116 from ...engine.url import URL 

117 from ...pool.base import PoolProxiedConnection 

118 

119 

120class AsyncAdapt_aiosqlite_cursor: 

121 # TODO: base on connectors/asyncio.py 

122 # see #10415 

123 

124 __slots__ = ( 

125 "_adapt_connection", 

126 "_connection", 

127 "description", 

128 "await_", 

129 "_rows", 

130 "arraysize", 

131 "rowcount", 

132 "lastrowid", 

133 ) 

134 

135 server_side = False 

136 

137 def __init__(self, adapt_connection: AsyncAdapt_aiosqlite_connection): 

138 self._adapt_connection = adapt_connection 

139 self._connection = adapt_connection._connection 

140 self.await_ = adapt_connection.await_ 

141 self.arraysize = 1 

142 self.rowcount = -1 

143 self.description: Optional[_DBAPICursorDescription] = None 

144 self._rows: Deque[Any] = deque() 

145 

146 async def _async_soft_close(self) -> None: 

147 return 

148 

149 def close(self) -> None: 

150 self._rows.clear() 

151 

152 def execute( 

153 self, 

154 operation: Any, 

155 parameters: Optional[_DBAPISingleExecuteParams] = None, 

156 ) -> Any: 

157 

158 try: 

159 _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor()) # type: ignore[arg-type] # noqa: E501 

160 

161 if parameters is None: 

162 self.await_(_cursor.execute(operation)) 

163 else: 

164 self.await_(_cursor.execute(operation, parameters)) 

165 

166 if _cursor.description: 

167 self.description = _cursor.description 

168 self.lastrowid = self.rowcount = -1 

169 

170 if not self.server_side: 

171 self._rows = deque(self.await_(_cursor.fetchall())) 

172 else: 

173 self.description = None 

174 self.lastrowid = _cursor.lastrowid 

175 self.rowcount = _cursor.rowcount 

176 

177 if not self.server_side: 

178 self.await_(_cursor.close()) 

179 else: 

180 self._cursor = _cursor # type: ignore[misc] 

181 except Exception as error: 

182 self._adapt_connection._handle_exception(error) 

183 

184 def executemany( 

185 self, 

186 operation: Any, 

187 seq_of_parameters: _DBAPIMultiExecuteParams, 

188 ) -> Any: 

189 try: 

190 _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor()) # type: ignore[arg-type] # noqa: E501 

191 self.await_(_cursor.executemany(operation, seq_of_parameters)) 

192 self.description = None 

193 self.lastrowid = _cursor.lastrowid 

194 self.rowcount = _cursor.rowcount 

195 self.await_(_cursor.close()) 

196 except Exception as error: 

197 self._adapt_connection._handle_exception(error) 

198 

199 def setinputsizes(self, *inputsizes: Any) -> None: 

200 pass 

201 

202 def __iter__(self) -> Iterator[Any]: 

203 while self._rows: 

204 yield self._rows.popleft() 

205 

206 def fetchone(self) -> Optional[Any]: 

207 if self._rows: 

208 return self._rows.popleft() 

209 else: 

210 return None 

211 

212 def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]: 

213 if size is None: 

214 size = self.arraysize 

215 

216 rr = self._rows 

217 return [rr.popleft() for _ in range(min(size, len(rr)))] 

218 

219 def fetchall(self) -> Sequence[Any]: 

220 retval = list(self._rows) 

221 self._rows.clear() 

222 return retval 

223 

224 

225class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor): 

226 # TODO: base on connectors/asyncio.py 

227 # see #10415 

228 __slots__ = "_cursor" 

229 

230 server_side = True 

231 

232 def __init__(self, *arg: Any, **kw: Any) -> None: 

233 super().__init__(*arg, **kw) 

234 self._cursor: Optional[AsyncIODBAPICursor] = None 

235 

236 def close(self) -> None: 

237 if self._cursor is not None: 

238 self.await_(self._cursor.close()) 

239 self._cursor = None 

240 

241 def fetchone(self) -> Optional[Any]: 

242 assert self._cursor is not None 

243 return self.await_(self._cursor.fetchone()) 

244 

245 def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]: 

246 assert self._cursor is not None 

247 if size is None: 

248 size = self.arraysize 

249 return self.await_(self._cursor.fetchmany(size=size)) 

250 

251 def fetchall(self) -> Sequence[Any]: 

252 assert self._cursor is not None 

253 return self.await_(self._cursor.fetchall()) 

254 

255 

256class AsyncAdapt_aiosqlite_connection(AsyncAdapt_terminate, AdaptedConnection): 

257 await_ = staticmethod(await_only) 

258 __slots__ = ("dbapi",) 

259 

260 def __init__(self, dbapi: Any, connection: AsyncIODBAPIConnection) -> None: 

261 self.dbapi = dbapi 

262 self._connection = connection 

263 

264 @property 

265 def isolation_level(self) -> Optional[str]: 

266 return cast(str, self._connection.isolation_level) 

267 

268 @isolation_level.setter 

269 def isolation_level(self, value: Optional[str]) -> None: 

270 # aiosqlite's isolation_level setter works outside the Thread 

271 # that it's supposed to, necessitating setting check_same_thread=False. 

272 # for improved stability, we instead invent our own awaitable version 

273 # using aiosqlite's async queue directly. 

274 

275 def set_iso( 

276 connection: AsyncAdapt_aiosqlite_connection, value: Optional[str] 

277 ) -> None: 

278 connection.isolation_level = value 

279 

280 function = partial(set_iso, self._connection._conn, value) 

281 future = asyncio.get_event_loop().create_future() 

282 

283 self._connection._tx.put_nowait((future, function)) 

284 

285 try: 

286 self.await_(future) 

287 except Exception as error: 

288 self._handle_exception(error) 

289 

290 def create_function(self, *args: Any, **kw: Any) -> None: 

291 try: 

292 self.await_(self._connection.create_function(*args, **kw)) 

293 except Exception as error: 

294 self._handle_exception(error) 

295 

296 def cursor(self, server_side: bool = False) -> AsyncAdapt_aiosqlite_cursor: 

297 if server_side: 

298 return AsyncAdapt_aiosqlite_ss_cursor(self) 

299 else: 

300 return AsyncAdapt_aiosqlite_cursor(self) 

301 

302 def execute(self, *args: Any, **kw: Any) -> Any: 

303 return self.await_(self._connection.execute(*args, **kw)) 

304 

305 def rollback(self) -> None: 

306 try: 

307 self.await_(self._connection.rollback()) 

308 except Exception as error: 

309 self._handle_exception(error) 

310 

311 def commit(self) -> None: 

312 try: 

313 self.await_(self._connection.commit()) 

314 except Exception as error: 

315 self._handle_exception(error) 

316 

317 def close(self) -> None: 

318 try: 

319 self.await_(self._connection.close()) 

320 except ValueError: 

321 # this is undocumented for aiosqlite, that ValueError 

322 # was raised if .close() was called more than once, which is 

323 # both not customary for DBAPI and is also not a DBAPI.Error 

324 # exception. This is now fixed in aiosqlite via my PR 

325 # https://github.com/omnilib/aiosqlite/pull/238, so we can be 

326 # assured this will not become some other kind of exception, 

327 # since it doesn't raise anymore. 

328 

329 pass 

330 except Exception as error: 

331 self._handle_exception(error) 

332 

333 def _handle_exception(self, error: Exception) -> NoReturn: 

334 if ( 

335 isinstance(error, ValueError) 

336 and error.args[0] == "no active connection" 

337 ): 

338 raise self.dbapi.sqlite.OperationalError( 

339 "no active connection" 

340 ) from error 

341 else: 

342 raise error 

343 

344 async def _terminate_graceful_close(self) -> None: 

345 """Try to close connection gracefully""" 

346 await self._connection.close() 

347 

348 def _terminate_force_close(self) -> None: 

349 """Terminate the connection""" 

350 

351 # this was added in aiosqlite 0.22.1. if stop() is not present, 

352 # the dialect should indicate has_terminate=False 

353 try: 

354 meth = self._connection.stop 

355 except AttributeError as ae: 

356 raise NotImplementedError( 

357 "terminate_force_close() not implemented by this DBAPI shim" 

358 ) from ae 

359 else: 

360 meth() 

361 

362 

363class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection): 

364 __slots__ = () 

365 

366 await_ = staticmethod(await_fallback) 

367 

368 

369class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module): 

370 def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType): 

371 self.aiosqlite = aiosqlite 

372 self.sqlite = sqlite 

373 self.paramstyle = "qmark" 

374 self.has_stop = hasattr(aiosqlite.Connection, "stop") 

375 self._init_dbapi_attributes() 

376 

377 def _init_dbapi_attributes(self) -> None: 

378 for name in ( 

379 "DatabaseError", 

380 "Error", 

381 "IntegrityError", 

382 "NotSupportedError", 

383 "OperationalError", 

384 "ProgrammingError", 

385 "sqlite_version", 

386 "sqlite_version_info", 

387 ): 

388 setattr(self, name, getattr(self.aiosqlite, name)) 

389 

390 for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"): 

391 setattr(self, name, getattr(self.sqlite, name)) 

392 

393 for name in ("Binary",): 

394 setattr(self, name, getattr(self.sqlite, name)) 

395 

396 def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection: 

397 async_fallback = kw.pop("async_fallback", False) 

398 

399 creator_fn = kw.pop("async_creator_fn", None) 

400 if creator_fn: 

401 connection = creator_fn(*arg, **kw) 

402 else: 

403 connection = self.aiosqlite.connect(*arg, **kw) 

404 

405 # aiosqlite uses a Thread. you'll thank us later 

406 if isinstance(connection, Thread): 

407 # Connection itself was a thread in version prior to 0.22 

408 connection.daemon = True 

409 else: 

410 # in 0.22+ instead it contains a thread. 

411 connection._thread.daemon = True 

412 

413 if util.asbool(async_fallback): 

414 return AsyncAdaptFallback_aiosqlite_connection( 

415 self, 

416 await_fallback(connection), 

417 ) 

418 else: 

419 return AsyncAdapt_aiosqlite_connection( 

420 self, 

421 await_only(connection), 

422 ) 

423 

424 

425class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext): 

426 def create_server_side_cursor(self) -> DBAPICursor: 

427 return self._dbapi_connection.cursor(server_side=True) 

428 

429 

430class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite): 

431 driver = "aiosqlite" 

432 supports_statement_cache = True 

433 

434 is_async = True 

435 has_terminate = True 

436 

437 supports_server_side_cursors = True 

438 

439 execution_ctx_cls = SQLiteExecutionContext_aiosqlite 

440 

441 def __init__(self, **kwargs: Any): 

442 super().__init__(**kwargs) 

443 if self.dbapi and not self.dbapi.has_stop: 

444 self.has_terminate = False 

445 

446 @classmethod 

447 def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi: 

448 return AsyncAdapt_aiosqlite_dbapi( 

449 __import__("aiosqlite"), __import__("sqlite3") 

450 ) 

451 

452 @classmethod 

453 def get_pool_class(cls, url: URL) -> type[pool.Pool]: 

454 if cls._is_url_file_db(url): 

455 return pool.AsyncAdaptedQueuePool 

456 else: 

457 return pool.StaticPool 

458 

459 def is_disconnect( 

460 self, 

461 e: DBAPIModule.Error, 

462 connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]], 

463 cursor: Optional[DBAPICursor], 

464 ) -> bool: 

465 self.dbapi = cast("DBAPIModule", self.dbapi) 

466 if isinstance( 

467 e, self.dbapi.OperationalError 

468 ) and "no active connection" in str(e): 

469 return True 

470 

471 return super().is_disconnect(e, connection, cursor) 

472 

473 def get_driver_connection( 

474 self, connection: DBAPIConnection 

475 ) -> AsyncIODBAPIConnection: 

476 return connection._connection # type: ignore[no-any-return] 

477 

478 def do_terminate(self, dbapi_connection: DBAPIConnection) -> None: 

479 dbapi_connection.terminate() 

480 

481 

482dialect = SQLiteDialect_aiosqlite