Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/base.py: 32%

826 statements  

# dialects/sqlite/base.py
# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors


r'''
.. dialect:: sqlite
    :name: SQLite
    :normal_support: 3.12+
    :best_effort: 3.7.16+

15 

16.. _sqlite_datetime: 

17 

18Date and Time Types 

19------------------- 

20 

21SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does 

22not provide out of the box functionality for translating values between Python 

23`datetime` objects and a SQLite-supported format. SQLAlchemy's own 

24:class:`~sqlalchemy.types.DateTime` and related types provide date formatting 

25and parsing functionality when SQLite is used. The implementation classes are 

26:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`. 

27These types represent dates and times as ISO formatted strings, which also 

28nicely support ordering. There's no reliance on typical "libc" internals for 

29these functions so historical dates are fully supported. 
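
The ordering property of ISO formatted strings can be illustrated at the driver level. The following is a minimal sketch using only the standard library ``sqlite3`` module; the ``events`` table and the exact string format produced by ``isoformat()`` are assumptions for illustration and are not necessarily the storage format SQLAlchemy itself emits.

```python
import sqlite3
from datetime import datetime

# Dates spanning several centuries, deliberately out of order.
moments = [
    datetime(2024, 1, 15, 9, 30),
    datetime(1850, 7, 4, 12, 0),
    datetime(1999, 12, 31, 23, 59, 59),
]

conn = sqlite3.connect(":memory:")
# "events" is a hypothetical table used only for this sketch.
conn.execute("CREATE TABLE events (happened_at DATETIME)")
conn.executemany(
    "INSERT INTO events (happened_at) VALUES (?)",
    [(m.isoformat(sep=" "),) for m in moments],
)

# ORDER BY compares the stored text; for ISO strings this is also
# chronological order.
rows = conn.execute(
    "SELECT happened_at FROM events ORDER BY happened_at"
).fetchall()
stored = [row[0] for row in rows]
parsed = [datetime.fromisoformat(value) for value in stored]
conn.close()
```

The values come back as plain strings, and parsing them yields the original ``datetime`` objects in chronological order, including the 19th century value.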

30 

31Ensuring Text affinity 

32^^^^^^^^^^^^^^^^^^^^^^ 

33 

34The DDL rendered for these types is the standard ``DATE``, ``TIME`` 

35and ``DATETIME`` indicators. However, custom storage formats can also be 

36applied to these types. When the 

37storage format is detected as containing no alpha characters, the DDL for 

38these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``, 

39so that the column continues to have textual affinity. 

40 

41.. seealso:: 

42 

43 `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ - 

44 in the SQLite documentation 

45 

46.. _sqlite_autoincrement: 

47 

48SQLite Auto Incrementing Behavior 

49---------------------------------- 

50 

51Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html 

52 

53Key concepts: 

54 

55* SQLite has an implicit "auto increment" feature that takes place for any 

56 non-composite primary-key column that is specifically created using 

57 "INTEGER PRIMARY KEY" for the type + primary key. 

58 

59* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not** 

60 equivalent to the implicit autoincrement feature; this keyword is not 

61 recommended for general use. SQLAlchemy does not render this keyword 

62 unless a special SQLite-specific directive is used (see below). However, 

63 it still requires that the column's type is named "INTEGER". 

64 

65Using the AUTOINCREMENT Keyword 

66^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

67 

68To specifically render the AUTOINCREMENT keyword on the primary key column 

69when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table 

70construct:: 

71 

    Table(
        "sometable",
        metadata,
        Column("id", Integer, primary_key=True),
        sqlite_autoincrement=True,
    )

78 

79Allowing autoincrement behavior SQLAlchemy types other than Integer/INTEGER 

80^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

81 

82SQLite's typing model is based on naming conventions. Among other things, this 

83means that any type name which contains the substring ``"INT"`` will be 

84determined to be of "integer affinity". A type named ``"BIGINT"``, 

85``"SPECIAL_INT"`` or even ``"XYZINTQPR"``, will be considered by SQLite to be 

86of "integer" affinity. However, **the SQLite autoincrement feature, whether 

87implicitly or explicitly enabled, requires that the name of the column's type 

88is exactly the string "INTEGER"**. Therefore, if an application uses a type 

89like :class:`.BigInteger` for a primary key, on SQLite this type will need to 

90be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE 

91TABLE`` statement in order for the autoincrement behavior to be available. 
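
The exact-name requirement can be observed directly with the standard library ``sqlite3`` module. This is a minimal sketch; the table names ``exact_name`` and ``other_name`` are hypothetical and exist only for the comparison.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Type name is exactly INTEGER: the column receives generated values.
conn.execute("CREATE TABLE exact_name (id INTEGER PRIMARY KEY, v TEXT)")
# BIGINT has integer affinity, but it is not the exact name INTEGER.
conn.execute("CREATE TABLE other_name (id BIGINT PRIMARY KEY, v TEXT)")

conn.execute("INSERT INTO exact_name (v) VALUES ('a')")
conn.execute("INSERT INTO other_name (v) VALUES ('a')")

exact_id = conn.execute("SELECT id FROM exact_name").fetchone()[0]
other_id = conn.execute("SELECT id FROM other_name").fetchone()[0]
conn.close()
```

In this sketch ``exact_id`` is a generated integer, while ``other_id`` is ``None``, because no value is generated for the ``BIGINT`` column.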

92 

93One approach to achieve this is to use :class:`.Integer` on SQLite 

94only using :meth:`.TypeEngine.with_variant`:: 

95 

    table = Table(
        "my_table",
        metadata,
        Column(
            "id",
            BigInteger().with_variant(Integer, "sqlite"),
            primary_key=True,
        ),
    )

105 

106Another is to use a subclass of :class:`.BigInteger` that overrides its DDL 

107name to be ``INTEGER`` when compiled against SQLite:: 

108 

    from sqlalchemy import BigInteger
    from sqlalchemy.ext.compiler import compiles


    class SLBigInteger(BigInteger):
        pass


    @compiles(SLBigInteger, "sqlite")
    def bi_c(element, compiler, **kw):
        return "INTEGER"


    @compiles(SLBigInteger)
    def bi_c(element, compiler, **kw):
        return compiler.visit_BIGINT(element, **kw)


    table = Table(
        "my_table", metadata, Column("id", SLBigInteger(), primary_key=True)
    )

130 

131.. seealso:: 

132 

133 :meth:`.TypeEngine.with_variant` 

134 

135 :ref:`sqlalchemy.ext.compiler_toplevel` 

136 

137 `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_ 

138 

139.. _sqlite_transactions: 

140 

141Transactions with SQLite and the sqlite3 driver 

142----------------------------------------------- 

143 

144As a file-based database, SQLite's approach to transactions differs from 

145traditional databases in many ways. Additionally, the ``sqlite3`` driver 

146standard with Python (as well as the async version ``aiosqlite`` which builds 

147on top of it) has several quirks, workarounds, and API features in the 

148area of transaction control, all of which generally need to be addressed when 

149constructing a SQLAlchemy application that uses SQLite. 

150 

151Legacy Transaction Mode with the sqlite3 driver 

152^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

153 

The most important aspect of transaction handling with the sqlite3 driver is
that it defaults to legacy transactional behavior which does not strictly
follow :pep:`249`; this default will continue through Python 3.15 before
changing in Python 3.16. The driver diverges from the PEP in that it does not
"begin" a transaction automatically, as dictated by :pep:`249`, except in the
case of DML statements, e.g. INSERT, UPDATE, and DELETE. Normally, :pep:`249`
dictates that a BEGIN must be emitted upon the first SQL statement of any
kind, so that all subsequent operations will be established within a
transaction until ``connection.commit()`` has been called. The ``sqlite3``
driver, in an effort to be easier to use in highly concurrent environments,
skips this step for DQL (e.g. SELECT) statements, and for legacy reasons also
skips it for DDL (e.g. CREATE TABLE etc.) statements. Statements such as
SAVEPOINT are skipped as well.

167 

168In modern versions of the ``sqlite3`` driver as of Python 3.12, this legacy 

169mode of operation is referred to as 

170`"legacy transaction control" <https://docs.python.org/3/library/sqlite3.html#sqlite3-transaction-control-isolation-level>`_, and is in 

171effect by default due to the ``Connection.autocommit`` parameter being set to 

172the constant ``sqlite3.LEGACY_TRANSACTION_CONTROL``. Prior to Python 3.12, 

173the ``Connection.autocommit`` attribute did not exist. 

174 

175The implications of legacy transaction mode include: 

176 

* **Incorrect support for transactional DDL** - statements like CREATE TABLE, ALTER TABLE,
  CREATE INDEX etc. will not automatically BEGIN a transaction if one was not
  started already, leading to the changes by each statement being
  "autocommitted" immediately unless BEGIN was otherwise emitted first. Very
  old (pre Python 3.6) versions of the ``sqlite3`` driver would also force a
  COMMIT for these operations even if a transaction were present; however, this
  is no longer the case.

184* **SERIALIZABLE behavior not fully functional** - SQLite's transaction isolation 

185 behavior is normally consistent with SERIALIZABLE isolation, as it is a file- 

186 based system that locks the database file entirely for write operations, 

187 preventing COMMIT until all reader transactions (and associated file locks) 

188 have completed. However, sqlite3's legacy transaction mode fails to emit BEGIN for SELECT 

189 statements, which causes these SELECT statements to no longer be "repeatable", 

190 failing one of the consistency guarantees of SERIALIZABLE. 

191* **Incorrect behavior for SAVEPOINT** - as the SAVEPOINT statement does not 

192 imply a BEGIN, a new SAVEPOINT emitted before a BEGIN will function on its 

193 own but fails to participate in the enclosing transaction, meaning a ROLLBACK 

194 of the transaction will not rollback elements that were part of a released 

195 savepoint. 
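
The implications above can be observed through the driver's ``Connection.in_transaction`` attribute. The sketch below uses only the standard library ``sqlite3`` module and assumes legacy transaction control is in effect; on Python 3.12 and above it requests that mode explicitly, and the table ``t`` is hypothetical.

```python
import sqlite3

# Request legacy behavior explicitly where the constant exists
# (Python 3.12+); on older versions it is the only mode available.
kwargs = {}
if hasattr(sqlite3, "LEGACY_TRANSACTION_CONTROL"):
    kwargs["autocommit"] = sqlite3.LEGACY_TRANSACTION_CONTROL

conn = sqlite3.connect(":memory:", **kwargs)

conn.execute("CREATE TABLE t (x INTEGER)")
after_ddl = conn.in_transaction  # DDL did not BEGIN a transaction

conn.execute("SELECT x FROM t").fetchall()
after_select = conn.in_transaction  # neither did SELECT

conn.execute("INSERT INTO t (x) VALUES (1)")
after_insert = conn.in_transaction  # DML implicitly emitted BEGIN

# The rollback discards the INSERT, but the table remains, since the
# CREATE TABLE was effectively autocommitted.
conn.rollback()
remaining = conn.execute("SELECT count(*) FROM t").fetchone()[0]
conn.close()
```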

196 

197Legacy transaction mode first existed in order to facilitate working around 

198SQLite's file locks. Because SQLite relies upon whole-file locks, it is easy to 

199get "database is locked" errors, particularly when newer features like "write 

200ahead logging" are disabled. This is a key reason why ``sqlite3``'s legacy 

201transaction mode is still the default mode of operation; disabling it will 

202produce behavior that is more susceptible to locked database errors. However 

203note that **legacy transaction mode will no longer be the default** in a future 

204Python version (3.16 as of this writing). 

205 

206.. _sqlite_enabling_transactions: 

207 

208Enabling Non-Legacy SQLite Transactional Modes with the sqlite3 or aiosqlite driver 

209^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

210 

SQLAlchemy currently supports two approaches: setting the sqlite3
``Connection.autocommit`` attribute, most directly by using a
:func:`_sa.create_engine` parameter, or, on an older version of Python where
the attribute is not available, using event hooks to control the behavior of
BEGIN.

216 

* **Enabling modern sqlite3 transaction control via the autocommit connect parameter** (Python 3.12 and above)

  To use SQLite in the mode described at `Transaction control via the autocommit attribute <https://docs.python.org/3/library/sqlite3.html#transaction-control-via-the-autocommit-attribute>`_,
  the most straightforward approach is to set the attribute to its recommended value
  of ``False`` at the connect level using :paramref:`_sa.create_engine.connect_args`::

      from sqlalchemy import create_engine

      engine = create_engine(
          "sqlite:///myfile.db", connect_args={"autocommit": False}
      )

  This parameter is also passed through when using the aiosqlite driver::

      from sqlalchemy.ext.asyncio import create_async_engine

      engine = create_async_engine(
          "sqlite+aiosqlite:///myfile.db", connect_args={"autocommit": False}
      )

  The parameter can also be set at the attribute level using the :meth:`.PoolEvents.connect`
  event hook; however, this will only work for sqlite3, as aiosqlite does not yet expose this
  attribute on its ``Connection`` object::

      from sqlalchemy import create_engine, event

      engine = create_engine("sqlite:///myfile.db")


      @event.listens_for(engine, "connect")
      def do_connect(dbapi_connection, connection_record):
          # enable autocommit=False mode
          dbapi_connection.autocommit = False

250 

* **Using SQLAlchemy to emit BEGIN in lieu of SQLite's transaction control** (all Python versions, sqlite3 and aiosqlite)

  For older versions of ``sqlite3`` or for cross-compatibility with older and
  newer versions, SQLAlchemy can also take over the job of transaction control.
  This is achieved by using the :meth:`.ConnectionEvents.begin` hook
  to emit the "BEGIN" command directly, while also disabling SQLite's control
  of this command using the :meth:`.PoolEvents.connect` event hook to set the
  ``Connection.isolation_level`` attribute to ``None``::

      from sqlalchemy import create_engine, event

      engine = create_engine("sqlite:///myfile.db")


      @event.listens_for(engine, "connect")
      def do_connect(dbapi_connection, connection_record):
          # disable sqlite3's emitting of the BEGIN statement entirely.
          dbapi_connection.isolation_level = None


      @event.listens_for(engine, "begin")
      def do_begin(conn):
          # emit our own BEGIN. sqlite3 still emits COMMIT/ROLLBACK correctly
          conn.exec_driver_sql("BEGIN")

  When using the asyncio variant ``aiosqlite``, refer to ``engine.sync_engine``
  as in the example below::

      from sqlalchemy import create_engine, event
      from sqlalchemy.ext.asyncio import create_async_engine

      engine = create_async_engine("sqlite+aiosqlite:///myfile.db")


      @event.listens_for(engine.sync_engine, "connect")
      def do_connect(dbapi_connection, connection_record):
          # disable aiosqlite's emitting of the BEGIN statement entirely.
          dbapi_connection.isolation_level = None


      @event.listens_for(engine.sync_engine, "begin")
      def do_begin(conn):
          # emit our own BEGIN. aiosqlite still emits COMMIT/ROLLBACK correctly
          conn.exec_driver_sql("BEGIN")
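
What the second approach amounts to at the driver level can be sketched with the standard library ``sqlite3`` module alone. This is an illustration under stated assumptions rather than what SQLAlchemy executes internally: the table ``t`` and savepoint ``sp1`` are hypothetical, and the explicit ``BEGIN`` stands in for the ``begin`` event hook.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Equivalent of the "connect" hook: the driver no longer emits BEGIN.
conn.isolation_level = None

conn.execute("CREATE TABLE t (x INTEGER)")  # runs outside any transaction

# Equivalent of the "begin" hook: emit BEGIN ourselves.
conn.execute("BEGIN")

# SELECT now runs inside the transaction.
conn.execute("SELECT x FROM t").fetchall()
in_txn_after_select = conn.in_transaction

# The SAVEPOINT participates in the enclosing transaction.
conn.execute("SAVEPOINT sp1")
conn.execute("INSERT INTO t (x) VALUES (1)")
conn.execute("RELEASE SAVEPOINT sp1")

# The driver still emits ROLLBACK, which also undoes the released savepoint.
conn.rollback()
in_txn_after_rollback = conn.in_transaction
remaining = conn.execute("SELECT count(*) FROM t").fetchone()[0]
conn.close()
```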

296 

297.. _sqlite_isolation_level: 

298 

299Using SQLAlchemy's Driver Level AUTOCOMMIT Feature with SQLite 

300^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

301 

302SQLAlchemy has a comprehensive database isolation feature with optional 

303autocommit support that is introduced in the section :ref:`dbapi_autocommit`. 

304 

305For the ``sqlite3`` and ``aiosqlite`` drivers, SQLAlchemy only includes 

306built-in support for "AUTOCOMMIT". Note that this mode is currently incompatible 

307with the non-legacy isolation mode hooks documented in the previous 

308section at :ref:`sqlite_enabling_transactions`. 

309 

310To use the ``sqlite3`` driver with SQLAlchemy driver-level autocommit, 

311create an engine setting the :paramref:`_sa.create_engine.isolation_level` 

312parameter to "AUTOCOMMIT":: 

313 

    eng = create_engine("sqlite:///myfile.db", isolation_level="AUTOCOMMIT")

315 

316When using the above mode, any event hooks that set the sqlite3 ``Connection.autocommit`` 

317parameter away from its default of ``sqlite3.LEGACY_TRANSACTION_CONTROL`` 

318as well as hooks that emit ``BEGIN`` should be disabled. 

319 

320Additional Reading for SQLite / sqlite3 transaction control 

321^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

322 

323Links with important information on SQLite, the sqlite3 driver, 

324as well as long historical conversations on how things got to their current state: 

325 

326* `Isolation in SQLite <https://www.sqlite.org/isolation.html>`_ - on the SQLite website 

327* `Transaction control <https://docs.python.org/3/library/sqlite3.html#transaction-control>`_ - describes the sqlite3 autocommit attribute as well 

328 as the legacy isolation_level attribute. 

329* `sqlite3 SELECT does not BEGIN a transaction, but should according to spec <https://github.com/python/cpython/issues/54133>`_ - imported Python standard library issue on github 

330* `sqlite3 module breaks transactions and potentially corrupts data <https://github.com/python/cpython/issues/54949>`_ - imported Python standard library issue on github 

331 

332 

333INSERT/UPDATE/DELETE...RETURNING 

334--------------------------------- 

335 

336The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING`` 

337syntax. ``INSERT..RETURNING`` may be used 

338automatically in some cases in order to fetch newly generated identifiers in 

339place of the traditional approach of using ``cursor.lastrowid``, however 

340``cursor.lastrowid`` is currently still preferred for simple single-statement 

341cases for its better performance. 

342 

343To specify an explicit ``RETURNING`` clause, use the 

344:meth:`._UpdateBase.returning` method on a per-statement basis:: 

345 

    # INSERT..RETURNING
    result = connection.execute(
        table.insert().values(name="foo").returning(table.c.col1, table.c.col2)
    )
    print(result.all())

    # UPDATE..RETURNING
    result = connection.execute(
        table.update()
        .where(table.c.name == "foo")
        .values(name="bar")
        .returning(table.c.col1, table.c.col2)
    )
    print(result.all())

    # DELETE..RETURNING
    result = connection.execute(
        table.delete()
        .where(table.c.name == "foo")
        .returning(table.c.col1, table.c.col2)
    )
    print(result.all())

368 

369.. versionadded:: 2.0 Added support for SQLite RETURNING 

370 

371 

372.. _sqlite_foreign_keys: 

373 

374Foreign Key Support 

375------------------- 

376 

377SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables, 

378however by default these constraints have no effect on the operation of the 

379table. 

380 

381Constraint checking on SQLite has three prerequisites: 

382 

383* At least version 3.6.19 of SQLite must be in use 

384* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY 

385 or SQLITE_OMIT_TRIGGER symbols enabled. 

386* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all 

387 connections before use -- including the initial call to 

388 :meth:`sqlalchemy.schema.MetaData.create_all`. 

389 

390SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for 

391new connections through the usage of events:: 

392 

    from sqlalchemy.engine import Engine
    from sqlalchemy import event


    @event.listens_for(Engine, "connect")
    def set_sqlite_pragma(dbapi_connection, connection_record):
        # the sqlite3 driver will not set PRAGMA foreign_keys
        # if autocommit=False; set to True temporarily
        ac = dbapi_connection.autocommit
        dbapi_connection.autocommit = True

        cursor = dbapi_connection.cursor()
        cursor.execute("PRAGMA foreign_keys=ON")
        cursor.close()

        # restore previous autocommit setting
        dbapi_connection.autocommit = ac
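
The effect of the ``PRAGMA`` can be checked with the standard library ``sqlite3`` module. The following is a minimal sketch; the ``parent`` and ``child`` tables are hypothetical, and the setting is switched off explicitly at the start (the usual default) so that the result does not depend on how the SQLite library was compiled.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Run in driver autocommit so that the PRAGMA is never issued inside
# an open transaction, where it would have no effect.
conn.isolation_level = None

conn.execute("PRAGMA foreign_keys=OFF")  # usual default, set for determinism
conn.execute("CREATE TABLE parent (id INTEGER PRIMARY KEY)")
conn.execute(
    "CREATE TABLE child ("
    "id INTEGER PRIMARY KEY, "
    "parent_id INTEGER REFERENCES parent(id))"
)

# With enforcement off, a dangling reference is accepted.
conn.execute("INSERT INTO child (id, parent_id) VALUES (1, 999)")

conn.execute("PRAGMA foreign_keys=ON")
enabled = conn.execute("PRAGMA foreign_keys").fetchone()[0]

# With enforcement on, the same kind of row is rejected.
try:
    conn.execute("INSERT INTO child (id, parent_id) VALUES (2, 999)")
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

child_count = conn.execute("SELECT count(*) FROM child").fetchone()[0]
conn.close()
```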

410 

411.. warning:: 

412 

413 When SQLite foreign keys are enabled, it is **not possible** 

414 to emit CREATE or DROP statements for tables that contain 

415 mutually-dependent foreign key constraints; 

416 to emit the DDL for these tables requires that ALTER TABLE be used to 

417 create or drop these constraints separately, for which SQLite has 

418 no support. 

419 

420.. seealso:: 

421 

422 `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_ 

423 - on the SQLite web site. 

424 

425 :ref:`event_toplevel` - SQLAlchemy event API. 

426 

427 :ref:`use_alter` - more information on SQLAlchemy's facilities for handling 

428 mutually-dependent foreign key constraints. 

429 

430.. _sqlite_on_conflict_ddl: 

431 

432ON CONFLICT support for constraints 

433----------------------------------- 

434 

435.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for 

436 SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT" as 

437 applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`. 

438 

SQLite supports a non-standard DDL clause known as ON CONFLICT which can be applied
to primary key, unique, check, and not null constraints. In DDL, it is
rendered either within the "CONSTRAINT" clause or within the column definition
itself depending on the location of the target constraint. To render this
clause within DDL, the extension parameter ``sqlite_on_conflict`` can be
specified with a string conflict resolution algorithm within the
:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`, and
:class:`.CheckConstraint` objects. Within the :class:`_schema.Column` object,
there are individual parameters ``sqlite_on_conflict_not_null``,
``sqlite_on_conflict_primary_key``, and ``sqlite_on_conflict_unique``, which
correspond to the three relevant constraint types that can be
indicated from a :class:`_schema.Column` object.

452 

453.. seealso:: 

454 

455 `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite 

456 documentation 

457 

458The ``sqlite_on_conflict`` parameters accept a string argument which is just 

459the resolution name to be chosen, which on SQLite can be one of ROLLBACK, 

460ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint 

461that specifies the IGNORE algorithm:: 

462 

    some_table = Table(
        "some_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("data", Integer),
        UniqueConstraint("id", "data", sqlite_on_conflict="IGNORE"),
    )

470 

471The above renders CREATE TABLE DDL as: 

472 

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER,
        PRIMARY KEY (id),
        UNIQUE (id, data) ON CONFLICT IGNORE
    )

481 

482 

483When using the :paramref:`_schema.Column.unique` 

484flag to add a UNIQUE constraint 

485to a single column, the ``sqlite_on_conflict_unique`` parameter can 

486be added to the :class:`_schema.Column` as well, which will be added to the 

487UNIQUE constraint in the DDL:: 

488 

    some_table = Table(
        "some_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column(
            "data", Integer, unique=True, sqlite_on_conflict_unique="IGNORE"
        ),
    )

497 

498rendering: 

499 

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER,
        PRIMARY KEY (id),
        UNIQUE (data) ON CONFLICT IGNORE
    )
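
The runtime effect of the ``IGNORE`` algorithm on the DDL above can be sketched with the standard library ``sqlite3`` module; the inserted values are arbitrary and chosen only for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER,
        PRIMARY KEY (id),
        UNIQUE (data) ON CONFLICT IGNORE
    )
    """
)

conn.execute("INSERT INTO some_table (id, data) VALUES (1, 10)")
# Violates UNIQUE (data); with ON CONFLICT IGNORE the row is skipped
# silently instead of raising an IntegrityError.
conn.execute("INSERT INTO some_table (id, data) VALUES (2, 10)")

rows = conn.execute("SELECT id, data FROM some_table ORDER BY id").fetchall()
conn.close()
```

Only the first row is present afterwards, and no exception was raised for the second INSERT.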

508 

509To apply the FAIL algorithm for a NOT NULL constraint, 

510``sqlite_on_conflict_not_null`` is used:: 

511 

    some_table = Table(
        "some_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column(
            "data", Integer, nullable=False, sqlite_on_conflict_not_null="FAIL"
        ),
    )

520 

This renders the ON CONFLICT phrase inline with the column definition:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER NOT NULL ON CONFLICT FAIL,
        PRIMARY KEY (id)
    )

530 

531 

532Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``:: 

533 

    some_table = Table(
        "some_table",
        metadata,
        Column(
            "id",
            Integer,
            primary_key=True,
            sqlite_on_conflict_primary_key="FAIL",
        ),
    )

544 

545SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict 

546resolution algorithm is applied to the constraint itself: 

547 

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        PRIMARY KEY (id) ON CONFLICT FAIL
    )

554 

555.. _sqlite_on_conflict_insert: 

556 

557INSERT...ON CONFLICT (Upsert) 

558----------------------------- 

559 

560.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for 

561 SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as 

562 applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`. 

563 

564From version 3.24.0 onwards, SQLite supports "upserts" (update or insert) 

565of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` 

566statement. A candidate row will only be inserted if that row does not violate 

567any unique or primary key constraints. In the case of a unique constraint violation, a 

568secondary action can occur which can be either "DO UPDATE", indicating that 

569the data in the target row should be updated, or "DO NOTHING", which indicates 

570to silently skip this row. 

571 

572Conflicts are determined using columns that are part of existing unique 

573constraints and indexes. These constraints are identified by stating the 

574columns and conditions that comprise the indexes. 

575 

576SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific 

577:func:`_sqlite.insert()` function, which provides 

578the generative methods :meth:`_sqlite.Insert.on_conflict_do_update` 

579and :meth:`_sqlite.Insert.on_conflict_do_nothing`: 

580 

.. sourcecode:: pycon+sql

    >>> from sqlalchemy.dialects.sqlite import insert

    >>> insert_stmt = insert(my_table).values(
    ...     id="some_existing_id", data="inserted value"
    ... )

    >>> do_update_stmt = insert_stmt.on_conflict_do_update(
    ...     index_elements=["id"], set_=dict(data="updated value")
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?{stop}

    >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])

    >>> print(do_nothing_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO NOTHING

602 

603.. versionadded:: 1.4 

604 

605.. seealso:: 

606 

607 `Upsert 

608 <https://sqlite.org/lang_UPSERT.html>`_ 

609 - in the SQLite documentation. 

610 

611 

612Specifying the Target 

613^^^^^^^^^^^^^^^^^^^^^ 

614 

615Both methods supply the "target" of the conflict using column inference: 

616 

617* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument 

618 specifies a sequence containing string column names, :class:`_schema.Column` 

619 objects, and/or SQL expression elements, which would identify a unique index 

620 or unique constraint. 

621 

622* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` 

623 to infer an index, a partial index can be inferred by also specifying the 

624 :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter: 

625 

  .. sourcecode:: pycon+sql

      >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")

      >>> do_update_stmt = stmt.on_conflict_do_update(
      ...     index_elements=[my_table.c.user_email],
      ...     index_where=my_table.c.user_email.like("%@gmail.com"),
      ...     set_=dict(data=stmt.excluded.data),
      ... )

      >>> print(do_update_stmt)
      {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?)
      ON CONFLICT (user_email)
      WHERE user_email LIKE '%@gmail.com'
      DO UPDATE SET data = excluded.data

641 

642The SET Clause 

643^^^^^^^^^^^^^^^ 

644 

645``ON CONFLICT...DO UPDATE`` is used to perform an update of the already 

646existing row, using any combination of new values as well as values 

647from the proposed insertion. These values are specified using the 

648:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This 

649parameter accepts a dictionary which consists of direct values 

650for UPDATE: 

651 

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")

    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"], set_=dict(data="updated value")
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?

663 

664.. warning:: 

665 

666 The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take 

667 into account Python-side default UPDATE values or generation functions, 

668 e.g. those specified using :paramref:`_schema.Column.onupdate`. These 

669 values will not be exercised for an ON CONFLICT style of UPDATE, unless 

670 they are manually specified in the 

671 :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary. 

672 

673Updating using the Excluded INSERT Values 

674^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

675 

676In order to refer to the proposed insertion row, the special alias 

677:attr:`~.sqlite.Insert.excluded` is available as an attribute on 

678the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix 

679on a column, that informs the DO UPDATE to update the row with the value that 

680would have been inserted had the constraint not failed: 

681 

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id="some_id", data="inserted value", author="jlh"
    ... )

    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"],
    ...     set_=dict(data="updated value", author=stmt.excluded.author),
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
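
To see what the rendered statement does when executed, the same SQL can be run through the standard library ``sqlite3`` module. This is a minimal sketch which assumes SQLite 3.24.0 or later and a hypothetical ``my_table`` whose ``id`` column is a text primary key; the second author name is invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical table matching the columns used in the statement above.
conn.execute(
    "CREATE TABLE my_table (id TEXT PRIMARY KEY, data TEXT, author TEXT)"
)

upsert = (
    "INSERT INTO my_table (id, data, author) VALUES (?, ?, ?) "
    "ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author"
)

# First execution: no conflict, so the row is inserted as proposed.
conn.execute(upsert, ("some_id", "inserted value", "jlh", "updated value"))
first = conn.execute("SELECT id, data, author FROM my_table").fetchall()

# Second execution: "id" conflicts, so DO UPDATE runs. "data" takes the
# bound literal, "author" takes the value from the proposed row.
conn.execute(
    upsert, ("some_id", "inserted value", "someone else", "updated value")
)
second = conn.execute("SELECT id, data, author FROM my_table").fetchall()
conn.close()
```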

696 

697Additional WHERE Criteria 

698^^^^^^^^^^^^^^^^^^^^^^^^^ 

699 

700The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts 

701a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where` 

702parameter, which will limit those rows which receive an UPDATE: 

703 

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id="some_id", data="inserted value", author="jlh"
    ... )

    >>> on_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"],
    ...     set_=dict(data="updated value", author=stmt.excluded.author),
    ...     where=(my_table.c.status == 2),
    ... )
    >>> print(on_update_stmt)
    {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
    WHERE my_table.status = ?
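
The filtering effect of the WHERE clause can be sketched at the driver level with the standard library ``sqlite3`` module. The table layout and row values below are assumptions for illustration, the statement is simplified to a single ``SET`` column, and SQLite 3.24.0 or later is assumed.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical table with a "status" column to filter on.
conn.execute(
    "CREATE TABLE my_table (id TEXT PRIMARY KEY, data TEXT, status INTEGER)"
)
conn.executemany(
    "INSERT INTO my_table (id, data, status) VALUES (?, ?, ?)",
    [("a", "old", 2), ("b", "old", 1)],
)

upsert = (
    "INSERT INTO my_table (id, data, status) VALUES (?, ?, ?) "
    "ON CONFLICT (id) DO UPDATE SET data = excluded.data "
    "WHERE my_table.status = ?"
)

# Row "a" has status 2, so the UPDATE applies.
conn.execute(upsert, ("a", "new", 2, 2))
# Row "b" has status 1, so the conflicting row is left untouched.
conn.execute(upsert, ("b", "new", 1, 2))

result = dict(conn.execute("SELECT id, data FROM my_table"))
conn.close()
```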

719 

720 

721Skipping Rows with DO NOTHING 

722^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

723 

724``ON CONFLICT`` may be used to skip inserting a row entirely 

725if any conflict with a unique constraint occurs; below this is illustrated 

726using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method: 

727 

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
    >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
    >>> print(stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING

734 

735 

736If ``DO NOTHING`` is used without specifying any columns or constraint, 

737it has the effect of skipping the INSERT for any unique violation which 

738occurs: 

739 

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
    >>> stmt = stmt.on_conflict_do_nothing()
    >>> print(stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING
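
As a driver-level illustration of the target-less form, the sketch below runs the rendered SQL twice through the standard library ``sqlite3`` module. The ``my_table`` definition and the values are hypothetical, and SQLite 3.24.0 or later is assumed.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (id TEXT PRIMARY KEY, data TEXT)")

stmt = "INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING"

conn.execute(stmt, ("some_id", "first value"))
# Same primary key: the INSERT is skipped and no error is raised.
conn.execute(stmt, ("some_id", "second value"))

rows = conn.execute("SELECT id, data FROM my_table").fetchall()
conn.close()
```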

746 

747.. _sqlite_type_reflection: 

748 

749Type Reflection 

750--------------- 

751 

752SQLite types are unlike those of most other database backends, in that 

753the string name of the type usually does not correspond to a "type" in a 

754one-to-one fashion. Instead, SQLite links per-column typing behavior 

755to one of five so-called "type affinities" based on a string matching 

756pattern for the type. 

757 

758SQLAlchemy's reflection process, when inspecting types, uses a simple 

759lookup table to link the keywords returned to provided SQLAlchemy types. 

760This lookup table is present within the SQLite dialect as it is for all 

761other dialects. However, the SQLite dialect has a different "fallback" 

762routine for when a particular type name is not located in the lookup map; 

763it instead implements the SQLite "type affinity" scheme located at 

764https://www.sqlite.org/datatype3.html section 2.1. 

765 

766The provided typemap will make direct associations from an exact string 

767name match for the following types: 

768 

769:class:`_types.BIGINT`, :class:`_types.BLOB`, 

770:class:`_types.BOOLEAN`, :class:`_types.BOOLEAN`, 

771:class:`_types.CHAR`, :class:`_types.DATE`, 

772:class:`_types.DATETIME`, :class:`_types.DOUBLE`, 

773:class:`_types.DECIMAL`, :class:`_types.FLOAT`, 

774:class:`_types.INTEGER`, :class:`_types.INTEGER`, 

775:class:`_types.NUMERIC`, :class:`_types.REAL`, 

776:class:`_types.SMALLINT`, :class:`_types.TEXT`, 

777:class:`_types.TIME`, :class:`_types.TIMESTAMP`, 

778:class:`_types.VARCHAR`, :class:`_types.NVARCHAR`, 

779:class:`_types.NCHAR` 

780 

781When a type name does not match one of the above types, the "type affinity" 

782lookup is used instead: 

783 

784* :class:`_types.INTEGER` is returned if the type name includes the 

785 string ``INT`` 

786* :class:`_types.TEXT` is returned if the type name includes the 

787 string ``CHAR``, ``CLOB`` or ``TEXT`` 

788* :class:`_types.NullType` is returned if the type name includes the 

789 string ``BLOB`` 

790* :class:`_types.REAL` is returned if the type name includes the string 

791 ``REAL``, ``FLOA`` or ``DOUB``. 

792* Otherwise, the :class:`_types.NUMERIC` type is used. 

793 
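The fallback rules listed above can be sketched in plain Python. This is a
simplified illustration only: ``affinity_type_name`` is a hypothetical helper
that returns type names as strings rather than SQLAlchemy type classes, and it
covers just the substring rules given in the list, applied in the same order.

```python
def affinity_type_name(type_name: str) -> str:
    """Return the name of the fallback type for an unrecognized type name."""
    name = type_name.upper()
    # The rules are checked in order, so the first matching substring wins.
    if "INT" in name:
        return "INTEGER"
    if any(token in name for token in ("CHAR", "CLOB", "TEXT")):
        return "TEXT"
    if "BLOB" in name:
        return "NullType"
    if any(token in name for token in ("REAL", "FLOA", "DOUB")):
        return "REAL"
    return "NUMERIC"


examples = {
    name: affinity_type_name(name)
    for name in ("MEDIUMINT", "NATIVE CHARACTER(70)", "DOUBLE PRECISION", "MONEY")
}
```

Because the ``INT`` rule is checked first, a name such as ``FLOATING POINT``
resolves to ``INTEGER`` in this sketch, since ``POINT`` contains ``INT``.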

794.. _sqlite_partial_index: 

795 

796Partial Indexes 

797--------------- 

798 

799A partial index, e.g. one which uses a WHERE clause, can be specified 

800with the DDL system using the argument ``sqlite_where``:: 

801 

802 tbl = Table("testtbl", m, Column("data", Integer)) 

803 idx = Index( 

804 "test_idx1", 

805 tbl.c.data, 

806 sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10), 

807 ) 

808 

809The index will be rendered at create time as: 

810 

811.. sourcecode:: sql 

812 

813 CREATE INDEX test_idx1 ON testtbl (data) 

814 WHERE data > 5 AND data < 10 

815 
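To see the rendered statement in action, the sketch below executes the same
``CREATE INDEX ... WHERE`` DDL through the standard ``sqlite3`` module and
reads the stored definition back from ``sqlite_master``. It bypasses
SQLAlchemy entirely and assumes a SQLite library recent enough to support
partial indexes.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE testtbl (data INTEGER)")

# Same DDL as rendered above, issued directly against SQLite.
conn.execute(
    "CREATE INDEX test_idx1 ON testtbl (data) WHERE data > 5 AND data < 10"
)

# SQLite keeps the text of the index definition in sqlite_master.
index_sql = conn.execute(
    "SELECT sql FROM sqlite_master WHERE type = 'index' AND name = 'test_idx1'"
).fetchone()[0]
```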

816.. _sqlite_dotted_column_names: 

817 

818Dotted Column Names 

819------------------- 

820 

821Using table or column names that explicitly have periods in them is 

822**not recommended**. While this is a bad idea for relational 

823databases in general, as the dot is a syntactically significant character, 

824the SQLite driver up until version **3.10.0** of SQLite has a bug which 

825requires that SQLAlchemy filter out these dots in result sets. 

826 

827The bug, entirely outside of SQLAlchemy, can be illustrated as follows:: 

828 

829 import sqlite3 

830 

831 assert sqlite3.sqlite_version_info < ( 

832 3, 

833 10, 

834 0, 

835 ), "bug is fixed in this version" 

836 

837 conn = sqlite3.connect(":memory:") 

838 cursor = conn.cursor() 

839 

840 cursor.execute("create table x (a integer, b integer)") 

841 cursor.execute("insert into x (a, b) values (1, 1)") 

842 cursor.execute("insert into x (a, b) values (2, 2)") 

843 

844 cursor.execute("select x.a, x.b from x") 

845 assert [c[0] for c in cursor.description] == ["a", "b"] 

846 

847 cursor.execute(""" 

848 select x.a, x.b from x where a=1 

849 union 

850 select x.a, x.b from x where a=2 

851 """) 

852 assert [c[0] for c in cursor.description] == ["a", "b"], [ 

853 c[0] for c in cursor.description 

854 ] 

855 

856The second assertion fails: 

857 

858.. sourcecode:: text 

859 

860 Traceback (most recent call last): 

861 File "test.py", line 19, in <module> 

862 [c[0] for c in cursor.description] 

863 AssertionError: ['x.a', 'x.b'] 

864 

865In the above case, the driver incorrectly reports the column names 

866with the table name included, which is inconsistent with the names 

867reported when the UNION is not present. 

868 

869SQLAlchemy relies upon column names being predictable in how they match 

870to the original statement, so the SQLAlchemy dialect has no choice but 

871to filter these out:: 

872 

873 

874 from sqlalchemy import create_engine 

875 

876 eng = create_engine("sqlite://") 

877 conn = eng.connect() 

878 

879 conn.exec_driver_sql("create table x (a integer, b integer)") 

880 conn.exec_driver_sql("insert into x (a, b) values (1, 1)") 

881 conn.exec_driver_sql("insert into x (a, b) values (2, 2)") 

882 

883 result = conn.exec_driver_sql("select x.a, x.b from x") 

884 assert result.keys() == ["a", "b"] 

885 

886 result = conn.exec_driver_sql(""" 

887 select x.a, x.b from x where a=1 

888 union 

889 select x.a, x.b from x where a=2 

890 """) 

891 assert result.keys() == ["a", "b"] 

892 

893Note that above, even though SQLAlchemy filters out the dots, *both 

894names are still addressable*:: 

895 

896 >>> row = result.first() 

897 >>> row["a"] 

898 1 

899 >>> row["x.a"] 

900 1 

901 >>> row["b"] 

902 1 

903 >>> row["x.b"] 

904 1 

905 

906Therefore, the workaround applied by SQLAlchemy only impacts 

907:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In 

908the very specific case where an application is forced to use column names that 

909contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and 

910:meth:`.Row.keys()` is required to return these dotted names unmodified, 

911the ``sqlite_raw_colnames`` execution option may be provided, either on a 

912per-:class:`_engine.Connection` basis:: 

913 

914 result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql( 

915 """ 

916 select x.a, x.b from x where a=1 

917 union 

918 select x.a, x.b from x where a=2 

919 """ 

920 ) 

921 assert result.keys() == ["x.a", "x.b"] 

922 

923or on a per-:class:`_engine.Engine` basis:: 

924 

925 engine = create_engine( 

926 "sqlite://", execution_options={"sqlite_raw_colnames": True} 

927 ) 

928 

929When using the per-:class:`_engine.Engine` execution option, note that 

930**Core and ORM queries that use UNION may not function properly**. 

931 
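The filtering described in this section can be approximated with the short
sketch below. The ``translate_colname`` function and its ``preserve_raw`` flag
are hypothetical names used only for illustration; they mirror the documented
effect of the ``sqlite_raw_colnames`` option and are not presented as the
dialect's actual implementation.

```python
from typing import Optional, Tuple


def translate_colname(
    colname: str, preserve_raw: bool = False
) -> Tuple[str, Optional[str]]:
    """Return (key, original_name) for a column name reported by the driver.

    When the name contains a dot and raw names are not requested, the key is
    the part after the last dot and the dotted name is kept as an alternate.
    """
    if not preserve_raw and "." in colname:
        return colname.split(".")[-1], colname
    return colname, None


# Keys as they would appear with the default filtering, then with raw names.
keys = [translate_colname(name)[0] for name in ("x.a", "x.b")]
raw_keys = [translate_colname(name, preserve_raw=True)[0] for name in ("x.a", "x.b")]
```

Returning the dotted name alongside the filtered key is what allows both
``"a"`` and ``"x.a"`` to remain addressable in this sketch.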

932SQLite-specific table options 

933----------------------------- 

934 

935Two options for CREATE TABLE are supported directly by the SQLite 

936dialect in conjunction with the :class:`_schema.Table` construct: 

937 

938* ``WITHOUT ROWID``:: 

939 

940 Table("some_table", metadata, ..., sqlite_with_rowid=False) 

941 

942* 

943 ``STRICT``:: 

944 

945 Table("some_table", metadata, ..., sqlite_strict=True) 

946 

947 .. versionadded:: 2.0.37 

948 

949.. seealso:: 

950 

951 `SQLite CREATE TABLE options 

952 <https://www.sqlite.org/lang_createtable.html>`_ 

953 

954.. _sqlite_include_internal: 

955 

956Reflecting internal schema tables 

957---------------------------------- 

958 

959Reflection methods that return lists of tables will omit so-called 

960"SQLite internal schema object" names, which are considered by SQLite 

961as any object name that is prefixed with ``sqlite_``. An example of 

962such an object is the ``sqlite_sequence`` table that's generated when 

963the ``AUTOINCREMENT`` column parameter is used. In order to return 

964these objects, the parameter ``sqlite_include_internal=True`` may be 

965passed to methods such as :meth:`_schema.MetaData.reflect` or 

966:meth:`.Inspector.get_table_names`. 

967 

968.. versionadded:: 2.0 Added the ``sqlite_include_internal=True`` parameter. 

969 Previously, these tables were not ignored by SQLAlchemy reflection 

970 methods. 

971 

972.. note:: 

973 

974 The ``sqlite_include_internal`` parameter does not refer to the 

975 "system" tables that are present in schemas such as ``sqlite_master``. 

976 

977.. seealso:: 

978 

979 `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite 

980 documentation. 

981 
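The effect of omitting internal schema objects can be sketched with the
standard ``sqlite3`` module. The ``table_names`` helper below is hypothetical
and only imitates the documented behavior by filtering on the ``sqlite_``
prefix; it is not the code used by SQLAlchemy's reflection methods.

```python
import sqlite3


def table_names(conn, include_internal=False):
    """List table names, optionally keeping SQLite internal schema objects."""
    names = [
        row[0]
        for row in conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        )
    ]
    if include_internal:
        return names
    # Internal schema objects are those whose names start with "sqlite_".
    return [name for name in names if not name.startswith("sqlite_")]


conn = sqlite3.connect(":memory:")
# Using AUTOINCREMENT causes SQLite to create the sqlite_sequence table.
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT)")

user_tables = table_names(conn)
all_tables = table_names(conn, include_internal=True)
```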

982''' # noqa 

983 

984from __future__ import annotations 

985 

986import datetime 

987import numbers 

988import re 

989from typing import Any 

990from typing import Callable 

991from typing import Optional 

992from typing import TYPE_CHECKING 

993 

994from .json import JSON 

995from .json import JSONB 

996from .json import JSONIndexType 

997from .json import JSONPathType 

998from ... import exc 

999from ... import schema as sa_schema 

1000from ... import sql 

1001from ... import text 

1002from ... import types as sqltypes 

1003from ... import util 

1004from ...engine import default 

1005from ...engine import processors 

1006from ...engine import reflection 

1007from ...engine.reflection import ReflectionDefaults 

1008from ...sql import coercions 

1009from ...sql import compiler 

1010from ...sql import ddl as sa_ddl 

1011from ...sql import elements 

1012from ...sql import roles 

1013from ...sql import schema 

1014from ...types import BLOB # noqa 

1015from ...types import BOOLEAN # noqa 

1016from ...types import CHAR # noqa 

1017from ...types import DECIMAL # noqa 

1018from ...types import FLOAT # noqa 

1019from ...types import INTEGER # noqa 

1020from ...types import NUMERIC # noqa 

1021from ...types import REAL # noqa 

1022from ...types import SMALLINT # noqa 

1023from ...types import TEXT # noqa 

1024from ...types import TIMESTAMP # noqa 

1025from ...types import VARCHAR # noqa 

1026 

1027if TYPE_CHECKING: 

1028 from ...engine.interfaces import DBAPIConnection 

1029 from ...engine.interfaces import Dialect 

1030 from ...engine.interfaces import IsolationLevel 

1031 from ...sql.sqltypes import _JSON_VALUE 

1032 from ...sql.type_api import _BindProcessorType 

1033 from ...sql.type_api import _ResultProcessorType 

1034 

1035 

1036class _SQliteJson(JSON): 

1037 def result_processor(self, dialect, coltype): 

1038 default_processor = super().result_processor(dialect, coltype) 

1039 

1040 def process(value): 

1041 try: 

1042 return default_processor(value) 

1043 except TypeError: 

1044 if isinstance(value, numbers.Number): 

1045 return value 

1046 else: 

1047 raise 

1048 

1049 return process 

1050 

1051 

1052class _DateTimeMixin: 

1053 _reg = None 

1054 _storage_format = None 

1055 

1056 def __init__(self, storage_format=None, regexp=None, **kw): 

1057 super().__init__(**kw) 

1058 if regexp is not None: 

1059 self._reg = re.compile(regexp) 

1060 if storage_format is not None: 

1061 self._storage_format = storage_format 

1062 

1063 @property 

1064 def format_is_text_affinity(self): 

1065 """return True if the storage format will automatically imply 

1066 a TEXT affinity. 

1067 

1068 If the storage format contains no non-numeric characters, 

1069 it will imply a NUMERIC storage format on SQLite; in this case, 

1070 the type will generate its DDL as DATE_CHAR, DATETIME_CHAR, 

1071 TIME_CHAR. 

1072 

1073 """ 

1074 spec = self._storage_format % { 

1075 "year": 0, 

1076 "month": 0, 

1077 "day": 0, 

1078 "hour": 0, 

1079 "minute": 0, 

1080 "second": 0, 

1081 "microsecond": 0, 

1082 } 

1083 return bool(re.search(r"[^0-9]", spec)) 

1084 

1085 def adapt(self, cls, **kw): 

1086 if issubclass(cls, _DateTimeMixin): 

1087 if self._storage_format: 

1088 kw["storage_format"] = self._storage_format 

1089 if self._reg: 

1090 kw["regexp"] = self._reg 

1091 return super().adapt(cls, **kw) 

1092 

1093 def literal_processor(self, dialect): 

1094 bp = self.bind_processor(dialect) 

1095 

1096 def process(value): 

1097 return "'%s'" % bp(value) 

1098 

1099 return process 

1100 

1101 

1102class DATETIME(_DateTimeMixin, sqltypes.DateTime): 

1103 r"""Represent a Python datetime object in SQLite using a string. 

1104 

1105 The default string storage format is:: 

1106 

1107 "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1108 

1109 e.g.: 

1110 

1111 .. sourcecode:: text 

1112 

1113 2021-03-15 12:05:57.105542 

1114 

1115 The incoming storage format is by default parsed using the 

1116 Python ``datetime.fromisoformat()`` function. 

1117 

1118 .. versionchanged:: 2.0 ``datetime.fromisoformat()`` is used for default 

1119 datetime string parsing. 

1120 

1121 The storage format can be customized to some degree using the 

1122 ``storage_format`` and ``regexp`` parameters, such as:: 

1123 

1124 import re 

1125 from sqlalchemy.dialects.sqlite import DATETIME 

1126 

1127 dt = DATETIME( 

1128 storage_format=( 

1129 "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d" 

1130 ), 

1131 regexp=r"(\d+)/(\d+)/(\d+) (\d+):(\d+):(\d+)", 

1132 ) 

1133 

1134 :param truncate_microseconds: when ``True`` microseconds will be truncated 

1135 from the datetime. Can't be specified together with ``storage_format`` 

1136 or ``regexp``. 

1137 

1138 :param storage_format: format string which will be applied to the dict 

1139 with keys year, month, day, hour, minute, second, and microsecond. 

1140 

1141 :param regexp: regular expression which will be applied to incoming result 

1142 rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming 

1143 strings. If the regexp contains named groups, the resulting match dict is 

1144 applied to the Python datetime() constructor as keyword arguments. 

1145 Otherwise, if positional groups are used, the datetime() constructor 

1146 is called with positional arguments via 

1147 ``*map(int, match_obj.groups(0))``. 

1148 
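The two regexp modes described above can be sketched as follows. The
``parse_with_regexp`` helper is a hypothetical stand-in written for this
example, not the library's own processor; it assumes a compiled pattern whose
groups all capture integer text.

```python
import datetime
import re


def parse_with_regexp(pattern, value, type_=datetime.datetime):
    """Build a date/time object from ``value`` using the groups of ``pattern``."""
    match_obj = pattern.match(value)
    if match_obj is None:
        raise ValueError(f"Couldn't parse {type_.__name__} string {value!r}")
    if pattern.groupindex:
        # Named groups: pass them to the constructor as keyword arguments.
        groups = match_obj.groupdict(0)
        return type_(**{key: int(val) for key, val in groups.items()})
    # Positional groups: pass them in order, with unmatched groups set to 0.
    return type_(*map(int, match_obj.groups(0)))


positional = re.compile(r"(\d+)/(\d+)/(\d+) (\d+):(\d+):(\d+)")
named = re.compile(r"(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)")

dt = parse_with_regexp(positional, "2021/03/15 12:05:57")
d = parse_with_regexp(named, "03/15/2011", datetime.date)
```

With positional groups the order of the groups must match the constructor's
argument order, whereas named groups may appear in any order in the string.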

1149 """ # noqa 

1150 

1151 _storage_format = ( 

1152 "%(year)04d-%(month)02d-%(day)02d " 

1153 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1154 ) 

1155 

1156 def __init__(self, *args, **kwargs): 

1157 truncate_microseconds = kwargs.pop("truncate_microseconds", False) 

1158 super().__init__(*args, **kwargs) 

1159 if truncate_microseconds: 

1160 assert "storage_format" not in kwargs, ( 

1161 "You can specify only " 

1162 "one of truncate_microseconds or storage_format." 

1163 ) 

1164 assert "regexp" not in kwargs, ( 

1165 "You can specify only one of " 

1166 "truncate_microseconds or regexp." 

1167 ) 

1168 self._storage_format = ( 

1169 "%(year)04d-%(month)02d-%(day)02d " 

1170 "%(hour)02d:%(minute)02d:%(second)02d" 

1171 ) 

1172 

1173 def bind_processor( 

1174 self, dialect: Dialect 

1175 ) -> Optional[_BindProcessorType[Any]]: 

1176 datetime_datetime = datetime.datetime 

1177 datetime_date = datetime.date 

1178 format_ = self._storage_format 

1179 

1180 def process(value): 

1181 if value is None: 

1182 return None 

1183 elif isinstance(value, datetime_datetime): 

1184 return format_ % { 

1185 "year": value.year, 

1186 "month": value.month, 

1187 "day": value.day, 

1188 "hour": value.hour, 

1189 "minute": value.minute, 

1190 "second": value.second, 

1191 "microsecond": value.microsecond, 

1192 } 

1193 elif isinstance(value, datetime_date): 

1194 return format_ % { 

1195 "year": value.year, 

1196 "month": value.month, 

1197 "day": value.day, 

1198 "hour": 0, 

1199 "minute": 0, 

1200 "second": 0, 

1201 "microsecond": 0, 

1202 } 

1203 else: 

1204 raise TypeError( 

1205 "SQLite DateTime type only accepts Python " 

1206 "datetime and date objects as input." 

1207 ) 

1208 

1209 return process 

1210 

1211 def result_processor( 

1212 self, dialect: Dialect, coltype: object 

1213 ) -> Optional[_ResultProcessorType[Any]]: 

1214 if self._reg: 

1215 return processors.str_to_datetime_processor_factory( 

1216 self._reg, datetime.datetime 

1217 ) 

1218 else: 

1219 return processors.str_to_datetime 

1220 

1221 

1222class DATE(_DateTimeMixin, sqltypes.Date): 

1223 r"""Represent a Python date object in SQLite using a string. 

1224 

1225 The default string storage format is:: 

1226 

1227 "%(year)04d-%(month)02d-%(day)02d" 

1228 

1229 e.g.: 

1230 

1231 .. sourcecode:: text 

1232 

1233 2011-03-15 

1234 

1235 The incoming storage format is by default parsed using the 

1236 Python ``date.fromisoformat()`` function. 

1237 

1238 .. versionchanged:: 2.0 ``date.fromisoformat()`` is used for default 

1239 date string parsing. 

1240 

1241 

1242 The storage format can be customized to some degree using the 

1243 ``storage_format`` and ``regexp`` parameters, such as:: 

1244 

1245 import re 

1246 from sqlalchemy.dialects.sqlite import DATE 

1247 

1248 d = DATE( 

1249 storage_format="%(month)02d/%(day)02d/%(year)04d", 

1250 regexp=re.compile("(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)"), 

1251 ) 

1252 

1253 :param storage_format: format string which will be applied to the 

1254 dict with keys year, month, and day. 

1255 

1256 :param regexp: regular expression which will be applied to 

1257 incoming result rows, replacing the use of ``date.fromisoformat()`` to 

1258 parse incoming strings. If the regexp contains named groups, the resulting 

1259 match dict is applied to the Python date() constructor as keyword 

1260 arguments. Otherwise, if positional groups are used, the date() 

1261 constructor is called with positional arguments via 

1262 ``*map(int, match_obj.groups(0))``. 

1263 

1264 """ 

1265 

1266 _storage_format = "%(year)04d-%(month)02d-%(day)02d" 

1267 

1268 def bind_processor( 

1269 self, dialect: Dialect 

1270 ) -> Optional[_BindProcessorType[Any]]: 

1271 datetime_date = datetime.date 

1272 format_ = self._storage_format 

1273 

1274 def process(value): 

1275 if value is None: 

1276 return None 

1277 elif isinstance(value, datetime_date): 

1278 return format_ % { 

1279 "year": value.year, 

1280 "month": value.month, 

1281 "day": value.day, 

1282 } 

1283 else: 

1284 raise TypeError( 

1285 "SQLite Date type only accepts Python " 

1286 "date objects as input." 

1287 ) 

1288 

1289 return process 

1290 

1291 def result_processor( 

1292 self, dialect: Dialect, coltype: object 

1293 ) -> Optional[_ResultProcessorType[Any]]: 

1294 if self._reg: 

1295 return processors.str_to_datetime_processor_factory( 

1296 self._reg, datetime.date 

1297 ) 

1298 else: 

1299 return processors.str_to_date 

1300 

1301 

1302class TIME(_DateTimeMixin, sqltypes.Time): 

1303 r"""Represent a Python time object in SQLite using a string. 

1304 

1305 The default string storage format is:: 

1306 

1307 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1308 

1309 e.g.: 

1310 

1311 .. sourcecode:: text 

1312 

1313 12:05:57.105580 

1314 

1315 The incoming storage format is by default parsed using the 

1316 Python ``time.fromisoformat()`` function. 

1317 

1318 .. versionchanged:: 2.0 ``time.fromisoformat()`` is used for default 

1319 time string parsing. 

1320 

1321 The storage format can be customized to some degree using the 

1322 ``storage_format`` and ``regexp`` parameters, such as:: 

1323 

1324 import re 

1325 from sqlalchemy.dialects.sqlite import TIME 

1326 

1327 t = TIME( 

1328 storage_format="%(hour)02d-%(minute)02d-%(second)02d-%(microsecond)06d", 

1329 regexp=re.compile(r"(\d+)-(\d+)-(\d+)(?:-(\d+))?"), 

1330 ) 

1331 

1332 :param truncate_microseconds: when ``True`` microseconds will be truncated 

1333 from the time. Can't be specified together with ``storage_format`` 

1334 or ``regexp``. 

1335 

1336 :param storage_format: format string which will be applied to the dict 

1337 with keys hour, minute, second, and microsecond. 

1338 

1339 :param regexp: regular expression which will be applied to incoming result 

1340 rows, replacing the use of ``time.fromisoformat()`` to parse incoming 

1341 strings. If the regexp contains named groups, the resulting match dict is 

1342 applied to the Python time() constructor as keyword arguments. Otherwise, 

1343 if positional groups are used, the time() constructor is called with 

1344 positional arguments via ``*map(int, match_obj.groups(0))``. 

1345 

1346 """ 

1347 

1348 _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1349 

1350 def __init__(self, *args, **kwargs): 

1351 truncate_microseconds = kwargs.pop("truncate_microseconds", False) 

1352 super().__init__(*args, **kwargs) 

1353 if truncate_microseconds: 

1354 assert "storage_format" not in kwargs, ( 

1355 "You can specify only " 

1356 "one of truncate_microseconds or storage_format." 

1357 ) 

1358 assert "regexp" not in kwargs, ( 

1359 "You can specify only one of " 

1360 "truncate_microseconds or regexp." 

1361 ) 

1362 self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d" 

1363 

1364 def bind_processor(self, dialect): 

1365 datetime_time = datetime.time 

1366 format_ = self._storage_format 

1367 

1368 def process(value): 

1369 if value is None: 

1370 return None 

1371 elif isinstance(value, datetime_time): 

1372 return format_ % { 

1373 "hour": value.hour, 

1374 "minute": value.minute, 

1375 "second": value.second, 

1376 "microsecond": value.microsecond, 

1377 } 

1378 else: 

1379 raise TypeError( 

1380 "SQLite Time type only accepts Python " 

1381 "time objects as input." 

1382 ) 

1383 

1384 return process 

1385 

1386 def result_processor(self, dialect, coltype): 

1387 if self._reg: 

1388 return processors.str_to_datetime_processor_factory( 

1389 self._reg, datetime.time 

1390 ) 

1391 else: 

1392 return processors.str_to_time 

1393 

1394 

1395colspecs = { 

1396 sqltypes.Date: DATE, 

1397 sqltypes.DateTime: DATETIME, 

1398 sqltypes.JSON: _SQliteJson, 

1399 sqltypes.JSON.JSONIndexType: JSONIndexType, 

1400 sqltypes.JSON.JSONPathType: JSONPathType, 

1401 sqltypes.Time: TIME, 

1402 JSONB: JSONB, 

1403} 

1404 

1405ischema_names = { 

1406 "BIGINT": sqltypes.BIGINT, 

1407 "BLOB": sqltypes.BLOB, 

1408 "BOOL": sqltypes.BOOLEAN, 

1409 "BOOLEAN": sqltypes.BOOLEAN, 

1410 "CHAR": sqltypes.CHAR, 

1411 "DATE": sqltypes.DATE, 

1412 "DATE_CHAR": sqltypes.DATE, 

1413 "DATETIME": sqltypes.DATETIME, 

1414 "DATETIME_CHAR": sqltypes.DATETIME, 

1415 "DOUBLE": sqltypes.DOUBLE, 

1416 "DECIMAL": sqltypes.DECIMAL, 

1417 "FLOAT": sqltypes.FLOAT, 

1418 "INT": sqltypes.INTEGER, 

1419 "INTEGER": sqltypes.INTEGER, 

1420 "JSON": JSON, 

1421 "JSONB": JSONB, 

1422 "NUMERIC": sqltypes.NUMERIC, 

1423 "REAL": sqltypes.REAL, 

1424 "SMALLINT": sqltypes.SMALLINT, 

1425 "TEXT": sqltypes.TEXT, 

1426 "TIME": sqltypes.TIME, 

1427 "TIME_CHAR": sqltypes.TIME, 

1428 "TIMESTAMP": sqltypes.TIMESTAMP, 

1429 "VARCHAR": sqltypes.VARCHAR, 

1430 "NVARCHAR": sqltypes.NVARCHAR, 

1431 "NCHAR": sqltypes.NCHAR, 

1432} 

1433 

1434 

1435class SQLiteCompiler(compiler.SQLCompiler): 

1436 extract_map = util.update_copy( 

1437 compiler.SQLCompiler.extract_map, 

1438 { 

1439 "month": "%m", 

1440 "day": "%d", 

1441 "year": "%Y", 

1442 "second": "%S", 

1443 "hour": "%H", 

1444 "doy": "%j", 

1445 "minute": "%M", 

1446 "epoch": "%s", 

1447 "dow": "%w", 

1448 "week": "%W", 

1449 }, 

1450 ) 

1451 

1452 def visit_truediv_binary(self, binary, operator, **kw): 

1453 return ( 

1454 self.process(binary.left, **kw) 

1455 + " / " 

1456 + "(%s + 0.0)" % self.process(binary.right, **kw) 

1457 ) 

1458 

1459 def visit_now_func(self, fn, **kw): 

1460 return "CURRENT_TIMESTAMP" 

1461 

1462 def visit_localtimestamp_func(self, func, **kw): 

1463 return "DATETIME(CURRENT_TIMESTAMP, 'localtime')" 

1464 

1465 def visit_true(self, expr, **kw): 

1466 return "1" 

1467 

1468 def visit_false(self, expr, **kw): 

1469 return "0" 

1470 

1471 def visit_char_length_func(self, fn, **kw): 

1472 return "length%s" % self.function_argspec(fn) 

1473 

1474 def visit_aggregate_strings_func(self, fn, **kw): 

1475 return super().visit_aggregate_strings_func( 

1476 fn, use_function_name="group_concat", **kw 

1477 ) 

1478 

1479 def visit_cast(self, cast, **kwargs): 

1480 if self.dialect.supports_cast: 

1481 return super().visit_cast(cast, **kwargs) 

1482 else: 

1483 return self.process(cast.clause, **kwargs) 

1484 

1485 def visit_extract(self, extract, **kw): 

1486 try: 

1487 return "CAST(STRFTIME('%s', %s) AS INTEGER)" % ( 

1488 self.extract_map[extract.field], 

1489 self.process(extract.expr, **kw), 

1490 ) 

1491 except KeyError as err: 

1492 raise exc.CompileError( 

1493 "%s is not a valid extract argument." % extract.field 

1494 ) from err 

1495 

1496 def returning_clause( 

1497 self, 

1498 stmt, 

1499 returning_cols, 

1500 *, 

1501 populate_result_map, 

1502 **kw, 

1503 ): 

1504 kw["include_table"] = False 

1505 return super().returning_clause( 

1506 stmt, returning_cols, populate_result_map=populate_result_map, **kw 

1507 ) 

1508 

1509 def limit_clause(self, select, **kw): 

1510 text = "" 

1511 if select._limit_clause is not None: 

1512 text += "\n LIMIT " + self.process(select._limit_clause, **kw) 

1513 if select._offset_clause is not None: 

1514 if select._limit_clause is None: 

1515 text += "\n LIMIT " + self.process(sql.literal(-1)) 

1516 text += " OFFSET " + self.process(select._offset_clause, **kw) 

1517 else: 

1518 text += " OFFSET " + self.process(sql.literal(0), **kw) 

1519 return text 

1520 

1521 def for_update_clause(self, select, **kw): 

1522 # sqlite has no "FOR UPDATE" AFAICT 

1523 return "" 

1524 

1525 def update_from_clause( 

1526 self, update_stmt, from_table, extra_froms, from_hints, **kw 

1527 ): 

1528 kw["asfrom"] = True 

1529 return "FROM " + ", ".join( 

1530 t._compiler_dispatch(self, fromhints=from_hints, **kw) 

1531 for t in extra_froms 

1532 ) 

1533 

1534 def visit_is_distinct_from_binary(self, binary, operator, **kw): 

1535 return "%s IS NOT %s" % ( 

1536 self.process(binary.left), 

1537 self.process(binary.right), 

1538 ) 

1539 

1540 def visit_is_not_distinct_from_binary(self, binary, operator, **kw): 

1541 return "%s IS %s" % ( 

1542 self.process(binary.left), 

1543 self.process(binary.right), 

1544 ) 

1545 

1546 def visit_json_getitem_op_binary( 

1547 self, binary, operator, _cast_applied=False, **kw 

1548 ): 

1549 if ( 

1550 not _cast_applied 

1551 and binary.type._type_affinity is not sqltypes.JSON 

1552 ): 

1553 kw["_cast_applied"] = True 

1554 return self.process(sql.cast(binary, binary.type), **kw) 

1555 

1556 if binary.type._type_affinity is sqltypes.JSON: 

1557 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))" 

1558 else: 

1559 expr = "JSON_EXTRACT(%s, %s)" 

1560 

1561 return expr % ( 

1562 self.process(binary.left, **kw), 

1563 self.process(binary.right, **kw), 

1564 ) 

1565 

1566 def visit_json_path_getitem_op_binary( 

1567 self, binary, operator, _cast_applied=False, **kw 

1568 ): 

1569 if ( 

1570 not _cast_applied 

1571 and binary.type._type_affinity is not sqltypes.JSON 

1572 ): 

1573 kw["_cast_applied"] = True 

1574 return self.process(sql.cast(binary, binary.type), **kw) 

1575 

1576 if binary.type._type_affinity is sqltypes.JSON: 

1577 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))" 

1578 else: 

1579 expr = "JSON_EXTRACT(%s, %s)" 

1580 

1581 return expr % ( 

1582 self.process(binary.left, **kw), 

1583 self.process(binary.right, **kw), 

1584 ) 

1585 

1586 def visit_empty_set_op_expr(self, type_, expand_op, **kw): 

1587 # slightly old SQLite versions don't seem to be able to handle 

1588 # the empty set impl 

1589 return self.visit_empty_set_expr(type_) 

1590 

1591 def visit_empty_set_expr(self, element_types, **kw): 

1592 return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % ( 

1593 ", ".join("1" for type_ in element_types or [INTEGER()]), 

1594 ", ".join("1" for type_ in element_types or [INTEGER()]), 

1595 ) 

1596 

1597 def visit_regexp_match_op_binary(self, binary, operator, **kw): 

1598 return self._generate_generic_binary(binary, " REGEXP ", **kw) 

1599 

1600 def visit_not_regexp_match_op_binary(self, binary, operator, **kw): 

1601 return self._generate_generic_binary(binary, " NOT REGEXP ", **kw) 

1602 

1603 def _on_conflict_target(self, clause, **kw): 

1604 if clause.inferred_target_elements is not None: 

1605 target_text = "(%s)" % ", ".join( 

1606 ( 

1607 self.preparer.quote(c) 

1608 if isinstance(c, str) 

1609 else self.process(c, include_table=False, use_schema=False) 

1610 ) 

1611 for c in clause.inferred_target_elements 

1612 ) 

1613 if clause.inferred_target_whereclause is not None: 

1614 whereclause_kw = dict(kw) 

1615 whereclause_kw.update( 

1616 include_table=False, 

1617 use_schema=False, 

1618 literal_execute=True, 

1619 ) 

1620 target_text += " WHERE %s" % self.process( 

1621 clause.inferred_target_whereclause, 

1622 **whereclause_kw, 

1623 ) 

1624 

1625 else: 

1626 target_text = "" 

1627 

1628 return target_text 

1629 

1630 def visit_on_conflict_do_nothing(self, on_conflict, **kw): 

1631 target_text = self._on_conflict_target(on_conflict, **kw) 

1632 

1633 if target_text: 

1634 return "ON CONFLICT %s DO NOTHING" % target_text 

1635 else: 

1636 return "ON CONFLICT DO NOTHING" 

1637 

1638 def visit_on_conflict_do_update(self, on_conflict, **kw): 

1639 clause = on_conflict 

1640 

1641 target_text = self._on_conflict_target(on_conflict, **kw) 

1642 

1643 action_set_ops = [] 

1644 

1645 set_parameters = dict(clause.update_values_to_set) 

1646 # create a list of column assignment clauses as tuples 

1647 

1648 insert_statement = self.stack[-1]["selectable"] 

1649 cols = insert_statement.table.c 

1650 set_kw = dict(kw) 

1651 set_kw.update(use_schema=False) 

1652 for c in cols: 

1653 col_key = c.key 

1654 

1655 if col_key in set_parameters: 

1656 value = set_parameters.pop(col_key) 

1657 elif c in set_parameters: 

1658 value = set_parameters.pop(c) 

1659 else: 

1660 continue 

1661 

1662 if ( 

1663 isinstance(value, elements.BindParameter) 

1664 and value.type._isnull 

1665 ): 

1666 value = value._with_binary_element_type(c.type) 

1667 

1668 value_text = self.process( 

1669 value.self_group(), is_upsert_set=True, **set_kw 

1670 ) 

1671 

1672 key_text = self.preparer.quote(c.name) 

1673 action_set_ops.append("%s = %s" % (key_text, value_text)) 

1674 

1675 # check for names that don't match columns 

1676 if set_parameters: 

1677 util.warn( 

1678 "Additional column names not matching " 

1679 "any column keys in table '%s': %s" 

1680 % ( 

1681 self.current_executable.table.name, 

1682 (", ".join("'%s'" % c for c in set_parameters)), 

1683 ) 

1684 ) 

1685 for k, v in set_parameters.items(): 

1686 key_text = ( 

1687 self.preparer.quote(k) 

1688 if isinstance(k, str) 

1689 else self.process(k, **set_kw) 

1690 ) 

1691 value_text = self.process( 

1692 coercions.expect(roles.ExpressionElementRole, v), 

1693 is_upsert_set=True, 

1694 **set_kw, 

1695 ) 

1696 action_set_ops.append("%s = %s" % (key_text, value_text)) 

1697 

1698 action_text = ", ".join(action_set_ops) 

1699 if clause.update_whereclause is not None: 

1700 where_kw = dict(kw) 

1701 where_kw.update(include_table=True, use_schema=False) 

1702 action_text += " WHERE %s" % self.process( 

1703 clause.update_whereclause, **where_kw 

1704 ) 

1705 

1706 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) 

1707 

1708 def visit_bitwise_xor_op_binary(self, binary, operator, **kw): 

1709 # sqlite has no xor. Use "a XOR b" = "(a | b) - (a & b)". 

1710 kw["eager_grouping"] = True 

1711 or_ = self._generate_generic_binary(binary, " | ", **kw) 

1712 and_ = self._generate_generic_binary(binary, " & ", **kw) 

1713 return f"({or_} - {and_})" 

1714 

1715 

1716class SQLiteDDLCompiler(compiler.DDLCompiler): 

1717 def get_column_specification(self, column, **kwargs): 

1718 coltype = self.dialect.type_compiler_instance.process( 

1719 column.type, type_expression=column 

1720 ) 

1721 colspec = self.preparer.format_column(column) + " " + coltype 

1722 default = self.get_column_default_string(column) 

1723 if default is not None: 

1724 

1725 if not re.match(r"""^\s*[\'\"\(]""", default) and re.match( 

1726 r".*\W.*", default 

1727 ): 

1728 colspec += f" DEFAULT ({default})" 

1729 else: 

1730 colspec += f" DEFAULT {default}" 

1731 

1732 if not column.nullable: 

1733 colspec += " NOT NULL" 

1734 

1735 on_conflict_clause = column.dialect_options["sqlite"][ 

1736 "on_conflict_not_null" 

1737 ] 

1738 if on_conflict_clause is not None: 

1739 colspec += " ON CONFLICT " + on_conflict_clause 

1740 

1741 if column.primary_key: 

1742 if ( 

1743 column.autoincrement is True 

1744 and len(column.table.primary_key.columns) != 1 

1745 ): 

1746 raise exc.CompileError( 

1747 "SQLite does not support autoincrement for " 

1748 "composite primary keys" 

1749 ) 

1750 

1751 if ( 

1752 column.table.dialect_options["sqlite"]["autoincrement"] 

1753 and len(column.table.primary_key.columns) == 1 

1754 and issubclass(column.type._type_affinity, sqltypes.Integer) 

1755 and not column.foreign_keys 

1756 ): 

1757 colspec += " PRIMARY KEY" 

1758 

1759 on_conflict_clause = column.dialect_options["sqlite"][ 

1760 "on_conflict_primary_key" 

1761 ] 

1762 if on_conflict_clause is not None: 

1763 colspec += " ON CONFLICT " + on_conflict_clause 

1764 

1765 colspec += " AUTOINCREMENT" 

1766 

1767 if column.computed is not None: 

1768 colspec += " " + self.process(column.computed) 

1769 

1770 return colspec 
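# Illustrative sketch, not part of the dialect: the two regex checks in
# get_column_specification() decide whether a DEFAULT expression is wrapped
# in parentheses. The helper name render_default is hypothetical; the
# patterns are copied from the code above.

```python
import re


def render_default(default: str) -> str:
    """Return the DEFAULT fragment for a default string."""
    starts_quoted = re.match(r"""^\s*[\'\"\(]""", default)  # ', " or ( first
    has_non_word = re.match(r".*\W.*", default)  # any non-word character
    if not starts_quoted and has_non_word:
        return f" DEFAULT ({default})"
    return f" DEFAULT {default}"
```

# Bare words and already quoted or parenthesized values pass through
# unchanged; anything else containing a non-word character is wrapped.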

1771 

1772 def visit_primary_key_constraint(self, constraint, **kw): 

1773 # for columns with sqlite_autoincrement=True, 

1774 # the PRIMARY KEY constraint can only be inline 

1775 # with the column itself. 

1776 if len(constraint.columns) == 1: 

1777 c = list(constraint)[0] 

1778 if ( 

1779 c.primary_key 

1780 and c.table.dialect_options["sqlite"]["autoincrement"] 

1781 and issubclass(c.type._type_affinity, sqltypes.Integer) 

1782 and not c.foreign_keys 

1783 ): 

1784 return None 

1785 

1786 text = super().visit_primary_key_constraint(constraint) 

1787 

1788 on_conflict_clause = constraint.dialect_options["sqlite"][ 

1789 "on_conflict" 

1790 ] 

1791 if on_conflict_clause is None and len(constraint.columns) == 1: 

1792 on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][ 

1793 "on_conflict_primary_key" 

1794 ] 

1795 

1796 if on_conflict_clause is not None: 

1797 text += " ON CONFLICT " + on_conflict_clause 

1798 

1799 return text 

1800 

1801 def visit_unique_constraint(self, constraint, **kw): 

1802 text = super().visit_unique_constraint(constraint) 

1803 

1804 on_conflict_clause = constraint.dialect_options["sqlite"][ 

1805 "on_conflict" 

1806 ] 

1807 if on_conflict_clause is None and len(constraint.columns) == 1: 

1808 col1 = list(constraint)[0] 

1809 if isinstance(col1, schema.SchemaItem): 

1810 on_conflict_clause = list(constraint)[0].dialect_options[ 

1811 "sqlite" 

1812 ]["on_conflict_unique"] 

1813 

1814 if on_conflict_clause is not None: 

1815 text += " ON CONFLICT " + on_conflict_clause 

1816 

1817 return text 

1818 

1819 def visit_check_constraint(self, constraint, **kw): 

1820 text = super().visit_check_constraint(constraint) 

1821 

1822 on_conflict_clause = constraint.dialect_options["sqlite"][ 

1823 "on_conflict" 

1824 ] 

1825 

1826 if on_conflict_clause is not None: 

1827 text += " ON CONFLICT " + on_conflict_clause 

1828 

1829 return text 

1830 

1831 def visit_column_check_constraint(self, constraint, **kw): 

1832 text = super().visit_column_check_constraint(constraint) 

1833 

1834 if constraint.dialect_options["sqlite"]["on_conflict"] is not None: 

1835 raise exc.CompileError( 

1836 "SQLite does not support on conflict clause for " 

1837 "column check constraint" 

1838 ) 

1839 

1840 return text 

1841 

1842 def visit_foreign_key_constraint(self, constraint, **kw): 

1843 local_table = constraint.elements[0].parent.table 

1844 remote_table = constraint.elements[0].column.table 

1845 

1846 if local_table.schema != remote_table.schema: 

1847 return None 

1848 else: 

1849 return super().visit_foreign_key_constraint(constraint) 

1850 

1851 def define_constraint_remote_table(self, constraint, table, preparer): 

1852 """Format the remote table clause of a CREATE CONSTRAINT clause.""" 

1853 

1854 return preparer.format_table(table, use_schema=False) 

1855 

1856 def visit_create_index( 

1857 self, create, include_schema=False, include_table_schema=True, **kw 

1858 ): 

1859 index = create.element 

1860 self._verify_index_table(index) 

1861 preparer = self.preparer 

1862 text = "CREATE " 

1863 if index.unique: 

1864 text += "UNIQUE " 

1865 

1866 text += "INDEX " 

1867 

1868 if create.if_not_exists: 

1869 text += "IF NOT EXISTS " 

1870 

1871 text += "%s ON %s (%s)" % ( 

1872 self._prepared_index_name(index, include_schema=True), 

1873 preparer.format_table(index.table, use_schema=False), 

1874 ", ".join( 

1875 self.sql_compiler.process( 

1876 expr, include_table=False, literal_binds=True 

1877 ) 

1878 for expr in index.expressions 

1879 ), 

1880 ) 

1881 

1882 whereclause = index.dialect_options["sqlite"]["where"] 

1883 if whereclause is not None: 

1884 where_compiled = self.sql_compiler.process( 

1885 whereclause, include_table=False, literal_binds=True 

1886 ) 

1887 text += " WHERE " + where_compiled 

1888 

1889 return text 

1890 

1891 def post_create_table(self, table): 

1892 table_options = [] 

1893 

1894 if not table.dialect_options["sqlite"]["with_rowid"]: 

1895 table_options.append("WITHOUT ROWID") 

1896 

1897 if table.dialect_options["sqlite"]["strict"]: 

1898 table_options.append("STRICT") 

1899 

1900 if table_options: 

1901 return "\n " + ",\n ".join(table_options) 

1902 else: 

1903 return "" 

1904 

1905 def visit_create_view(self, create, **kw): 

1906 """Handle SQLite if_not_exists dialect option for CREATE VIEW.""" 

1907 # Get the if_not_exists dialect option from the CreateView object 

1908 if_not_exists = create.dialect_options["sqlite"].get( 

1909 "if_not_exists", False 

1910 ) 

1911 

1912 # Pass if_not_exists through kw to the parent's _generate_table_select 

1913 kw["if_not_exists"] = if_not_exists 

1914 return super().visit_create_view(create, **kw) 

1915 

1916 

1917class SQLiteTypeCompiler(compiler.GenericTypeCompiler): 

1918 def visit_large_binary(self, type_, **kw): 

1919 return self.visit_BLOB(type_) 

1920 

1921 def visit_DATETIME(self, type_, **kw): 

1922 if ( 

1923 not isinstance(type_, _DateTimeMixin) 

1924 or type_.format_is_text_affinity 

1925 ): 

1926 return super().visit_DATETIME(type_) 

1927 else: 

1928 return "DATETIME_CHAR" 

1929 

1930 def visit_DATE(self, type_, **kw): 

1931 if ( 

1932 not isinstance(type_, _DateTimeMixin) 

1933 or type_.format_is_text_affinity 

1934 ): 

1935 return super().visit_DATE(type_) 

1936 else: 

1937 return "DATE_CHAR" 

1938 

1939 def visit_TIME(self, type_, **kw): 

1940 if ( 

1941 not isinstance(type_, _DateTimeMixin) 

1942 or type_.format_is_text_affinity 

1943 ): 

1944 return super().visit_TIME(type_) 

1945 else: 

1946 return "TIME_CHAR" 

1947 

1948 def visit_JSON(self, type_, **kw): 

1949 # note this name provides NUMERIC affinity, not TEXT. 

1950 # should not be an issue unless the JSON value consists of a single 

1951 # numeric value. JSONTEXT can be used if this case is required. 

1952 return "JSON" 

1953 

1954 def visit_JSONB(self, type_, **kw): 

1955 return "JSONB" 

1956 

1957 

1958class SQLiteIdentifierPreparer(compiler.IdentifierPreparer): 

1959 reserved_words = { 

1960 "add", 

1961 "after", 

1962 "all", 

1963 "alter", 

1964 "analyze", 

1965 "and", 

1966 "as", 

1967 "asc", 

1968 "attach", 

1969 "autoincrement", 

1970 "before", 

1971 "begin", 

1972 "between", 

1973 "by", 

1974 "cascade", 

1975 "case", 

1976 "cast", 

1977 "check", 

1978 "collate", 

1979 "column", 

1980 "commit", 

1981 "conflict", 

1982 "constraint", 

1983 "create", 

1984 "cross", 

1985 "current_date", 

1986 "current_time", 

1987 "current_timestamp", 

1988 "database", 

1989 "default", 

1990 "deferrable", 

1991 "deferred", 

1992 "delete", 

1993 "desc", 

1994 "detach", 

1995 "distinct", 

1996 "drop", 

1997 "each", 

1998 "else", 

1999 "end", 

2000 "escape", 

2001 "except", 

2002 "exclusive", 

2003 "exists", 

2004 "explain", 

2005 "false", 

2006 "fail", 

2007 "for", 

2008 "foreign", 

2009 "from", 

2010 "full", 

2011 "glob", 

2012 "group", 

2013 "having", 

2014 "if", 

2015 "ignore", 

2016 "immediate", 

2017 "in", 

2018 "index", 

2019 "indexed", 

2020 "initially", 

2021 "inner", 

2022 "insert", 

2023 "instead", 

2024 "intersect", 

2025 "into", 

2026 "is", 

2027 "isnull", 

2028 "join", 

2029 "key", 

2030 "left", 

2031 "like", 

2032 "limit", 

2033 "match", 

2034 "natural", 

2035 "not", 

2036 "notnull", 

2037 "null", 

2038 "of", 

2039 "offset", 

2040 "on", 

2041 "or", 

2042 "order", 

2043 "outer", 

2044 "plan", 

2045 "pragma", 

2046 "primary", 

2047 "query", 

2048 "raise", 

2049 "references", 

2050 "reindex", 

2051 "rename", 

2052 "replace", 

2053 "restrict", 

2054 "right", 

2055 "rollback", 

2056 "row", 

2057 "select", 

2058 "set", 

2059 "table", 

2060 "temp", 

2061 "temporary", 

2062 "then", 

2063 "to", 

2064 "transaction", 

2065 "trigger", 

2066 "true", 

2067 "union", 

2068 "unique", 

2069 "update", 

2070 "using", 

2071 "vacuum", 

2072 "values", 

2073 "view", 

2074 "virtual", 

2075 "when", 

2076 "where", 

2077 } 

2078 

2079 

2080class SQLiteExecutionContext(default.DefaultExecutionContext): 

2081 @util.memoized_property 

2082 def _preserve_raw_colnames(self): 

2083 return ( 

2084 not self.dialect._broken_dotted_colnames 

2085 or self.execution_options.get("sqlite_raw_colnames", False) 

2086 ) 

2087 

2088 def _translate_colname(self, colname): 

2089 # TODO: detect SQLite version 3.10.0 or greater; 

2090 # see [ticket:3633] 

2091 

2092 # adjust for dotted column names. SQLite 

2093 # in the case of UNION may store col names as 

2094 # "tablename.colname", or if using an attached database, 

2095 # "database.tablename.colname", in cursor.description 

2096 if not self._preserve_raw_colnames and "." in colname: 

2097 return colname.split(".")[-1], colname 

2098 else: 

2099 return colname, None 
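# Illustrative sketch, not part of the dialect: the dotted-name handling in
# _translate_colname() as a standalone function. The function name and the
# preserve_raw parameter are hypothetical stand-ins for the method and for
# self._preserve_raw_colnames.

```python
def translate_colname(colname: str, preserve_raw: bool):
    """Return (effective_name, original_name_or_None)."""
    if not preserve_raw and "." in colname:
        # keep only the last dotted segment, remember the raw name
        return colname.split(".")[-1], colname
    return colname, None
```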

2100 

2101 

2102class SQLiteDialect(default.DefaultDialect): 

2103 name = "sqlite" 

2104 supports_alter = False 

2105 

2106 # SQLite supports "DEFAULT VALUES" but *does not* support 

2107 # "VALUES (DEFAULT)" 

2108 supports_default_values = True 

2109 supports_default_metavalue = False 

2110 

2111 # sqlite issue: 

2112 # https://github.com/python/cpython/issues/93421 

2113 # note this parameter is no longer used by the ORM or default dialect 

2114 # see #9414 

2115 supports_sane_rowcount_returning = False 

2116 

2117 supports_empty_insert = False 

2118 supports_cast = True 

2119 supports_multivalues_insert = True 

2120 use_insertmanyvalues = True 

2121 tuple_in_values = True 

2122 supports_statement_cache = True 

2123 insert_null_pk_still_autoincrements = True 

2124 insert_returning = True 

2125 update_returning = True 

2126 update_returning_multifrom = True 

2127 delete_returning = True 

2128 update_returning_multifrom = True 

2129 

2130 supports_default_metavalue = True 

2131 """dialect supports INSERT... VALUES (DEFAULT) syntax""" 

2132 

2133 default_metavalue_token = "NULL" 

2134 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the 

2135 parenthesis.""" 

2136 

2137 default_paramstyle = "qmark" 

2138 execution_ctx_cls = SQLiteExecutionContext 

2139 statement_compiler = SQLiteCompiler 

2140 ddl_compiler = SQLiteDDLCompiler 

2141 type_compiler_cls = SQLiteTypeCompiler 

2142 preparer = SQLiteIdentifierPreparer 

2143 ischema_names = ischema_names 

2144 colspecs = colspecs 

2145 

2146 construct_arguments = [ 

2147 ( 

2148 sa_schema.Table, 

2149 { 

2150 "autoincrement": False, 

2151 "with_rowid": True, 

2152 "strict": False, 

2153 }, 

2154 ), 

2155 (sa_schema.Index, {"where": None}), 

2156 ( 

2157 sa_schema.Column, 

2158 { 

2159 "on_conflict_primary_key": None, 

2160 "on_conflict_not_null": None, 

2161 "on_conflict_unique": None, 

2162 }, 

2163 ), 

2164 (sa_schema.Constraint, {"on_conflict": None}), 

2165 (sa_ddl.CreateView, {"if_not_exists": False}), 

2166 ] 

2167 

2168 _broken_fk_pragma_quotes = False 

2169 _broken_dotted_colnames = False 

2170 

2171 def __init__( 

2172 self, 

2173 native_datetime: bool = False, 

2174 json_serializer: Callable[[_JSON_VALUE], str] | None = None, 

2175 json_deserializer: Callable[[str], _JSON_VALUE] | None = None, 

2176 **kwargs: Any, 

2177 ) -> None: 

2178 default.DefaultDialect.__init__(self, **kwargs) 

2179 

2180 self._json_serializer = json_serializer 

2181 self._json_deserializer = json_deserializer 

2182 

2183 # this flag is used by the pysqlite dialect, and perhaps others in the 

2184 # future, to indicate the driver is handling date/timestamp 

2185 # conversions (and perhaps datetime/time as well on some hypothetical 

2186 # driver ?) 

2187 self.native_datetime = native_datetime 

2188 

2189 if self.dbapi is not None: 

2190 if self.dbapi.sqlite_version_info < (3, 7, 16): 

2191 util.warn( 

2192 "SQLite version %s is older than 3.7.16, and will not " 

2193 "support right nested joins, as are sometimes used in " 

2194 "more complex ORM scenarios. SQLAlchemy 1.4 and above " 

2195 "no longer tries to rewrite these joins." 

2196 % (self.dbapi.sqlite_version_info,) 

2197 ) 

2198 

2199 # NOTE: python 3.7 on fedora for me has SQLite 3.34.1. These 

2200 # version checks are getting very stale. 

2201 self._broken_dotted_colnames = self.dbapi.sqlite_version_info < ( 

2202 3, 

2203 10, 

2204 0, 

2205 ) 

2206 self.supports_default_values = self.dbapi.sqlite_version_info >= ( 

2207 3, 

2208 3, 

2209 8, 

2210 ) 

2211 self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3) 

2212 self.supports_multivalues_insert = ( 

2213 # https://www.sqlite.org/releaselog/3_7_11.html 

2214 self.dbapi.sqlite_version_info 

2215 >= (3, 7, 11) 

2216 ) 

2217 # see https://www.sqlalchemy.org/trac/ticket/2568 

2218 # as well as https://www.sqlite.org/src/info/600482d161 

2219 self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < ( 

2220 3, 

2221 6, 

2222 14, 

2223 ) 

2224 

2225 if self.dbapi.sqlite_version_info < (3, 35) or util.pypy: 

2226 self.update_returning = self.delete_returning = ( 

2227 self.insert_returning 

2228 ) = False 

2229 

2230 if self.dbapi.sqlite_version_info < (3, 32, 0): 

2231 # https://www.sqlite.org/limits.html 

2232 self.insertmanyvalues_max_parameters = 999 

2233 

2234 _isolation_lookup = util.immutabledict( 

2235 {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0} 

2236 ) 

2237 

2238 def get_isolation_level_values(self, dbapi_connection): 

2239 return list(self._isolation_lookup) 

2240 

2241 def set_isolation_level( 

2242 self, dbapi_connection: DBAPIConnection, level: IsolationLevel 

2243 ) -> None: 

2244 isolation_level = self._isolation_lookup[level] 

2245 

2246 cursor = dbapi_connection.cursor() 

2247 cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}") 

2248 cursor.close() 

2249 

2250 def get_isolation_level(self, dbapi_connection): 

2251 cursor = dbapi_connection.cursor() 

2252 cursor.execute("PRAGMA read_uncommitted") 

2253 res = cursor.fetchone() 

2254 if res: 

2255 value = res[0] 

2256 else: 

2257 # https://www.sqlite.org/changes.html#version_3_3_3 

2258 # "Optional READ UNCOMMITTED isolation (instead of the 

2259 # default isolation level of SERIALIZABLE) and 

2260 # table level locking when database connections 

2261 # share a common cache." 

2262 # pre-SQLite 3.3.0 default to 0 

2263 value = 0 

2264 cursor.close() 

2265 if value == 0: 

2266 return "SERIALIZABLE" 

2267 elif value == 1: 

2268 return "READ UNCOMMITTED" 

2269 else: 

2270 assert False, "Unknown isolation level %s" % value 
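# Illustrative sketch, not part of the dialect: the same PRAGMA round trip
# using only the standard library sqlite3 module. The names LOOKUP,
# set_level and get_level are hypothetical; the in-memory database is an
# assumption made for the example.

```python
import sqlite3

LOOKUP = {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
REVERSE = {value: name for name, value in LOOKUP.items()}


def set_level(conn: sqlite3.Connection, level: str) -> None:
    cur = conn.cursor()
    cur.execute(f"PRAGMA read_uncommitted = {LOOKUP[level]}")
    cur.close()


def get_level(conn: sqlite3.Connection) -> str:
    cur = conn.cursor()
    cur.execute("PRAGMA read_uncommitted")
    row = cur.fetchone()
    cur.close()
    # no row at all is treated as 0, matching the fallback above
    return REVERSE[row[0] if row else 0]


conn = sqlite3.connect(":memory:")
initial = get_level(conn)
set_level(conn, "READ UNCOMMITTED")
changed = get_level(conn)
conn.close()
```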

2271 

2272 @reflection.cache 

2273 def get_schema_names(self, connection, **kw): 

2274 s = "PRAGMA database_list" 

2275 dl = connection.exec_driver_sql(s) 

2276 

2277 return [db[1] for db in dl if db[1] != "temp"] 

2278 

2279 def _format_schema(self, schema, table_name): 

2280 if schema is not None: 

2281 qschema = self.identifier_preparer.quote_identifier(schema) 

2282 name = f"{qschema}.{table_name}" 

2283 else: 

2284 name = table_name 

2285 return name 

2286 

2287 def _sqlite_main_query( 

2288 self, 

2289 table: str, 

2290 type_: str, 

2291 schema: Optional[str], 

2292 sqlite_include_internal: bool, 

2293 ): 

2294 main = self._format_schema(schema, table) 

2295 if not sqlite_include_internal: 

2296 filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'" 

2297 else: 

2298 filter_table = "" 

2299 query = ( 

2300 f"SELECT name FROM {main} " 

2301 f"WHERE type='{type_}'{filter_table} " 

2302 "ORDER BY name" 

2303 ) 

2304 return query 

2305 

2306 @reflection.cache 

2307 def get_table_names( 

2308 self, connection, schema=None, sqlite_include_internal=False, **kw 

2309 ): 

2310 query = self._sqlite_main_query( 

2311 "sqlite_master", "table", schema, sqlite_include_internal 

2312 ) 

2313 names = connection.exec_driver_sql(query).scalars().all() 

2314 return names 
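# Illustrative sketch, not part of the dialect: the query built by
# _sqlite_main_query(), run through the standard library sqlite3 module.
# The helper name main_query and the sample tables are hypothetical. An
# AUTOINCREMENT column is used because it makes SQLite create the internal
# sqlite_sequence table.

```python
import sqlite3


def main_query(table: str, type_: str, include_internal: bool = False) -> str:
    # "~" escapes "_" so that only names starting with "sqlite_" are excluded
    filter_table = (
        "" if include_internal
        else " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'"
    )
    return (
        f"SELECT name FROM {table} "
        f"WHERE type='{type_}'{filter_table} "
        "ORDER BY name"
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE b (id INTEGER PRIMARY KEY AUTOINCREMENT)")
conn.execute("CREATE TABLE a (x)")
user_tables = [r[0] for r in conn.execute(main_query("sqlite_master", "table"))]
all_tables = [
    r[0] for r in conn.execute(main_query("sqlite_master", "table", True))
]
conn.close()
```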

2315 

2316 @reflection.cache 

2317 def get_temp_table_names( 

2318 self, connection, sqlite_include_internal=False, **kw 

2319 ): 

2320 query = self._sqlite_main_query( 

2321 "sqlite_temp_master", "table", None, sqlite_include_internal 

2322 ) 

2323 names = connection.exec_driver_sql(query).scalars().all() 

2324 return names 

2325 

2326 @reflection.cache 

2327 def get_temp_view_names( 

2328 self, connection, sqlite_include_internal=False, **kw 

2329 ): 

2330 query = self._sqlite_main_query( 

2331 "sqlite_temp_master", "view", None, sqlite_include_internal 

2332 ) 

2333 names = connection.exec_driver_sql(query).scalars().all() 

2334 return names 

2335 

2336 @reflection.cache 

2337 def has_table(self, connection, table_name, schema=None, **kw): 

2338 self._ensure_has_table_connection(connection) 

2339 

2340 if schema is not None and schema not in self.get_schema_names( 

2341 connection, **kw 

2342 ): 

2343 return False 

2344 

2345 info = self._get_table_pragma( 

2346 connection, "table_info", table_name, schema=schema 

2347 ) 

2348 return bool(info) 

2349 

2350 def _get_default_schema_name(self, connection): 

2351 return "main" 

2352 

2353 @reflection.cache 

2354 def get_view_names( 

2355 self, connection, schema=None, sqlite_include_internal=False, **kw 

2356 ): 

2357 query = self._sqlite_main_query( 

2358 "sqlite_master", "view", schema, sqlite_include_internal 

2359 ) 

2360 names = connection.exec_driver_sql(query).scalars().all() 

2361 return names 

2362 

2363 @reflection.cache 

2364 def get_view_definition(self, connection, view_name, schema=None, **kw): 

2365 if schema is not None: 

2366 qschema = self.identifier_preparer.quote_identifier(schema) 

2367 master = f"{qschema}.sqlite_master" 

2368 s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % ( 

2369 master, 

2370 ) 

2371 rs = connection.exec_driver_sql(s, (view_name,)) 

2372 else: 

2373 try: 

2374 s = ( 

2375 "SELECT sql FROM " 

2376 " (SELECT * FROM sqlite_master UNION ALL " 

2377 " SELECT * FROM sqlite_temp_master) " 

2378 "WHERE name = ? " 

2379 "AND type='view'" 

2380 ) 

2381 rs = connection.exec_driver_sql(s, (view_name,)) 

2382 except exc.DBAPIError: 

2383 s = ( 

2384 "SELECT sql FROM sqlite_master WHERE name = ? " 

2385 "AND type='view'" 

2386 ) 

2387 rs = connection.exec_driver_sql(s, (view_name,)) 

2388 

2389 result = rs.fetchall() 

2390 if result: 

2391 return result[0].sql 

2392 else: 

2393 raise exc.NoSuchTableError( 

2394 f"{schema}.{view_name}" if schema else view_name 

2395 ) 

2396 

2397 @reflection.cache 

2398 def get_columns(self, connection, table_name, schema=None, **kw): 

2399 pragma = "table_info" 

2400 # computed columns are treated as hidden, they require table_xinfo 

2401 if self.server_version_info >= (3, 31): 

2402 pragma = "table_xinfo" 

2403 info = self._get_table_pragma( 

2404 connection, pragma, table_name, schema=schema 

2405 ) 

2406 columns = [] 

2407 tablesql = None 

2408 for row in info: 

2409 name = row[1] 

2410 type_ = row[2].upper() 

2411 nullable = not row[3] 

2412 default = row[4] 

2413 primary_key = row[5] 

2414 hidden = row[6] if pragma == "table_xinfo" else 0 

2415 

2416 # hidden has value 0 for normal columns, 1 for hidden columns, 

2417 # 2 for computed virtual columns and 3 for computed stored columns 

2418 # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b 

2419 if hidden == 1: 

2420 continue 

2421 

2422 generated = bool(hidden) 

2423 persisted = hidden == 3 

2424 

2425 if tablesql is None and generated: 

2426 tablesql = self._get_table_sql( 

2427 connection, table_name, schema, **kw 

2428 ) 

2429 # remove create table 

2430 match = re.match( 

2431 ( 

2432 r"create table .*?\((.*)\)" 

2433 r"(?:\s*,?\s*(?:WITHOUT\s+ROWID|STRICT))*$" 

2434 ), 

2435 tablesql.strip(), 

2436 re.DOTALL | re.IGNORECASE, 

2437 ) 

2438 assert match, f"create table not found in {tablesql}" 

2439 tablesql = match.group(1).strip() 

2440 

2441 columns.append( 

2442 self._get_column_info( 

2443 name, 

2444 type_, 

2445 nullable, 

2446 default, 

2447 primary_key, 

2448 generated, 

2449 persisted, 

2450 tablesql, 

2451 ) 

2452 ) 

2453 if columns: 

2454 return columns 

2455 elif not self.has_table(connection, table_name, schema): 

2456 raise exc.NoSuchTableError( 

2457 f"{schema}.{table_name}" if schema else table_name 

2458 ) 

2459 else: 

2460 return ReflectionDefaults.columns() 

2461 

2462 def _get_column_info( 

2463 self, 

2464 name, 

2465 type_, 

2466 nullable, 

2467 default, 

2468 primary_key, 

2469 generated, 

2470 persisted, 

2471 tablesql, 

2472 ): 

2473 if generated: 

2474 # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)" 

2475 # somehow is "INTEGER GENERATED ALWAYS" 

2476 type_ = re.sub("generated", "", type_, flags=re.IGNORECASE) 

2477 type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip() 

2478 

2479 coltype = self._resolve_type_affinity(type_) 

2480 

2481 if default is not None: 

2482 default = str(default) 

2483 

2484 colspec = { 

2485 "name": name, 

2486 "type": coltype, 

2487 "nullable": nullable, 

2488 "default": default, 

2489 "primary_key": primary_key, 

2490 } 

2491 if generated: 

2492 sqltext = "" 

2493 if tablesql: 

2494 pattern = ( 

2495 r"[^,]*\s+GENERATED\s+ALWAYS\s+AS" 

2496 r"\s+\((.*)\)\s*(?:virtual|stored)?" 

2497 ) 

2498 match = re.search( 

2499 re.escape(name) + pattern, tablesql, re.IGNORECASE 

2500 ) 

2501 if match: 

2502 sqltext = match.group(1) 

2503 colspec["computed"] = {"sqltext": sqltext, "persisted": persisted} 

2504 return colspec 

2505 

2506 def _resolve_type_affinity(self, type_): 

2507 """Return a data type from a reflected column, using affinity rules. 

2508 

2509 SQLite's goal for universal compatibility introduces some complexity 

2510 during reflection, as a column's defined type might not actually be a 

2511 type that SQLite understands - or indeed, may not be defined *at all*. 

2512 Internally, SQLite handles this with a 'data type affinity' for each 

2513 column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER', 

2514 'REAL', or 'NONE' (raw bits). The algorithm that determines this is 

2515 listed in https://www.sqlite.org/datatype3.html section 2.1. 

2516 

2517 This method allows SQLAlchemy to support that algorithm, while still 

2518 providing access to smarter reflection utilities by recognizing 

2519 column definitions that SQLite only supports through affinity (like 

2520 DATE and DOUBLE). 

2521 

2522 """ 

2523 match = re.match(r"([\w ]+)(\(.*?\))?", type_) 

2524 if match: 

2525 coltype = match.group(1) 

2526 args = match.group(2) 

2527 else: 

2528 coltype = "" 

2529 args = "" 

2530 

2531 if coltype in self.ischema_names: 

2532 coltype = self.ischema_names[coltype] 

2533 elif "INT" in coltype: 

2534 coltype = sqltypes.INTEGER 

2535 elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype: 

2536 coltype = sqltypes.TEXT 

2537 elif "BLOB" in coltype or not coltype: 

2538 coltype = sqltypes.NullType 

2539 elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype: 

2540 coltype = sqltypes.REAL 

2541 else: 

2542 coltype = sqltypes.NUMERIC 

2543 

2544 if args is not None: 

2545 args = re.findall(r"(\d+)", args) 

2546 try: 

2547 coltype = coltype(*[int(a) for a in args]) 

2548 except TypeError: 

2549 util.warn( 

2550 "Could not instantiate type %s with " 

2551 "reflected arguments %s; using no arguments." 

2552 % (coltype, args) 

2553 ) 

2554 coltype = coltype() 

2555 else: 

2556 coltype = coltype() 

2557 

2558 return coltype 
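# Illustrative sketch, not part of the dialect: the substring-based fallback
# in _resolve_type_affinity(), returning plain strings instead of SQLAlchemy
# type objects. It deliberately skips the ischema_names lookup that the real
# method tries first. The helper name affinity_fallback is hypothetical.

```python
import re


def affinity_fallback(type_: str):
    """Return (affinity_name, integer_args) for a declared column type."""
    match = re.match(r"([\w ]+)(\(.*?\))?", type_)
    coltype = match.group(1) if match else ""
    args = match.group(2) if match else ""

    if "INT" in coltype:
        name = "INTEGER"
    elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype:
        name = "TEXT"
    elif "BLOB" in coltype or not coltype:
        name = "NullType"
    elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype:
        name = "REAL"
    else:
        name = "NUMERIC"

    # args is None when the declared type has no parenthesized part
    numbers = [int(a) for a in re.findall(r"(\d+)", args)] if args else []
    return name, numbers
```

# Because the checks run in order, "FLOATING POINT" resolves to INTEGER:
# the substring "INT" in "POINT" matches before "FLOA" is considered.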

2559 

2560 @reflection.cache 

2561 def get_pk_constraint(self, connection, table_name, schema=None, **kw): 

2562 constraint_name = None 

2563 table_data = self._get_table_sql(connection, table_name, schema=schema) 

2564 if table_data: 

2565 PK_PATTERN = r'CONSTRAINT +(?:"(.+?)"|(\w+)) +PRIMARY KEY' 

2566 result = re.search(PK_PATTERN, table_data, re.I) 

2567 if result: 

2568 constraint_name = result.group(1) or result.group(2) 

2569 else: 

2570 constraint_name = None 

2571 

2572 cols = self.get_columns(connection, table_name, schema, **kw) 

2573 # consider only pk columns. This also avoids sorting the cached 

2574 # value returned by get_columns 

2575 cols = [col for col in cols if col.get("primary_key", 0) > 0] 

2576 cols.sort(key=lambda col: col.get("primary_key")) 

2577 pkeys = [col["name"] for col in cols] 

2578 

2579 if pkeys: 

2580 return {"constrained_columns": pkeys, "name": constraint_name} 

2581 else: 

2582 return ReflectionDefaults.pk_constraint() 
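# Illustrative sketch, not part of the dialect: how PK_PATTERN recovers a
# primary key constraint name from stored CREATE TABLE text. The helper name
# pk_name and the sample DDL strings are hypothetical.

```python
import re

PK_PATTERN = r'CONSTRAINT +(?:"(.+?)"|(\w+)) +PRIMARY KEY'


def pk_name(table_sql: str):
    """Return the named PRIMARY KEY constraint, or None if unnamed."""
    result = re.search(PK_PATTERN, table_sql, re.I)
    if result:
        # group 1 is a double-quoted name, group 2 a bare identifier
        return result.group(1) or result.group(2)
    return None
```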

2583 

2584 @reflection.cache 

2585 def get_foreign_keys(self, connection, table_name, schema=None, **kw): 

2586 # sqlite makes this *extremely difficult*. 

2587 # First, use the pragma to get the actual FKs. 

2588 pragma_fks = self._get_table_pragma( 

2589 connection, "foreign_key_list", table_name, schema=schema 

2590 ) 

2591 

2592 fks = {} 

2593 

2594 for row in pragma_fks: 

2595 numerical_id, rtbl, lcol, rcol = (row[0], row[2], row[3], row[4]) 

2596 

2597 if not rcol: 

2598 # no referred column, which means it was not named in the 

2599 # original DDL. The referred columns of the foreign key 

2600 # constraint are therefore the primary key of the referred 

2601 # table. 

2602 try: 

2603 referred_pk = self.get_pk_constraint( 

2604 connection, rtbl, schema=schema, **kw 

2605 ) 

2606 referred_columns = referred_pk["constrained_columns"] 

2607 except exc.NoSuchTableError: 

2608 # ignore nonexistent parent tables 

2609 referred_columns = [] 

2610 else: 

2611 # note we use this list only if this is the first column 

2612 # in the constraint. for subsequent columns we ignore the 

2613 # list and append "rcol" if present. 

2614 referred_columns = [] 

2615 

2616 if self._broken_fk_pragma_quotes: 

2617 rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl) 

2618 

2619 if numerical_id in fks: 

2620 fk = fks[numerical_id] 

2621 else: 

2622 fk = fks[numerical_id] = { 

2623 "name": None, 

2624 "constrained_columns": [], 

2625 "referred_schema": schema, 

2626 "referred_table": rtbl, 

2627 "referred_columns": referred_columns, 

2628 "options": {}, 

2629 } 

2630 fks[numerical_id] = fk 

2631 

2632 fk["constrained_columns"].append(lcol) 

2633 

2634 if rcol: 

2635 fk["referred_columns"].append(rcol) 

2636 

2637 def fk_sig(constrained_columns, referred_table, referred_columns): 

2638 return ( 

2639 tuple(constrained_columns) 

2640 + (referred_table,) 

2641 + tuple(referred_columns) 

2642 ) 

2643 

2644 # then, parse the actual SQL and attempt to find DDL that matches 

2645 # the names as well. SQLite saves the DDL in whatever format 

2646 # it was typed in as, so we need to be liberal here. 

2647 

2648 keys_by_signature = { 

2649 fk_sig( 

2650 fk["constrained_columns"], 

2651 fk["referred_table"], 

2652 fk["referred_columns"], 

2653 ): fk 

2654 for fk in fks.values() 

2655 } 

2656 

2657 table_data = self._get_table_sql(connection, table_name, schema=schema) 

2658 

2659 def parse_fks(): 

2660 if table_data is None: 

2661 # system tables, etc. 

2662 return 

2663 

2664 # note that we already have the FKs from PRAGMA above. This whole 

2665 # regexp thing is trying to locate additional detail about the 

2666 # FKs, namely the name of the constraint and other options. 

2667 # so parsing the columns is really about matching it up to what 

2668 # we already have. 

2669 FK_PATTERN = ( 

2670 r'(?:CONSTRAINT +(?:"(.+?)"|(\w+)) +)?' 

2671 r"FOREIGN KEY *\( *(.+?) *\) +" 

2672 r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *' # noqa: E501 

2673 r"((?:ON (?:DELETE|UPDATE) " 

2674 r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)" 

2675 r"((?:NOT +)?DEFERRABLE)?" 

2676 r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?" 

2677 ) 

2678 for match in re.finditer(FK_PATTERN, table_data, re.I): 

2679 ( 

2680 constraint_quoted_name, 

2681 constraint_name, 

2682 constrained_columns, 

2683 referred_quoted_name, 

2684 referred_name, 

2685 referred_columns, 

2686 onupdatedelete, 

2687 deferrable, 

2688 initially, 

2689 ) = match.group(1, 2, 3, 4, 5, 6, 7, 8, 9) 

2690 constraint_name = constraint_quoted_name or constraint_name 

2691 constrained_columns = list( 

2692 self._find_cols_in_sig(constrained_columns) 

2693 ) 

2694 if not referred_columns: 

2695 referred_columns = constrained_columns 

2696 else: 

2697 referred_columns = list( 

2698 self._find_cols_in_sig(referred_columns) 

2699 ) 

2700 referred_name = referred_quoted_name or referred_name 

2701 options = {} 

2702 

2703 for token in re.split(r" *\bON\b *", onupdatedelete.upper()): 

2704 if token.startswith("DELETE"): 

2705 ondelete = token[6:].strip() 

2706 if ondelete and ondelete != "NO ACTION": 

2707 options["ondelete"] = ondelete 

2708 elif token.startswith("UPDATE"): 

2709 onupdate = token[6:].strip() 

2710 if onupdate and onupdate != "NO ACTION": 

2711 options["onupdate"] = onupdate 

2712 

2713 if deferrable: 

2714 options["deferrable"] = "NOT" not in deferrable.upper() 

2715 if initially: 

2716 options["initially"] = initially.upper() 

2717 

2718 yield ( 

2719 constraint_name, 

2720 constrained_columns, 

2721 referred_name, 

2722 referred_columns, 

2723 options, 

2724 ) 

2725 

2726 fkeys = [] 

2727 

2728 for ( 

2729 constraint_name, 

2730 constrained_columns, 

2731 referred_name, 

2732 referred_columns, 

2733 options, 

2734 ) in parse_fks(): 

2735 sig = fk_sig(constrained_columns, referred_name, referred_columns) 

2736 if sig not in keys_by_signature: 

2737 util.warn( 

2738 "WARNING: SQL-parsed foreign key constraint " 

2739 "'%s' could not be located in PRAGMA " 

2740 "foreign_keys for table %s" % (sig, table_name) 

2741 ) 

2742 continue 

2743 key = keys_by_signature.pop(sig) 

2744 key["name"] = constraint_name 

2745 key["options"] = options 

2746 fkeys.append(key) 

2747 # assume the remainders are the unnamed, inline constraints, just 

2748 # use them as is as it's extremely difficult to parse inline 

2749 # constraints 

2750 fkeys.extend(keys_by_signature.values()) 

2751 if fkeys: 

2752 return fkeys 

2753 else: 

2754 return ReflectionDefaults.foreign_keys() 
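# Illustrative sketch, not part of the dialect: the ON DELETE / ON UPDATE
# token handling from parse_fks(), isolated into a function. The helper name
# parse_actions is hypothetical; the split pattern is copied from the code
# above.

```python
import re


def parse_actions(onupdatedelete: str) -> dict:
    """Turn 'ON DELETE x ON UPDATE y' text into an options dict."""
    options = {}
    for token in re.split(r" *\bON\b *", onupdatedelete.upper()):
        if token.startswith("DELETE"):
            ondelete = token[6:].strip()
            if ondelete and ondelete != "NO ACTION":
                options["ondelete"] = ondelete
        elif token.startswith("UPDATE"):
            onupdate = token[6:].strip()
            if onupdate and onupdate != "NO ACTION":
                options["onupdate"] = onupdate
    return options
```

# NO ACTION is the default behavior, so it is dropped rather than reported.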

2755 

2756 def _find_cols_in_sig(self, sig): 

2757 for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I): 

2758 yield match.group(1) or match.group(2) 
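# Illustrative sketch, not part of the dialect: _find_cols_in_sig() as a
# standalone generator, using the same regex. The sample column list is
# hypothetical.

```python
import re


def find_cols_in_sig(sig: str):
    """Yield column names from a comma-separated DDL column list."""
    for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
        # group 1 is a double-quoted name, group 2 a bare identifier
        yield match.group(1) or match.group(2)


cols = list(find_cols_in_sig('"user id", name, email_2'))
```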

2759 

    @reflection.cache
    def get_unique_constraints(
        self, connection, table_name, schema=None, **kw
    ):
        auto_index_by_sig = {}
        for idx in self.get_indexes(
            connection,
            table_name,
            schema=schema,
            include_auto_indexes=True,
            **kw,
        ):
            if not idx["name"].startswith("sqlite_autoindex"):
                continue
            sig = tuple(idx["column_names"])
            auto_index_by_sig[sig] = idx

        table_data = self._get_table_sql(
            connection, table_name, schema=schema, **kw
        )
        unique_constraints = []

        def parse_uqs():
            if table_data is None:
                return
            UNIQUE_PATTERN = (
                r'(?:CONSTRAINT +(?:"(.+?)"|(\w+)) +)?UNIQUE *\((.+?)\)'
            )
            INLINE_UNIQUE_PATTERN = (
                r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]'
                r"+[a-z0-9_ ]+?[\t ]+UNIQUE"
            )

            for match in re.finditer(UNIQUE_PATTERN, table_data, re.I):
                quoted_name, unquoted_name, cols = match.group(1, 2, 3)
                name = quoted_name or unquoted_name
                yield name, list(self._find_cols_in_sig(cols))

            # we need to match inlines as well, as we seek to differentiate
            # a UNIQUE constraint from a UNIQUE INDEX, even though these
            # are kind of the same thing :)
            for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I):
                cols = list(
                    self._find_cols_in_sig(match.group(1) or match.group(2))
                )
                yield None, cols

        for name, cols in parse_uqs():
            sig = tuple(cols)
            if sig in auto_index_by_sig:
                auto_index_by_sig.pop(sig)
                parsed_constraint = {"name": name, "column_names": cols}
                unique_constraints.append(parsed_constraint)
        # NOTE: auto_index_by_sig might not be empty here,
        # the PRIMARY KEY may have an entry.
        if unique_constraints:
            return unique_constraints
        else:
            return ReflectionDefaults.unique_constraints()

    @reflection.cache
    def get_check_constraints(self, connection, table_name, schema=None, **kw):
        table_data = self._get_table_sql(
            connection, table_name, schema=schema, **kw
        )

        # Extract CHECK constraints by properly handling balanced parentheses
        # and avoiding false matches when CHECK/CONSTRAINT appear in table
        # names. See #12924 for context.
        #
        # SQLite supports 4 identifier quote styles (see
        # sqlite.org/lang_keywords.html):
        # - Double quotes "..." (standard SQL)
        # - Brackets [...] (MS Access/SQL Server compatibility)
        # - Backticks `...` (MySQL compatibility)
        # - Single quotes '...' (SQLite extension)
        #
        # NOTE: there is not currently a way to parse CHECK constraints that
        # contain newlines as the approach here relies upon each individual
        # CHECK constraint being on a single line by itself. This necessarily
        # makes assumptions as to how the CREATE TABLE was emitted.
        CHECK_PATTERN = re.compile(
            r"""
            (?<![A-Za-z0-9_])   # Negative lookbehind: ensure CHECK is not
                                # part of an identifier (e.g., table name
                                # like "tableCHECK")

            (?:                 # Optional CONSTRAINT clause
                CONSTRAINT\s+
                (               # Group 1: Constraint name (quoted or unquoted)
                    "(?:[^"]|"")+"        # Double-quoted: "name" or "na""me"
                    |'(?:[^']|'')+'       # Single-quoted: 'name' or 'na''me'
                    |\[(?:[^\]]|\]\])+\]  # Bracket-quoted: [name] or [na]]me]
                    |`(?:[^`]|``)+`       # Backtick-quoted: `name` or `na``me`
                    |\S+                  # Unquoted: simple_name
                )
                \s+
            )?

            CHECK\s*\(          # CHECK keyword followed by opening paren
            """,
            re.VERBOSE | re.IGNORECASE,
        )
        cks = []

        for match in re.finditer(CHECK_PATTERN, table_data or ""):
            constraint_name = match.group(1)

            if constraint_name:
                # Remove surrounding quotes if present
                # Double quotes: "name" -> name
                # Single quotes: 'name' -> name
                # Brackets: [name] -> name
                # Backticks: `name` -> name
                constraint_name = re.sub(
                    r'^(["\'`])(.+)\1$|^\[(.+)\]$',
                    lambda m: m.group(2) or m.group(3),
                    constraint_name,
                    flags=re.DOTALL,
                )

            # Find the matching closing parenthesis by counting balanced parens
            # Must track string context to ignore parens inside string literals
            start = match.end()  # Position after 'CHECK ('
            paren_count = 1
            in_single_quote = False
            in_double_quote = False

            for pos, char in enumerate(table_data[start:], start):
                # Track string literal context
                if char == "'" and not in_double_quote:
                    in_single_quote = not in_single_quote
                elif char == '"' and not in_single_quote:
                    in_double_quote = not in_double_quote
                # Only count parens when not inside a string literal
                elif not in_single_quote and not in_double_quote:
                    if char == "(":
                        paren_count += 1
                    elif char == ")":
                        paren_count -= 1
                        if paren_count == 0:
                            # Successfully found matching closing parenthesis
                            sqltext = table_data[start:pos].strip()
                            cks.append(
                                {"sqltext": sqltext, "name": constraint_name}
                            )
                            break

        cks.sort(key=lambda d: d["name"] or "~")  # sort None as last
        if cks:
            return cks
        else:
            return ReflectionDefaults.check_constraints()

    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        pragma_indexes = self._get_table_pragma(
            connection, "index_list", table_name, schema=schema
        )
        indexes = []

        # regular expression to extract the filter predicate of a partial
        # index. this could fail to extract the predicate correctly on
        # indexes created like
        # CREATE INDEX i ON t (col || ') where') WHERE col <> ''
        # but as this function does not support expression-based indexes
        # this case does not occur.
        partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)

        if schema:
            schema_expr = "%s." % self.identifier_preparer.quote_identifier(
                schema
            )
        else:
            schema_expr = ""

        include_auto_indexes = kw.pop("include_auto_indexes", False)
        for row in pragma_indexes:
            # ignore implicit primary key index.
            # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
            if not include_auto_indexes and row[1].startswith(
                "sqlite_autoindex"
            ):
                continue
            indexes.append(
                dict(
                    name=row[1],
                    column_names=[],
                    unique=row[2],
                    dialect_options={},
                )
            )

            # check partial indexes
            if len(row) >= 5 and row[4]:
                s = (
                    "SELECT sql FROM %(schema)ssqlite_master "
                    "WHERE name = ? "
                    "AND type = 'index'" % {"schema": schema_expr}
                )
                rs = connection.exec_driver_sql(s, (row[1],))
                index_sql = rs.scalar()
                predicate_match = partial_pred_re.search(index_sql)
                if predicate_match is None:
                    # unless the regex is broken this case shouldn't happen
                    # because we know this is a partial index, so the
                    # definition sql should match the regex
                    util.warn(
                        "Failed to look up filter predicate of "
                        "partial index %s" % row[1]
                    )
                else:
                    predicate = predicate_match.group(1)
                    indexes[-1]["dialect_options"]["sqlite_where"] = text(
                        predicate
                    )

        # loop thru unique indexes to get the column names.
        for idx in list(indexes):
            pragma_index = self._get_table_pragma(
                connection, "index_info", idx["name"], schema=schema
            )

            for row in pragma_index:
                if row[2] is None:
                    util.warn(
                        "Skipped unsupported reflection of "
                        "expression-based index %s" % idx["name"]
                    )
                    indexes.remove(idx)
                    break
                else:
                    idx["column_names"].append(row[2])

        indexes.sort(key=lambda d: d["name"] or "~")  # sort None as last
        if indexes:
            return indexes
        elif not self.has_table(connection, table_name, schema):
            raise exc.NoSuchTableError(
                f"{schema}.{table_name}" if schema else table_name
            )
        else:
            return ReflectionDefaults.indexes()

    def _is_sys_table(self, table_name):
        return table_name in {
            "sqlite_schema",
            "sqlite_master",
            "sqlite_temp_schema",
            "sqlite_temp_master",
        }

    @reflection.cache
    def _get_table_sql(self, connection, table_name, schema=None, **kw):
        if schema:
            schema_expr = "%s." % (
                self.identifier_preparer.quote_identifier(schema)
            )
        else:
            schema_expr = ""
        try:
            s = (
                "SELECT sql FROM "
                " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
                " SELECT * FROM %(schema)ssqlite_temp_master) "
                "WHERE name = ? "
                "AND type in ('table', 'view')" % {"schema": schema_expr}
            )
            rs = connection.exec_driver_sql(s, (table_name,))
        except exc.DBAPIError:
            s = (
                "SELECT sql FROM %(schema)ssqlite_master "
                "WHERE name = ? "
                "AND type in ('table', 'view')" % {"schema": schema_expr}
            )
            rs = connection.exec_driver_sql(s, (table_name,))
        value = rs.scalar()
        if value is None and not self._is_sys_table(table_name):
            raise exc.NoSuchTableError(f"{schema_expr}{table_name}")
        return value

    def _get_table_pragma(self, connection, pragma, table_name, schema=None):
        quote = self.identifier_preparer.quote_identifier
        if schema is not None:
            statements = [f"PRAGMA {quote(schema)}."]
        else:
            # because PRAGMA looks in all attached databases if no schema
            # given, need to specify "main" schema, however since we want
            # 'temp' tables in the same namespace as 'main', need to run
            # the PRAGMA twice
            statements = ["PRAGMA main.", "PRAGMA temp."]

        qtable = quote(table_name)
        for statement in statements:
            statement = f"{statement}{pragma}({qtable})"
            cursor = connection.exec_driver_sql(statement)
            if not cursor._soft_closed:
                # work around SQLite issue whereby cursor.description
                # is blank when PRAGMA returns no rows:
                # https://www.sqlite.org/cvstrac/tktview?tn=1884
                result = cursor.fetchall()
            else:
                result = []
            if result:
                return result
        else:
            return []
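
The index reflection above is built on three raw inputs: ``PRAGMA index_list``, ``PRAGMA index_info``, and the ``sql`` column of ``sqlite_master``. The following is a minimal standalone sketch of those inputs using only the standard-library ``sqlite3`` module. It is not part of the dialect, and the table ``t``, the constraint ``uq_ab`` and the index ``ix_a`` are hypothetical names chosen for illustration. It assumes a SQLite build recent enough to support partial indexes.

```python
import re
import sqlite3

# Hypothetical throwaway schema, used only for illustration.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE t ("
    "id INTEGER PRIMARY KEY, a TEXT, b TEXT, "
    "CONSTRAINT uq_ab UNIQUE (a, b))"
)
conn.execute("CREATE INDEX ix_a ON t (a) WHERE a <> ''")

# PRAGMA index_list rows begin with (seq, name, unique, ...).  The UNIQUE
# constraint shows up here as an automatic "sqlite_autoindex" entry.
index_rows = conn.execute('PRAGMA main.index_list("t")').fetchall()
names = sorted(row[1] for row in index_rows)

# PRAGMA index_info rows are (seqno, cid, name); name is NULL when the
# indexed term is an expression rather than a plain column.
info_rows = conn.execute(
    'PRAGMA main.index_info("sqlite_autoindex_t_1")'
).fetchall()
column_names = [row[2] for row in info_rows]

# The partial-index predicate is recovered from the stored CREATE INDEX
# text, using the same regular expression as get_indexes() above.
index_sql = conn.execute(
    "SELECT sql FROM sqlite_master WHERE name = ? AND type = 'index'",
    ("ix_a",),
).fetchone()[0]
predicate = re.search(
    r"\)\s+where\s+(.+)", index_sql, re.IGNORECASE
).group(1)

print(names)
print(column_names)
print(predicate)
conn.close()
```

The automatic index is what ``get_unique_constraints()`` matches against the parsed ``UNIQUE`` clause, which is why it calls ``get_indexes()`` with ``include_auto_indexes=True``.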