Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

219 statements  

1# dialects/sqlite/aiosqlite.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9r""" 

10 

11.. dialect:: sqlite+aiosqlite 

12 :name: aiosqlite 

13 :dbapi: aiosqlite 

14 :connectstring: sqlite+aiosqlite:///file_path 

15 :url: https://pypi.org/project/aiosqlite/ 

16 

17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface 

18running on top of pysqlite. 

19 

20aiosqlite is a wrapper around pysqlite that uses a background thread for 

21each connection. It does not actually use non-blocking IO, as SQLite 

22databases are not socket-based. However it does provide a working asyncio 

23interface that's useful for testing and prototyping purposes. 

24 

25Using a special asyncio mediation layer, the aiosqlite dialect is usable 

26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

27extension package. 

28 

29This dialect should normally be used only with the 

30:func:`_asyncio.create_async_engine` engine creation function:: 

31 

32 from sqlalchemy.ext.asyncio import create_async_engine 

33 

34 engine = create_async_engine("sqlite+aiosqlite:///filename") 

35 

36The URL passes through all arguments to the ``pysqlite`` driver, so all 

37connection arguments are the same as they are for that of :ref:`pysqlite`. 

38 

39.. _aiosqlite_udfs: 

40 

41User-Defined Functions 

42---------------------- 

43 

44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs) 

