Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

221 statements  

1# dialects/sqlite/aiosqlite.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9r""" 

10 

11.. dialect:: sqlite+aiosqlite 

12 :name: aiosqlite 

13 :dbapi: aiosqlite 

14 :connectstring: sqlite+aiosqlite:///file_path 

15 :url: https://pypi.org/project/aiosqlite/ 

16 

17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface 

18running on top of pysqlite. 

19 

20aiosqlite is a wrapper around pysqlite that uses a background thread for 

21each connection. It does not actually use non-blocking IO, as SQLite 

22databases are not socket-based. However it does provide a working asyncio 

23interface that's useful for testing and prototyping purposes. 

24 

25Using a special asyncio mediation layer, the aiosqlite dialect is usable 

26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

27extension package. 

28 

29This dialect should normally be used only with the 

30:func:`_asyncio.create_async_engine` engine creation function:: 

31 

32 from sqlalchemy.ext.asyncio import create_async_engine 

33 

34 engine = create_async_engine("sqlite+aiosqlite:///filename") 

35 

36The URL passes through all arguments to the ``pysqlite`` driver, so all 

37connection arguments are the same as they are for that of :ref:`pysqlite`. 

38 

39.. _aiosqlite_udfs: 

40 

41User-Defined Functions 

42---------------------- 

43 

44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs) 

45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`. 

46 

47.. _aiosqlite_serializable: 

48 

49Serializable isolation / Savepoints / Transactional DDL (asyncio version) 

50------------------------------------------------------------------------- 

51 

52A newly revised version of this important section is now available 

53at the top level of the SQLAlchemy SQLite documentation, in the section 

54:ref:`sqlite_transactions`. 

55 

56 

57.. _aiosqlite_pooling: 

58 

59Pooling Behavior 

60---------------- 

61 

62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently 

63based on the kind of SQLite database that's requested: 

64 

65* When a ``:memory:`` SQLite database is specified, the dialect by default 

66 will use :class:`.StaticPool`. This pool maintains a single 

67 connection, so that all access to the engine 

68 uses the same ``:memory:`` database. 

69* When a file-based database is specified, the dialect will use 

70 :class:`.AsyncAdaptedQueuePool` as the source of connections. 

71 

72 .. versionchanged:: 2.0.38 

73 

74 SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default. 

75 Previously, :class:`.NullPool` was used. The :class:`.NullPool` class 

76 may be used by specifying it via the 

77 :paramref:`_sa.create_engine.poolclass` parameter. 

78 

79""" # noqa 

80from __future__ import annotations 

81 

82import asyncio 

83from collections import deque 

84from functools import partial 

85from types import ModuleType 

86from typing import Any 

87from typing import cast 

88from typing import Deque 

89from typing import Iterator 

90from typing import NoReturn 

91from typing import Optional 

92from typing import Sequence 

93from typing import TYPE_CHECKING 

94from typing import Union 

95 

96from .base import SQLiteExecutionContext 

97from .pysqlite import SQLiteDialect_pysqlite 

98from ... import pool 

99from ... import util 

100from ...connectors.asyncio import AsyncAdapt_dbapi_module 

101from ...engine import AdaptedConnection 

102from ...util.concurrency import await_fallback 

103from ...util.concurrency import await_only 

104 

105if TYPE_CHECKING: 

106 from ...connectors.asyncio import AsyncIODBAPIConnection 

107 from ...connectors.asyncio import AsyncIODBAPICursor 

108 from ...engine.interfaces import _DBAPICursorDescription 

109 from ...engine.interfaces import _DBAPIMultiExecuteParams 

110 from ...engine.interfaces import _DBAPISingleExecuteParams 

111 from ...engine.interfaces import DBAPIConnection 

112 from ...engine.interfaces import DBAPICursor 

113 from ...engine.interfaces import DBAPIModule 

114 from ...engine.url import URL 

115 from ...pool.base import PoolProxiedConnection 

116 

117 

class AsyncAdapt_aiosqlite_cursor:
    """Blocking, DBAPI-style cursor facade over an aiosqlite connection.

    Each ``execute()`` / ``executemany()`` call obtains a fresh driver
    cursor from ``self._connection`` and drives its awaitables through
    ``self.await_``.  When ``server_side`` is false, the rows of a
    row-returning statement are fetched eagerly into ``self._rows`` and
    the driver cursor is closed again; the ``fetch*`` methods and
    ``__iter__`` then consume that buffer.

    Exceptions raised while executing are passed to the owning
    connection's ``_handle_exception()``.
    """

    # TODO: base on connectors/asyncio.py
    # see #10415

    __slots__ = (
        "_adapt_connection",
        "_connection",
        "description",
        "await_",
        "_rows",
        "arraysize",
        "rowcount",
        "lastrowid",
    )

    server_side = False

    def __init__(self, adapt_connection: AsyncAdapt_aiosqlite_connection):
        """Bind the cursor to *adapt_connection* and reset its state."""
        self._adapt_connection = adapt_connection
        self._connection = adapt_connection._connection
        self.await_ = adapt_connection.await_
        self.arraysize = 1
        self.rowcount = -1
        # ``lastrowid`` is a slot, so it has no value until assigned.
        # Seed it here so that reading it before the first successful
        # execute() does not raise AttributeError.  -1 is the same marker
        # execute() stores for row-returning statements.
        self.lastrowid = -1
        self.description: Optional[_DBAPICursorDescription] = None
        self._rows: Deque[Any] = deque()

    async def _async_soft_close(self) -> None:
        """No-op; this cursor has nothing to release asynchronously."""
        return

    def close(self) -> None:
        """Discard any rows still held in the local buffer."""
        self._rows.clear()

    def execute(
        self,
        operation: Any,
        parameters: Optional[_DBAPISingleExecuteParams] = None,
    ) -> Any:
        """Run *operation* once, optionally with *parameters*.

        For a statement that produces a description, ``description`` is
        copied from the driver cursor, ``lastrowid`` and ``rowcount`` are
        set to -1 and, unless ``server_side`` is true, all rows are
        buffered.  Otherwise ``description`` is cleared and ``lastrowid``
        / ``rowcount`` are copied from the driver cursor.

        Any exception is handed to the connection's
        ``_handle_exception()``, which always raises.
        """

        try:
            _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor())  # type: ignore[arg-type]  # noqa: E501

            if parameters is None:
                self.await_(_cursor.execute(operation))
            else:
                self.await_(_cursor.execute(operation, parameters))

            if _cursor.description:
                self.description = _cursor.description
                self.lastrowid = self.rowcount = -1

                if not self.server_side:
                    self._rows = deque(self.await_(_cursor.fetchall()))
            else:
                self.description = None
                self.lastrowid = _cursor.lastrowid
                self.rowcount = _cursor.rowcount

            if not self.server_side:
                # rows (if any) are already buffered; the driver cursor
                # is no longer needed
                self.await_(_cursor.close())
            else:
                # keep the driver cursor open for incremental fetching
                self._cursor = _cursor  # type: ignore[misc]
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def executemany(
        self,
        operation: Any,
        seq_of_parameters: _DBAPIMultiExecuteParams,
    ) -> Any:
        """Run *operation* once per entry of *seq_of_parameters*.

        ``description`` is cleared, ``lastrowid`` / ``rowcount`` are
        copied from the driver cursor, and the driver cursor is closed.
        Any exception is handed to the connection's
        ``_handle_exception()``.
        """
        try:
            _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor())  # type: ignore[arg-type]  # noqa: E501
            self.await_(_cursor.executemany(operation, seq_of_parameters))
            self.description = None
            self.lastrowid = _cursor.lastrowid
            self.rowcount = _cursor.rowcount
            self.await_(_cursor.close())
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def setinputsizes(self, *inputsizes: Any) -> None:
        """Accept and ignore input size hints."""
        pass

    def __iter__(self) -> Iterator[Any]:
        """Yield buffered rows, removing each one as it is produced."""
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self) -> Optional[Any]:
        """Return the next buffered row, or ``None`` when exhausted."""
        if self._rows:
            return self._rows.popleft()
        else:
            return None

    def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
        """Return up to *size* buffered rows (``arraysize`` if omitted)."""
        if size is None:
            size = self.arraysize

        rr = self._rows
        return [rr.popleft() for _ in range(min(size, len(rr)))]

    def fetchall(self) -> Sequence[Any]:
        """Return all remaining buffered rows and empty the buffer."""
        retval = list(self._rows)
        self._rows.clear()
        return retval

221 

222 

class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
    """Server-side flavour of the aiosqlite cursor adapter.

    With ``server_side`` set to true, the parent ``execute()`` skips the
    eager ``fetchall()`` and stores the open driver cursor in
    ``self._cursor``.  The fetch methods and ``__iter__`` here pull rows
    from that driver cursor on demand through ``self.await_``.
    """

    # TODO: base on connectors/asyncio.py
    # see #10415
    __slots__ = "_cursor"

    server_side = True

    def __init__(self, *arg: Any, **kw: Any) -> None:
        """Initialise the parent state; no driver cursor is held yet."""
        super().__init__(*arg, **kw)
        self._cursor: Optional[AsyncIODBAPICursor] = None

    def close(self) -> None:
        """Close the held driver cursor, if any, and forget it."""
        if self._cursor is not None:
            self.await_(self._cursor.close())
            self._cursor = None

    def __iter__(self) -> Iterator[Any]:
        """Yield rows one at a time from the open driver cursor.

        The inherited ``__iter__`` drains ``self._rows``, which is never
        filled in server-side mode, so it would silently yield nothing.
        Rows are streamed from the driver cursor instead, until
        ``fetchone()`` reports exhaustion with ``None``.
        """
        cursor = self._cursor
        assert cursor is not None
        while True:
            row = self.await_(cursor.fetchone())
            if row is None:
                return
            yield row

    def fetchone(self) -> Optional[Any]:
        """Return the next row from the driver cursor."""
        assert self._cursor is not None
        return self.await_(self._cursor.fetchone())

    def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
        """Return up to *size* rows (``arraysize`` if omitted)."""
        assert self._cursor is not None
        if size is None:
            size = self.arraysize
        return self.await_(self._cursor.fetchmany(size=size))

    def fetchall(self) -> Sequence[Any]:
        """Return every remaining row from the driver cursor."""
        assert self._cursor is not None
        return self.await_(self._cursor.fetchall())

252 

253 

class AsyncAdapt_aiosqlite_connection(AdaptedConnection):
    """Synchronous-looking DBAPI connection around an aiosqlite connection.

    Each method hands the coroutine produced by the wrapped connection to
    ``await_``.  Most methods route failures through
    ``_handle_exception()`` so that a ``ValueError("no active
    connection")`` surfaces as the DBAPI ``OperationalError``.
    """

    await_ = staticmethod(await_only)
    __slots__ = ("dbapi",)

    def __init__(self, dbapi: Any, connection: AsyncIODBAPIConnection) -> None:
        """Store the adapted DBAPI module and the driver connection."""
        self.dbapi = dbapi
        self._connection = connection

    @property
    def isolation_level(self) -> Optional[str]:
        """The isolation level reported by the wrapped connection."""
        return cast(str, self._connection.isolation_level)

    @isolation_level.setter
    def isolation_level(self, value: Optional[str]) -> None:
        # aiosqlite's isolation_level setter works outside the Thread
        # that it's supposed to, necessitating setting check_same_thread=False.
        # for improved stability, we instead invent our own awaitable version
        # using aiosqlite's async queue directly.

        def set_iso(
            connection: AsyncAdapt_aiosqlite_connection, value: Optional[str]
        ) -> None:
            connection.isolation_level = value

        function = partial(set_iso, self._connection._conn, value)
        future = asyncio.get_event_loop().create_future()

        # enqueue the setter on the driver's work queue and wait for the
        # future to be resolved
        self._connection._tx.put_nowait((future, function))

        try:
            self.await_(future)
        except Exception as error:
            self._handle_exception(error)

    def create_function(self, *args: Any, **kw: Any) -> None:
        """Forward ``create_function()`` to the wrapped connection."""
        try:
            self.await_(self._connection.create_function(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def cursor(self, server_side: bool = False) -> AsyncAdapt_aiosqlite_cursor:
        """Return a buffered cursor, or a server-side one on request."""
        if server_side:
            return AsyncAdapt_aiosqlite_ss_cursor(self)
        else:
            return AsyncAdapt_aiosqlite_cursor(self)

    def execute(self, *args: Any, **kw: Any) -> Any:
        """Await and return the wrapped connection's ``execute()``."""
        return self.await_(self._connection.execute(*args, **kw))

    def rollback(self) -> None:
        """Roll back the current transaction on the wrapped connection."""
        try:
            self.await_(self._connection.rollback())
        except Exception as error:
            self._handle_exception(error)

    def commit(self) -> None:
        """Commit the current transaction on the wrapped connection."""
        try:
            self.await_(self._connection.commit())
        except Exception as error:
            self._handle_exception(error)

    def close(self) -> None:
        """Close the wrapped connection, tolerating a repeated close."""
        try:
            self.await_(self._connection.close())
        except ValueError:
            # this is undocumented for aiosqlite, that ValueError
            # was raised if .close() was called more than once, which is
            # both not customary for DBAPI and is also not a DBAPI.Error
            # exception. This is now fixed in aiosqlite via my PR
            # https://github.com/omnilib/aiosqlite/pull/238, so we can be
            # assured this will not become some other kind of exception,
            # since it doesn't raise anymore.

            pass
        except Exception as error:
            self._handle_exception(error)

    def _handle_exception(self, error: Exception) -> NoReturn:
        """Re-raise *error*, translating the closed-connection case.

        A ``ValueError`` whose first argument is exactly
        ``"no active connection"`` is re-raised as
        ``self.dbapi.sqlite.OperationalError`` chained to the original.
        Every other exception is re-raised unchanged.
        """
        # Compare a slice rather than ``args[0]``: a ValueError created
        # without arguments has empty ``args``, and indexing it would
        # raise IndexError and hide the real error.
        if isinstance(error, ValueError) and error.args[:1] == (
            "no active connection",
        ):
            raise self.dbapi.sqlite.OperationalError(
                "no active connection"
            ) from error
        else:
            raise error

341 

342 

class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection):
    """Connection adapter whose ``await_`` hook is ``await_fallback``.

    Everything else is inherited from the parent adapter; no additional
    instance slots are declared.
    """

    await_ = staticmethod(await_fallback)

    __slots__ = ()

347 

348 

class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
    """DBAPI-module facade assembled from ``aiosqlite`` and ``sqlite3``.

    Exception classes and version information are copied from the
    aiosqlite module; the ``PARSE_*`` constants and ``Binary`` are copied
    from the sqlite module.  ``connect()`` returns an adapted connection.
    """

    def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType):
        """Remember both modules and publish the DBAPI attributes."""
        self.aiosqlite = aiosqlite
        self.sqlite = sqlite
        self.paramstyle = "qmark"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self) -> None:
        """Mirror selected module attributes onto this object."""
        from_aiosqlite = (
            "DatabaseError",
            "Error",
            "IntegrityError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "sqlite_version",
            "sqlite_version_info",
        )
        from_sqlite = ("PARSE_COLNAMES", "PARSE_DECLTYPES", "Binary")

        # copy in the same order as before: aiosqlite names first, then
        # the sqlite ones
        for source, attr_names in (
            (self.aiosqlite, from_aiosqlite),
            (self.sqlite, from_sqlite),
        ):
            for attr in attr_names:
                setattr(self, attr, getattr(source, attr))

    def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection:
        """Open a connection and wrap it in the matching adapter.

        ``async_fallback`` and ``async_creator_fn`` are removed from *kw*
        before the remaining arguments are passed on.  If a creator
        function is given it produces the awaitable connection; otherwise
        ``aiosqlite.connect()`` does.  A truthy ``async_fallback`` selects
        the fallback adapter and ``await_fallback``.
        """
        use_fallback = kw.pop("async_fallback", False)
        custom_creator = kw.pop("async_creator_fn", None)

        if custom_creator:
            pending = custom_creator(*arg, **kw)
        else:
            pending = self.aiosqlite.connect(*arg, **kw)
            # the original author's note says this object is a Thread;
            # flag it as a daemon
            pending.daemon = True

        if not util.asbool(use_fallback):
            return AsyncAdapt_aiosqlite_connection(
                self,
                await_only(pending),
            )

        return AsyncAdaptFallback_aiosqlite_connection(
            self,
            await_fallback(pending),
        )

396 

397 

class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
    """Execution context that asks the connection for server-side cursors."""

    def create_server_side_cursor(self) -> DBAPICursor:
        """Return ``cursor(server_side=True)`` from the DBAPI connection."""
        dbapi_connection = self._dbapi_connection
        return dbapi_connection.cursor(server_side=True)

401 

402 

class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
    """pysqlite-derived dialect configured for the aiosqlite driver."""

    driver = "aiosqlite"
    is_async = True

    supports_statement_cache = True
    supports_server_side_cursors = True

    execution_ctx_cls = SQLiteExecutionContext_aiosqlite

    @classmethod
    def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi:
        """Import ``aiosqlite`` and ``sqlite3`` and wrap them together."""
        aiosqlite_module = __import__("aiosqlite")
        sqlite_module = __import__("sqlite3")
        return AsyncAdapt_aiosqlite_dbapi(aiosqlite_module, sqlite_module)

    @classmethod
    def get_pool_class(cls, url: URL) -> type[pool.Pool]:
        """Pick the pool class: queue pool for file URLs, else static."""
        return (
            pool.AsyncAdaptedQueuePool
            if cls._is_url_file_db(url)
            else pool.StaticPool
        )

    def is_disconnect(
        self,
        e: DBAPIModule.Error,
        connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
        cursor: Optional[DBAPICursor],
    ) -> bool:
        """Report whether *e* indicates a lost connection.

        An ``OperationalError`` whose text contains
        ``"no active connection"`` counts as a disconnect; anything else
        is decided by the parent dialect.
        """
        self.dbapi = cast("DBAPIModule", self.dbapi)
        connection_gone = isinstance(
            e, self.dbapi.OperationalError
        ) and "no active connection" in str(e)

        return connection_gone or super().is_disconnect(e, connection, cursor)

    def get_driver_connection(
        self, connection: DBAPIConnection
    ) -> AsyncIODBAPIConnection:
        """Return the driver connection held by the adapted *connection*."""
        return connection._connection  # type: ignore[no-any-return]


# module-level alias for the dialect class
dialect = SQLiteDialect_aiosqlite