45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`. 

46 

47.. _aiosqlite_serializable: 

48 

49Serializable isolation / Savepoints / Transactional DDL (asyncio version) 

50------------------------------------------------------------------------- 

51 

52A newly revised version of this important section is now available 

53at the top level of the SQLAlchemy SQLite documentation, in the section 

54:ref:`sqlite_transactions`. 

55 

56 

57.. _aiosqlite_pooling: 

58 

59Pooling Behavior 

60---------------- 

61 

62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently 

63based on the kind of SQLite database that's requested: 

64 

65* When a ``:memory:`` SQLite database is specified, the dialect by default 

66 will use :class:`.StaticPool`. This pool maintains a single 

67 connection, so that all access to the engine 

68 uses the same ``:memory:`` database.

69* When a file-based database is specified, the dialect will use 

70 :class:`.AsyncAdaptedQueuePool` as the source of connections. 

71 

72 .. versionchanged:: 2.0.38 

73 

74 SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default. 

75 Previously, :class:`.NullPool` was used. The :class:`.NullPool` class

76 may be used by specifying it via the 

77 :paramref:`_sa.create_engine.poolclass` parameter. 

78 

79""" # noqa 

80from __future__ import annotations 

81 

82import asyncio 

83from collections import deque 

84from functools import partial 

85from types import ModuleType 

86from typing import Any 

87from typing import cast 

88from typing import Deque 

89from typing import Iterator 

90from typing import NoReturn 

91from typing import Optional 

92from typing import Sequence 

93from typing import TYPE_CHECKING 

94from typing import Union 

95 

96from .base import SQLiteExecutionContext 

97from .pysqlite import SQLiteDialect_pysqlite 

98from ... import pool 

99from ... import util 

100from ...connectors.asyncio import AsyncAdapt_dbapi_module 

101from ...engine import AdaptedConnection 

102from ...util.concurrency import await_fallback 

103from ...util.concurrency import await_only 

104 

105if TYPE_CHECKING: 

106 from ...connectors.asyncio import AsyncIODBAPIConnection 

107 from ...connectors.asyncio import AsyncIODBAPICursor 

108 from ...engine.interfaces import _DBAPICursorDescription 

109 from ...engine.interfaces import _DBAPIMultiExecuteParams 

110 from ...engine.interfaces import _DBAPISingleExecuteParams 

111 from ...engine.interfaces import DBAPIConnection 

112 from ...engine.interfaces import DBAPICursor 

113 from ...engine.interfaces import DBAPIModule 

114 from ...engine.url import URL 

115 from ...pool.base import PoolProxiedConnection 

116 

117 

class AsyncAdapt_aiosqlite_cursor:
    """Blocking, DBAPI-style cursor layered over an async driver connection.

    Each ``execute()`` / ``executemany()`` call opens a fresh driver cursor
    on the wrapped connection and runs every awaitable through the
    ``await_`` callable taken from the owning adapted connection.  When
    ``server_side`` is false (the default for this class), all result rows
    are fetched immediately into an in-memory deque and the driver cursor is
    closed before ``execute()`` returns; the ``fetch*`` methods then only
    consume that buffer.
    """

    # TODO: base on connectors/asyncio.py
    # see #10415

    __slots__ = (
        "_adapt_connection",
        "_connection",
        "description",
        "await_",
        "_rows",
        "arraysize",
        "rowcount",
        "lastrowid",
    )

    server_side = False

    def __init__(self, adapt_connection: AsyncAdapt_aiosqlite_connection):
        """Bind the cursor to ``adapt_connection`` and reset its state.

        :param adapt_connection: the adapted connection that supplies the
         underlying driver connection, the ``await_`` callable and the
         ``_handle_exception()`` hook used by this cursor.
        """
        self._adapt_connection = adapt_connection
        self._connection = adapt_connection._connection
        self.await_ = adapt_connection.await_
        self.arraysize = 1
        self.rowcount = -1
        # "lastrowid" is a declared slot; without an initial value, reading
        # it before the first execute() raises AttributeError.  -1 is the
        # same placeholder execute() assigns for row-returning statements.
        self.lastrowid = -1
        self.description: Optional[_DBAPICursorDescription] = None
        self._rows: Deque[Any] = deque()

    def close(self) -> None:
        """Discard any buffered rows that have not been fetched."""
        self._rows.clear()

    def execute(
        self,
        operation: Any,
        parameters: Optional[_DBAPISingleExecuteParams] = None,
    ) -> Any:
        """Run a single statement and capture its outcome on this cursor.

        If the driver cursor reports a ``description``, the statement is
        treated as row-returning: ``description`` is copied, ``lastrowid``
        and ``rowcount`` are set to -1, and (when not server side) all rows
        are buffered.  Otherwise ``description`` is cleared and
        ``lastrowid`` / ``rowcount`` are copied from the driver cursor.

        Any exception is passed to the connection's ``_handle_exception()``.
        """

        try:
            _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor())  # type: ignore[arg-type] # noqa: E501

            if parameters is None:
                self.await_(_cursor.execute(operation))
            else:
                self.await_(_cursor.execute(operation, parameters))

            if _cursor.description:
                self.description = _cursor.description
                self.lastrowid = self.rowcount = -1

                if not self.server_side:
                    self._rows = deque(self.await_(_cursor.fetchall()))
            else:
                self.description = None
                self.lastrowid = _cursor.lastrowid
                self.rowcount = _cursor.rowcount

            if not self.server_side:
                self.await_(_cursor.close())
            else:
                # only reached by subclasses that set server_side = True
                # and provide a "_cursor" slot
                self._cursor = _cursor  # type: ignore[misc]
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def executemany(
        self,
        operation: Any,
        seq_of_parameters: _DBAPIMultiExecuteParams,
    ) -> Any:
        """Run ``operation`` once per parameter set.

        No rows are buffered: ``description`` is cleared, ``lastrowid`` and
        ``rowcount`` are copied from the driver cursor, and the driver
        cursor is closed.  Any exception is passed to the connection's
        ``_handle_exception()``.
        """
        try:
            _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor())  # type: ignore[arg-type] # noqa: E501
            self.await_(_cursor.executemany(operation, seq_of_parameters))
            self.description = None
            self.lastrowid = _cursor.lastrowid
            self.rowcount = _cursor.rowcount
            self.await_(_cursor.close())
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def setinputsizes(self, *inputsizes: Any) -> None:
        """Accept and ignore input size hints."""
        pass

    def __iter__(self) -> Iterator[Any]:
        """Yield buffered rows in order, removing each one as it is yielded."""
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self) -> Optional[Any]:
        """Return the next buffered row, or ``None`` if the buffer is empty."""
        if self._rows:
            return self._rows.popleft()
        else:
            return None

    def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
        """Return up to ``size`` buffered rows (``arraysize`` when omitted)."""
        if size is None:
            size = self.arraysize

        rr = self._rows
        return [rr.popleft() for _ in range(min(size, len(rr)))]

    def fetchall(self) -> Sequence[Any]:
        """Return all remaining buffered rows and empty the buffer."""
        retval = list(self._rows)
        self._rows.clear()
        return retval

218 

219 

class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
    """Cursor variant that keeps the driver cursor open after ``execute()``.

    With ``server_side`` set, the base class stores the live driver cursor
    on ``_cursor`` instead of buffering rows, and each ``fetch*`` call here
    is forwarded to that driver cursor through ``await_``.
    """

    # TODO: base on connectors/asyncio.py
    # see #10415
    __slots__ = "_cursor"

    server_side = True

    def __init__(self, *arg: Any, **kw: Any) -> None:
        """Initialize the base cursor, then mark no driver cursor as open."""
        super().__init__(*arg, **kw)
        self._cursor: Optional[AsyncIODBAPICursor] = None

    def close(self) -> None:
        """Close the open driver cursor, if any, and forget it."""
        driver_cursor = self._cursor
        if driver_cursor is None:
            return
        self.await_(driver_cursor.close())
        self._cursor = None

    def fetchone(self) -> Optional[Any]:
        """Fetch a single row from the open driver cursor."""
        assert self._cursor is not None
        pending = self._cursor.fetchone()
        return self.await_(pending)

    def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
        """Fetch ``size`` rows (``arraysize`` when omitted) from the driver."""
        assert self._cursor is not None
        batch_size = self.arraysize if size is None else size
        pending = self._cursor.fetchmany(size=batch_size)
        return self.await_(pending)

    def fetchall(self) -> Sequence[Any]:
        """Fetch every remaining row from the open driver cursor."""
        assert self._cursor is not None
        pending = self._cursor.fetchall()
        return self.await_(pending)

249 

250 

class AsyncAdapt_aiosqlite_connection(AdaptedConnection):
    """Blocking, DBAPI-style connection facade over an async connection.

    Every awaitable produced by the wrapped connection is driven through
    ``await_`` (``await_only`` for this class).  Most operations route
    failures through :meth:`_handle_exception`.
    """

    await_ = staticmethod(await_only)
    __slots__ = ("dbapi",)

    def __init__(self, dbapi: Any, connection: AsyncIODBAPIConnection) -> None:
        """Store the adapted DBAPI module and the wrapped connection.

        :param dbapi: object whose ``sqlite.OperationalError`` is raised by
         :meth:`_handle_exception`.
        :param connection: the async driver connection being adapted.
        """
        self.dbapi = dbapi
        self._connection = connection

    @property
    def isolation_level(self) -> Optional[str]:
        """The ``isolation_level`` value of the wrapped connection."""
        return cast(str, self._connection.isolation_level)

    @isolation_level.setter
    def isolation_level(self, value: Optional[str]) -> None:
        # aiosqlite's isolation_level setter works outside the Thread
        # that it's supposed to, necessitating setting check_same_thread=False.
        # for improved stability, we instead invent our own awaitable version
        # using aiosqlite's async queue directly.

        def set_iso(
            connection: AsyncAdapt_aiosqlite_connection, value: Optional[str]
        ) -> None:
            connection.isolation_level = value

        function = partial(set_iso, self._connection._conn, value)
        future = asyncio.get_event_loop().create_future()

        # hand the (future, callable) pair to the connection's queue and
        # wait for the future to be resolved
        self._connection._tx.put_nowait((future, function))

        try:
            self.await_(future)
        except Exception as error:
            self._handle_exception(error)

    def create_function(self, *args: Any, **kw: Any) -> None:
        """Forward to the wrapped connection's ``create_function()``."""
        try:
            self.await_(self._connection.create_function(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def cursor(self, server_side: bool = False) -> AsyncAdapt_aiosqlite_cursor:
        """Return a new adapted cursor bound to this connection.

        :param server_side: when true, return the server-side cursor class,
         otherwise the buffering cursor class.
        """
        if server_side:
            return AsyncAdapt_aiosqlite_ss_cursor(self)
        else:
            return AsyncAdapt_aiosqlite_cursor(self)

    def execute(self, *args: Any, **kw: Any) -> Any:
        """Forward to the wrapped connection's ``execute()``.

        Unlike the other methods of this class, exceptions raised here are
        not passed through :meth:`_handle_exception`.
        """
        return self.await_(self._connection.execute(*args, **kw))

    def rollback(self) -> None:
        """Roll back via the wrapped connection."""
        try:
            self.await_(self._connection.rollback())
        except Exception as error:
            self._handle_exception(error)

    def commit(self) -> None:
        """Commit via the wrapped connection."""
        try:
            self.await_(self._connection.commit())
        except Exception as error:
            self._handle_exception(error)

    def close(self) -> None:
        """Close the wrapped connection, ignoring ``ValueError``."""
        try:
            self.await_(self._connection.close())
        except ValueError:
            # this is undocumented for aiosqlite, that ValueError
            # was raised if .close() was called more than once, which is
            # both not customary for DBAPI and is also not a DBAPI.Error
            # exception. This is now fixed in aiosqlite via my PR
            # https://github.com/omnilib/aiosqlite/pull/238, so we can be
            # assured this will not become some other kind of exception,
            # since it doesn't raise anymore.

            pass
        except Exception as error:
            self._handle_exception(error)

    def _handle_exception(self, error: Exception) -> NoReturn:
        """Re-raise ``error``, translating the "no active connection" case.

        A ``ValueError`` whose first argument is ``"no active connection"``
        is re-raised as ``self.dbapi.sqlite.OperationalError`` chained to
        the original; every other exception is re-raised unchanged.

        :raises Exception: always.
        """
        if (
            isinstance(error, ValueError)
            # guard against ValueError() with no args: indexing args[0]
            # would raise IndexError and mask the original error
            and error.args
            and error.args[0] == "no active connection"
        ):
            raise self.dbapi.sqlite.OperationalError(
                "no active connection"
            ) from error
        else:
            raise error

338 

339 

class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection):
    """Adapted connection whose ``await_`` callable is ``await_fallback``."""

    await_ = staticmethod(await_fallback)

    __slots__ = ()

344 

345 

class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
    """DBAPI-module facade assembled from two driver modules.

    Exception classes and version attributes are copied from the first
    module, while the ``PARSE_*`` constants and ``Binary`` are copied from
    the second.
    """

    def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType):
        """Keep both modules and copy the DBAPI attributes onto ``self``."""
        self.aiosqlite = aiosqlite
        self.sqlite = sqlite
        self.paramstyle = "qmark"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self) -> None:
        """Copy selected module-level attributes onto this object."""
        attribute_sources = (
            (
                self.aiosqlite,
                (
                    "DatabaseError",
                    "Error",
                    "IntegrityError",
                    "NotSupportedError",
                    "OperationalError",
                    "ProgrammingError",
                    "sqlite_version",
                    "sqlite_version_info",
                ),
            ),
            (
                self.sqlite,
                ("PARSE_COLNAMES", "PARSE_DECLTYPES", "Binary"),
            ),
        )
        for source_module, attribute_names in attribute_sources:
            for attribute_name in attribute_names:
                setattr(
                    self,
                    attribute_name,
                    getattr(source_module, attribute_name),
                )

    def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection:
        """Create a driver connection and wrap it in an adapted connection.

        ``async_fallback`` and ``async_creator_fn`` are removed from ``kw``
        before the remaining arguments are forwarded.  If a creator
        function was supplied it is called to produce the connection;
        otherwise the driver module's ``connect()`` is used and the result
        is flagged as a daemon.
        """
        use_fallback = kw.pop("async_fallback", False)
        custom_creator = kw.pop("async_creator_fn", None)

        if not custom_creator:
            connection = self.aiosqlite.connect(*arg, **kw)
            # it's a Thread. you'll thank us later
            connection.daemon = True
        else:
            connection = custom_creator(*arg, **kw)

        if not util.asbool(use_fallback):
            return AsyncAdapt_aiosqlite_connection(
                self,
                await_only(connection),
            )

        return AsyncAdaptFallback_aiosqlite_connection(
            self,
            await_fallback(connection),
        )

393 

394 

class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
    """Execution context that requests server-side cursors by flag."""

    def create_server_side_cursor(self) -> DBAPICursor:
        """Return a cursor from the DBAPI connection with ``server_side=True``."""
        dbapi_connection = self._dbapi_connection
        return dbapi_connection.cursor(server_side=True)

398 

399 

class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
    """Async SQLite dialect using the ``aiosqlite`` driver."""

    driver = "aiosqlite"
    supports_statement_cache = True

    is_async = True

    supports_server_side_cursors = True

    execution_ctx_cls = SQLiteExecutionContext_aiosqlite

    @classmethod
    def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi:
        """Import ``aiosqlite`` and ``sqlite3`` and wrap them together."""
        aiosqlite_module = __import__("aiosqlite")
        sqlite_module = __import__("sqlite3")
        return AsyncAdapt_aiosqlite_dbapi(aiosqlite_module, sqlite_module)

    @classmethod
    def get_pool_class(cls, url: URL) -> type[pool.Pool]:
        """Pick the pool class: queue pool for file URLs, else static pool."""
        return (
            pool.AsyncAdaptedQueuePool
            if cls._is_url_file_db(url)
            else pool.StaticPool
        )

    def is_disconnect(
        self,
        e: DBAPIModule.Error,
        connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
        cursor: Optional[DBAPICursor],
    ) -> bool:
        """Report whether ``e`` indicates a disconnected connection.

        An ``OperationalError`` whose text contains "no active connection"
        counts as a disconnect; anything else is deferred to the parent
        dialect.
        """
        self.dbapi = cast("DBAPIModule", self.dbapi)
        if (
            not isinstance(e, self.dbapi.OperationalError)
            or "no active connection" not in str(e)
        ):
            return super().is_disconnect(e, connection, cursor)

        return True

    def get_driver_connection(
        self, connection: DBAPIConnection
    ) -> AsyncIODBAPIConnection:
        """Return the ``_connection`` attribute of the given connection."""
        return connection._connection  # type: ignore[no-any-return]


# module-level alias for the dialect class defined above
dialect = SQLiteDialect_aiosqlite