
# dialects/sqlite/base.py
# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors


r'''
.. dialect:: sqlite
    :name: SQLite
    :normal_support: 3.12+
    :best_effort: 3.7.16+

.. _sqlite_datetime:

Date and Time Types
-------------------

SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does
not provide out of the box functionality for translating values between Python
`datetime` objects and a SQLite-supported format. SQLAlchemy's own
:class:`~sqlalchemy.types.DateTime` and related types provide date formatting
and parsing functionality when SQLite is used. The implementation classes are
:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`.
These types represent dates and times as ISO formatted strings, which also
nicely support ordering. There's no reliance on typical "libc" internals for
these functions so historical dates are fully supported.
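The ordering property of the ISO string format can be illustrated with the
stdlib ``sqlite3`` driver alone; this is a minimal sketch, not SQLAlchemy API,
showing that lexicographic ordering of ISO-formatted values matches
chronological ordering, including for historical dates.

```python
# Minimal sketch with the stdlib sqlite3 driver (not SQLAlchemy API):
# ISO-formatted date strings sort lexicographically in chronological
# order, even for dates far in the past.
import sqlite3

con = sqlite3.connect(":memory:")
# "DATETIME" is not a real SQLite type; non-numeric values in this
# column are simply stored as text.
con.execute("CREATE TABLE t (ts DATETIME)")
con.executemany(
    "INSERT INTO t (ts) VALUES (?)",
    [
        ("2021-06-01 12:00:00.000000",),
        ("1066-10-14 09:00:00.000000",),
        ("1999-12-31 23:59:59.000000",),
    ],
)
# text ORDER BY yields chronological order for ISO strings
ordered = [row[0] for row in con.execute("SELECT ts FROM t ORDER BY ts")]
print(ordered[0])  # the year-1066 value sorts first
```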

Ensuring Text affinity
^^^^^^^^^^^^^^^^^^^^^^

The DDL rendered for these types is the standard ``DATE``, ``TIME``
and ``DATETIME`` indicators. However, custom storage formats can also be
applied to these types. When the storage format is detected as containing
no alpha characters, the DDL for these types is rendered as ``DATE_CHAR``,
``TIME_CHAR``, and ``DATETIME_CHAR``, so that the column continues to have
textual affinity.

.. seealso::

    `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ -
    in the SQLite documentation

.. _sqlite_autoincrement:

SQLite Auto Incrementing Behavior
----------------------------------

Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html

Key concepts:

* SQLite has an implicit "auto increment" feature that takes place for any
  non-composite primary-key column that is specifically created using
  "INTEGER PRIMARY KEY" for the type + primary key.

* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not**
  equivalent to the implicit autoincrement feature; this keyword is not
  recommended for general use. SQLAlchemy does not render this keyword
  unless a special SQLite-specific directive is used (see below). However,
  it still requires that the column's type is named "INTEGER".
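The implicit behavior described in the first bullet can be observed with the
stdlib ``sqlite3`` driver directly, independent of SQLAlchemy; a sketch:

```python
# Sketch with the stdlib sqlite3 driver: an "INTEGER PRIMARY KEY" column
# is an alias for the rowid and auto-generates values when omitted from
# an INSERT, with no AUTOINCREMENT keyword required.
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, x TEXT)")

cur = con.execute("INSERT INTO t (x) VALUES ('a')")
first_id = cur.lastrowid  # 1

cur = con.execute("INSERT INTO t (x) VALUES ('b')")
second_id = cur.lastrowid  # 2

print(first_id, second_id)  # 1 2
```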

Using the AUTOINCREMENT Keyword
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To specifically render the AUTOINCREMENT keyword on the primary key column
when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table
construct::

    Table(
        "sometable",
        metadata,
        Column("id", Integer, primary_key=True),
        sqlite_autoincrement=True,
    )

Allowing autoincrement behavior with SQLAlchemy types other than Integer/INTEGER
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

SQLite's typing model is based on naming conventions. Among other things, this
means that any type name which contains the substring ``"INT"`` will be
determined to be of "integer affinity". A type named ``"BIGINT"``,
``"SPECIAL_INT"`` or even ``"XYZINTQPR"`` will be considered by SQLite to be
of "integer" affinity. However, **the SQLite autoincrement feature, whether
implicitly or explicitly enabled, requires that the name of the column's type
is exactly the string "INTEGER"**. Therefore, if an application uses a type
like :class:`.BigInteger` for a primary key, on SQLite this type will need to
be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE
TABLE`` statement in order for the autoincrement behavior to be available.

One approach to achieve this is to use :class:`.Integer` on SQLite
only using :meth:`.TypeEngine.with_variant`::

    table = Table(
        "my_table",
        metadata,
        Column(
            "id",
            BigInteger().with_variant(Integer, "sqlite"),
            primary_key=True,
        ),
    )

Another is to use a subclass of :class:`.BigInteger` that overrides its DDL
name to be ``INTEGER`` when compiled against SQLite::

    from sqlalchemy import BigInteger
    from sqlalchemy.ext.compiler import compiles


    class SLBigInteger(BigInteger):
        pass


    @compiles(SLBigInteger, "sqlite")
    def bi_c(element, compiler, **kw):
        return "INTEGER"


    @compiles(SLBigInteger)
    def bi_c(element, compiler, **kw):
        return compiler.visit_BIGINT(element, **kw)


    table = Table(
        "my_table", metadata, Column("id", SLBigInteger(), primary_key=True)
    )

.. seealso::

    :meth:`.TypeEngine.with_variant`

    :ref:`sqlalchemy.ext.compiler_toplevel`

    `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_

.. _sqlite_transactions:

Transactions with SQLite and the sqlite3 driver
-----------------------------------------------

As a file-based database, SQLite's approach to transactions differs from
traditional databases in many ways. Additionally, the ``sqlite3`` driver
included with Python (as well as the async version ``aiosqlite`` which builds
on top of it) has several quirks, workarounds, and API features in the
area of transaction control, all of which generally need to be addressed when
constructing a SQLAlchemy application that uses SQLite.

Legacy Transaction Mode with the sqlite3 driver
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The most important aspect of transaction handling with the sqlite3 driver is
that it defaults to legacy transactional behavior which does not strictly
follow :pep:`249`; this default will remain through Python 3.15 before being
removed in Python 3.16. The way in which the driver diverges from the
PEP is that it does not "begin" a transaction automatically as dictated by
:pep:`249` except in the case of DML statements, e.g. INSERT, UPDATE, and
DELETE. Normally, :pep:`249` dictates that a BEGIN must be emitted upon
the first SQL statement of any kind, so that all subsequent operations will
be established within a transaction until ``connection.commit()`` has been
called. The ``sqlite3`` driver, in an effort to be easier to use in
highly concurrent environments, skips this step for DQL (e.g. SELECT)
statements, and also skips it for DDL (e.g. CREATE TABLE etc.) statements for
more legacy reasons. Statements such as SAVEPOINT are also skipped.

In modern versions of the ``sqlite3`` driver as of Python 3.12, this legacy
mode of operation is referred to as
`"legacy transaction control" <https://docs.python.org/3/library/sqlite3.html#sqlite3-transaction-control-isolation-level>`_, and is in
effect by default due to the ``Connection.autocommit`` parameter being set to
the constant ``sqlite3.LEGACY_TRANSACTION_CONTROL``. Prior to Python 3.12,
the ``Connection.autocommit`` attribute did not exist.

The implications of legacy transaction mode include:

* **Incorrect support for transactional DDL** - statements like CREATE TABLE,
  ALTER TABLE, CREATE INDEX etc. will not automatically BEGIN a transaction if
  one were not started already, leading to the changes by each statement being
  "autocommitted" immediately unless BEGIN were otherwise emitted first. Very
  old (pre Python 3.6) versions of SQLite would also force a COMMIT for these
  operations even if a transaction were present, however this is no longer the
  case.
* **SERIALIZABLE behavior not fully functional** - SQLite's transaction
  isolation behavior is normally consistent with SERIALIZABLE isolation, as it
  is a file-based system that locks the database file entirely for write
  operations, preventing COMMIT until all reader transactions (and associated
  file locks) have completed. However, sqlite3's legacy transaction mode fails
  to emit BEGIN for SELECT statements, which causes these SELECT statements to
  no longer be "repeatable", failing one of the consistency guarantees of
  SERIALIZABLE.
* **Incorrect behavior for SAVEPOINT** - as the SAVEPOINT statement does not
  imply a BEGIN, a new SAVEPOINT emitted before a BEGIN will function on its
  own but fails to participate in the enclosing transaction, meaning a ROLLBACK
  of the transaction will not roll back elements that were part of a released
  savepoint.
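These implications can be observed directly with the stdlib ``sqlite3``
driver, whose ``Connection.in_transaction`` attribute reports whether an
implicit BEGIN has occurred; a sketch, assuming the legacy default is in
effect (as it is through Python 3.15):

```python
# Sketch of sqlite3's "legacy transaction control": DDL and SELECT
# statements do not implicitly BEGIN a transaction, while DML statements
# such as INSERT do.
import sqlite3

con = sqlite3.connect(":memory:")  # default -> legacy transaction control

con.execute("CREATE TABLE t (x INTEGER)")
after_ddl = con.in_transaction  # False: DDL did not begin a transaction

con.execute("SELECT * FROM t")
after_select = con.in_transaction  # False: SELECT did not begin one either

con.execute("INSERT INTO t (x) VALUES (1)")
after_dml = con.in_transaction  # True: DML triggered an implicit BEGIN

con.commit()
print(after_ddl, after_select, after_dml)  # False False True
```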

Legacy transaction mode first existed in order to facilitate working around
SQLite's file locks. Because SQLite relies upon whole-file locks, it is easy to
get "database is locked" errors, particularly when newer features like "write
ahead logging" are disabled. This is a key reason why ``sqlite3``'s legacy
transaction mode is still the default mode of operation; disabling it will
produce behavior that is more susceptible to locked database errors. However
note that **legacy transaction mode will no longer be the default** in a future
Python version (3.16 as of this writing).

.. _sqlite_enabling_transactions:

Enabling Non-Legacy SQLite Transactional Modes with the sqlite3 or aiosqlite driver
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Current SQLAlchemy support allows either for setting the
``Connection.autocommit`` attribute, most directly by using a
:func:`._sa.create_engine` parameter, or if on an older version of Python where
the attribute is not available, using event hooks to control the behavior of
BEGIN.

* **Enabling modern sqlite3 transaction control via the autocommit connect
  parameter** (Python 3.12 and above)

  To use SQLite in the mode described at `Transaction control via the
  autocommit attribute
  <https://docs.python.org/3/library/sqlite3.html#transaction-control-via-the-autocommit-attribute>`_,
  the most straightforward approach is to set the attribute to its recommended
  value of ``False`` at the connect level using
  :paramref:`_sa.create_engine.connect_args`::

      from sqlalchemy import create_engine

      engine = create_engine(
          "sqlite:///myfile.db", connect_args={"autocommit": False}
      )

  This parameter is also passed through when using the aiosqlite driver::

      from sqlalchemy.ext.asyncio import create_async_engine

      engine = create_async_engine(
          "sqlite+aiosqlite:///myfile.db", connect_args={"autocommit": False}
      )

  The parameter can also be set at the attribute level using the
  :meth:`.PoolEvents.connect` event hook, however this will only work for
  sqlite3, as aiosqlite does not yet expose this attribute on its
  ``Connection`` object::

      from sqlalchemy import create_engine, event

      engine = create_engine("sqlite:///myfile.db")


      @event.listens_for(engine, "connect")
      def do_connect(dbapi_connection, connection_record):
          # enable autocommit=False mode
          dbapi_connection.autocommit = False

* **Using SQLAlchemy to emit BEGIN in lieu of SQLite's transaction control**
  (all Python versions, sqlite3 and aiosqlite)

  For older versions of ``sqlite3`` or for cross-compatibility with older and
  newer versions, SQLAlchemy can also take over the job of transaction control.
  This is achieved by using the :meth:`.ConnectionEvents.begin` hook
  to emit the "BEGIN" command directly, while also disabling SQLite's control
  of this command using the :meth:`.PoolEvents.connect` event hook to set the
  ``Connection.isolation_level`` attribute to ``None``::

      from sqlalchemy import create_engine, event

      engine = create_engine("sqlite:///myfile.db")


      @event.listens_for(engine, "connect")
      def do_connect(dbapi_connection, connection_record):
          # disable sqlite3's emitting of the BEGIN statement entirely.
          dbapi_connection.isolation_level = None


      @event.listens_for(engine, "begin")
      def do_begin(conn):
          # emit our own BEGIN. sqlite3 still emits COMMIT/ROLLBACK correctly
          conn.exec_driver_sql("BEGIN")

  When using the asyncio variant ``aiosqlite``, refer to ``engine.sync_engine``
  as in the example below::

      from sqlalchemy import event
      from sqlalchemy.ext.asyncio import create_async_engine

      engine = create_async_engine("sqlite+aiosqlite:///myfile.db")


      @event.listens_for(engine.sync_engine, "connect")
      def do_connect(dbapi_connection, connection_record):
          # disable aiosqlite's emitting of the BEGIN statement entirely.
          dbapi_connection.isolation_level = None


      @event.listens_for(engine.sync_engine, "begin")
      def do_begin(conn):
          # emit our own BEGIN. aiosqlite still emits COMMIT/ROLLBACK correctly
          conn.exec_driver_sql("BEGIN")

.. _sqlite_isolation_level:

Using SQLAlchemy's Driver Level AUTOCOMMIT Feature with SQLite
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

SQLAlchemy has a comprehensive database isolation feature with optional
autocommit support that is introduced in the section :ref:`dbapi_autocommit`.

For the ``sqlite3`` and ``aiosqlite`` drivers, SQLAlchemy only includes
built-in support for "AUTOCOMMIT". Note that this mode is currently
incompatible with the non-legacy isolation mode hooks documented in the
previous section at :ref:`sqlite_enabling_transactions`.

To use the ``sqlite3`` driver with SQLAlchemy driver-level autocommit,
create an engine setting the :paramref:`_sa.create_engine.isolation_level`
parameter to "AUTOCOMMIT"::

    eng = create_engine("sqlite:///myfile.db", isolation_level="AUTOCOMMIT")

When using the above mode, any event hooks that set the sqlite3
``Connection.autocommit`` parameter away from its default of
``sqlite3.LEGACY_TRANSACTION_CONTROL``, as well as hooks that emit ``BEGIN``,
should be disabled.

Additional Reading for SQLite / sqlite3 transaction control
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Links with important information on SQLite, the sqlite3 driver, as well as
long historical conversations on how things got to their current state:

* `Isolation in SQLite <https://www.sqlite.org/isolation.html>`_ - on the
  SQLite website
* `Transaction control
  <https://docs.python.org/3/library/sqlite3.html#transaction-control>`_ -
  describes the sqlite3 autocommit attribute as well as the legacy
  isolation_level attribute.
* `sqlite3 SELECT does not BEGIN a transaction, but should according to spec
  <https://github.com/python/cpython/issues/54133>`_ - imported Python
  standard library issue on github
* `sqlite3 module breaks transactions and potentially corrupts data
  <https://github.com/python/cpython/issues/54949>`_ - imported Python
  standard library issue on github

INSERT/UPDATE/DELETE...RETURNING
---------------------------------

The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING``
syntax. ``INSERT..RETURNING`` may be used
automatically in some cases in order to fetch newly generated identifiers in
place of the traditional approach of using ``cursor.lastrowid``, however
``cursor.lastrowid`` is currently still preferred for simple single-statement
cases for its better performance.

To specify an explicit ``RETURNING`` clause, use the
:meth:`._UpdateBase.returning` method on a per-statement basis::

    # INSERT..RETURNING
    result = connection.execute(
        table.insert().values(name="foo").returning(table.c.col1, table.c.col2)
    )
    print(result.all())

    # UPDATE..RETURNING
    result = connection.execute(
        table.update()
        .where(table.c.name == "foo")
        .values(name="bar")
        .returning(table.c.col1, table.c.col2)
    )
    print(result.all())

    # DELETE..RETURNING
    result = connection.execute(
        table.delete()
        .where(table.c.name == "foo")
        .returning(table.c.col1, table.c.col2)
    )
    print(result.all())

.. versionadded:: 2.0 Added support for SQLite RETURNING

.. _sqlite_foreign_keys:

Foreign Key Support
-------------------

SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables,
however by default these constraints have no effect on the operation of the
table.

Constraint checking on SQLite has three prerequisites:

* At least version 3.6.19 of SQLite must be in use
* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY
  or SQLITE_OMIT_TRIGGER symbols enabled.
* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all
  connections before use -- including the initial call to
  :meth:`sqlalchemy.schema.MetaData.create_all`.

SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for
new connections through the usage of events::

    from sqlalchemy.engine import Engine
    from sqlalchemy import event


    @event.listens_for(Engine, "connect")
    def set_sqlite_pragma(dbapi_connection, connection_record):
        # the sqlite3 driver will not set PRAGMA foreign_keys
        # if autocommit=False; set to True temporarily
        ac = dbapi_connection.autocommit
        dbapi_connection.autocommit = True

        cursor = dbapi_connection.cursor()
        cursor.execute("PRAGMA foreign_keys=ON")
        cursor.close()

        # restore previous autocommit setting
        dbapi_connection.autocommit = ac

.. warning::

    When SQLite foreign keys are enabled, it is **not possible**
    to emit CREATE or DROP statements for tables that contain
    mutually-dependent foreign key constraints;
    to emit the DDL for these tables requires that ALTER TABLE be used to
    create or drop these constraints separately, for which SQLite has
    no support.

.. seealso::

    `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_
    - on the SQLite web site.

    :ref:`event_toplevel` - SQLAlchemy event API.

    :ref:`use_alter` - more information on SQLAlchemy's facilities for handling
    mutually-dependent foreign key constraints.

.. _sqlite_on_conflict_ddl:

ON CONFLICT support for constraints
-----------------------------------

.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT"
   for SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT"
   as applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`.

SQLite supports a non-standard DDL clause known as ON CONFLICT which can be
applied to primary key, unique, check, and not null constraints. In DDL, it is
rendered either within the "CONSTRAINT" clause or within the column definition
itself depending on the location of the target constraint. To render this
clause within DDL, the extension parameter ``sqlite_on_conflict`` can be
specified with a string conflict resolution algorithm within the
:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`, and
:class:`.CheckConstraint` objects. Within the :class:`_schema.Column` object,
there are individual parameters ``sqlite_on_conflict_not_null``,
``sqlite_on_conflict_primary_key``, and ``sqlite_on_conflict_unique``, which
each correspond to the three types of constraint that can be indicated from a
:class:`_schema.Column` object.

.. seealso::

    `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite
    documentation

The ``sqlite_on_conflict`` parameters accept a string argument which is just
the resolution name to be chosen, which on SQLite can be one of ROLLBACK,
ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint
that specifies the IGNORE algorithm::

    some_table = Table(
        "some_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("data", Integer),
        UniqueConstraint("id", "data", sqlite_on_conflict="IGNORE"),
    )

The above renders CREATE TABLE DDL as:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER,
        PRIMARY KEY (id),
        UNIQUE (id, data) ON CONFLICT IGNORE
    )

When using the :paramref:`_schema.Column.unique`
flag to add a UNIQUE constraint
to a single column, the ``sqlite_on_conflict_unique`` parameter can
be added to the :class:`_schema.Column` as well, which will be added to the
UNIQUE constraint in the DDL::

    some_table = Table(
        "some_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column(
            "data", Integer, unique=True, sqlite_on_conflict_unique="IGNORE"
        ),
    )

rendering:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER,
        PRIMARY KEY (id),
        UNIQUE (data) ON CONFLICT IGNORE
    )

To apply the FAIL algorithm for a NOT NULL constraint,
``sqlite_on_conflict_not_null`` is used::

    some_table = Table(
        "some_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column(
            "data", Integer, nullable=False, sqlite_on_conflict_not_null="FAIL"
        ),
    )

this renders the column inline ON CONFLICT phrase:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER NOT NULL ON CONFLICT FAIL,
        PRIMARY KEY (id)
    )

Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``::

    some_table = Table(
        "some_table",
        metadata,
        Column(
            "id",
            Integer,
            primary_key=True,
            sqlite_on_conflict_primary_key="FAIL",
        ),
    )

SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict
resolution algorithm is applied to the constraint itself:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        PRIMARY KEY (id) ON CONFLICT FAIL
    )

.. _sqlite_on_conflict_insert:

INSERT...ON CONFLICT (Upsert)
-----------------------------

.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT"
   for SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as
   applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`.

From version 3.24.0 onwards, SQLite supports "upserts" (update or insert)
of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT``
statement. A candidate row will only be inserted if that row does not violate
any unique or primary key constraints. In the case of a unique constraint
violation, a secondary action can occur which can be either "DO UPDATE",
indicating that the data in the target row should be updated, or "DO NOTHING",
which indicates to silently skip this row.

Conflicts are determined using columns that are part of existing unique
constraints and indexes. These constraints are identified by stating the
columns and conditions that comprise the indexes.

SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific
:func:`_sqlite.insert()` function, which provides
the generative methods :meth:`_sqlite.Insert.on_conflict_do_update`
and :meth:`_sqlite.Insert.on_conflict_do_nothing`:

.. sourcecode:: pycon+sql

    >>> from sqlalchemy.dialects.sqlite import insert

    >>> insert_stmt = insert(my_table).values(
    ...     id="some_existing_id", data="inserted value"
    ... )

    >>> do_update_stmt = insert_stmt.on_conflict_do_update(
    ...     index_elements=["id"], set_=dict(data="updated value")
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?{stop}

    >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])

    >>> print(do_nothing_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO NOTHING

.. versionadded:: 1.4

.. seealso::

    `Upsert <https://sqlite.org/lang_UPSERT.html>`_
    - in the SQLite documentation.

Specifying the Target
^^^^^^^^^^^^^^^^^^^^^

Both methods supply the "target" of the conflict using column inference:

* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument
  specifies a sequence containing string column names, :class:`_schema.Column`
  objects, and/or SQL expression elements, which would identify a unique index
  or unique constraint.

* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements`
  to infer an index, a partial index can be inferred by also specifying the
  :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter:

  .. sourcecode:: pycon+sql

      >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")

      >>> do_update_stmt = stmt.on_conflict_do_update(
      ...     index_elements=[my_table.c.user_email],
      ...     index_where=my_table.c.user_email.like("%@gmail.com"),
      ...     set_=dict(data=stmt.excluded.data),
      ... )

      >>> print(do_update_stmt)
      {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?)
      ON CONFLICT (user_email)
      WHERE user_email LIKE '%@gmail.com'
      DO UPDATE SET data = excluded.data

The SET Clause
^^^^^^^^^^^^^^^

``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
existing row, using any combination of new values as well as values
from the proposed insertion. These values are specified using the
:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This
parameter accepts a dictionary which consists of direct values
for UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")

    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"], set_=dict(data="updated value")
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?

.. warning::

    The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take
    into account Python-side default UPDATE values or generation functions,
    e.g. those specified using :paramref:`_schema.Column.onupdate`. These
    values will not be exercised for an ON CONFLICT style of UPDATE, unless
    they are manually specified in the
    :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary.

Updating using the Excluded INSERT Values
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

In order to refer to the proposed insertion row, the special alias
:attr:`~.sqlite.Insert.excluded` is available as an attribute on
the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix
on a column, that informs the DO UPDATE to update the row with the value that
would have been inserted had the constraint not failed:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id="some_id", data="inserted value", author="jlh"
    ... )

    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"],
    ...     set_=dict(data="updated value", author=stmt.excluded.author),
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author

Additional WHERE Criteria
^^^^^^^^^^^^^^^^^^^^^^^^^

The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts
a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where`
parameter, which will limit those rows which receive an UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id="some_id", data="inserted value", author="jlh"
    ... )

    >>> on_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"],
    ...     set_=dict(data="updated value", author=stmt.excluded.author),
    ...     where=(my_table.c.status == 2),
    ... )
    >>> print(on_update_stmt)
    {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
    WHERE my_table.status = ?

Skipping Rows with DO NOTHING
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

``ON CONFLICT`` may be used to skip inserting a row entirely
if any conflict with a unique constraint occurs; below this is illustrated
using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
    >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
    >>> print(stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING

If ``DO NOTHING`` is used without specifying any columns or constraint,
it has the effect of skipping the INSERT for any unique violation which
occurs:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
    >>> stmt = stmt.on_conflict_do_nothing()
    >>> print(stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING

.. _sqlite_type_reflection:

Type Reflection
---------------

SQLite types are unlike those of most other database backends, in that
the string name of the type usually does not correspond to a "type" in a
one-to-one fashion. Instead, SQLite links per-column typing behavior
to one of five so-called "type affinities" based on a string matching
pattern for the type.

SQLAlchemy's reflection process, when inspecting types, uses a simple
lookup table to link the keywords returned to provided SQLAlchemy types.
This lookup table is present within the SQLite dialect as it is for all
other dialects. However, the SQLite dialect has a different "fallback"
routine for when a particular type name is not located in the lookup map;
it instead implements the SQLite "type affinity" scheme located at
https://www.sqlite.org/datatype3.html section 2.1.

The provided typemap will make direct associations from an exact string
name match for the following types:

769:class:`_types.BIGINT`, :class:`_types.BLOB`,

770:class:`_types.BOOLEAN`, :class:`_types.CHAR`,

771:class:`_types.DATE`, :class:`_types.DATETIME`,

772:class:`_types.DECIMAL`, :class:`_types.FLOAT`,

773:class:`_types.INTEGER`, :class:`_types.NUMERIC`,

774:class:`_types.REAL`, :class:`_types.SMALLINT`,

775:class:`_types.TEXT`, :class:`_types.TIME`,

776:class:`_types.TIMESTAMP`, :class:`_types.VARCHAR`,

777:class:`_types.NVARCHAR`, :class:`_types.NCHAR`

780 

781When a type name does not match one of the above types, the "type affinity" 

782lookup is used instead: 

783 

784* :class:`_types.INTEGER` is returned if the type name includes the 

785 string ``INT`` 

786* :class:`_types.TEXT` is returned if the type name includes the 

787 string ``CHAR``, ``CLOB`` or ``TEXT`` 

788* :class:`_types.NullType` is returned if the type name includes the 

789 string ``BLOB`` 

790* :class:`_types.REAL` is returned if the type name includes the string 

791 ``REAL``, ``FLOA`` or ``DOUB``. 

792* Otherwise, the :class:`_types.NUMERIC` type is used. 

793 
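The affinity fallback above can be sketched as a small lookup function. This is an illustration of the rules only; the function name and the returned strings are hypothetical and are not the dialect's actual internals:

```python
# Illustrative sketch of SQLite's "type affinity" fallback as described
# above; affinity_for() is a hypothetical helper, not SQLAlchemy API.
def affinity_for(type_name: str) -> str:
    name = type_name.upper()
    if "INT" in name:
        return "INTEGER"
    elif "CHAR" in name or "CLOB" in name or "TEXT" in name:
        return "TEXT"
    elif "BLOB" in name:
        return "NULL"  # reflected as NullType by SQLAlchemy
    elif "REAL" in name or "FLOA" in name or "DOUB" in name:
        return "REAL"
    else:
        return "NUMERIC"


print(affinity_for("MEDIUMINT"))  # INTEGER
print(affinity_for("VARYING CHARACTER(70)"))  # TEXT
print(affinity_for("DOUBLE PRECISION"))  # REAL
```

Note that the rules are order-dependent: ``INT`` is checked first, so a name such as ``POINT`` would also map to INTEGER affinity.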

794.. _sqlite_partial_index: 

795 

796Partial Indexes 

797--------------- 

798 

799A partial index, e.g. one which uses a WHERE clause, can be specified 

800with the DDL system using the argument ``sqlite_where``:: 

801 

802 tbl = Table("testtbl", m, Column("data", Integer)) 

803 idx = Index( 

804 "test_idx1", 

805 tbl.c.data, 

806 sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10), 

807 ) 

808 

809The index will be rendered at create time as: 

810 

811.. sourcecode:: sql 

812 

813 CREATE INDEX test_idx1 ON testtbl (data) 

814 WHERE data > 5 AND data < 10 

815 
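The equivalent DDL can be issued directly through the standard library ``sqlite3`` driver; SQLite stores the ``CREATE INDEX`` statement, including its WHERE clause, verbatim in ``sqlite_master``:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE testtbl (data INTEGER)")
conn.execute(
    "CREATE INDEX test_idx1 ON testtbl (data) WHERE data > 5 AND data < 10"
)

# the WHERE clause is preserved in the stored index definition
(sql,) = conn.execute(
    "SELECT sql FROM sqlite_master WHERE name = 'test_idx1'"
).fetchone()
print(sql)
```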

816.. _sqlite_dotted_column_names: 

817 

818Dotted Column Names 

819------------------- 

820 

821Using table or column names that explicitly have periods in them is

822**not recommended**. While this is generally a bad idea for relational

823databases, as the dot is a syntactically significant character, SQLite

824versions prior to **3.10.0** additionally have a bug which requires

825that SQLAlchemy filter out these dots in result sets.

826 

827The bug, entirely outside of SQLAlchemy, can be illustrated thusly:: 

828 

829 import sqlite3 

830 

831 assert sqlite3.sqlite_version_info < ( 

832 3, 

833 10, 

834 0, 

835 ), "bug is fixed in this version" 

836 

837 conn = sqlite3.connect(":memory:") 

838 cursor = conn.cursor() 

839 

840 cursor.execute("create table x (a integer, b integer)") 

841 cursor.execute("insert into x (a, b) values (1, 1)") 

842 cursor.execute("insert into x (a, b) values (2, 2)") 

843 

844 cursor.execute("select x.a, x.b from x") 

845 assert [c[0] for c in cursor.description] == ["a", "b"] 

846 

847 cursor.execute( 

848 """ 

849 select x.a, x.b from x where a=1 

850 union 

851 select x.a, x.b from x where a=2 

852 """ 

853 ) 

854 assert [c[0] for c in cursor.description] == ["a", "b"], [ 

855 c[0] for c in cursor.description 

856 ] 

857 

858The second assertion fails: 

859 

860.. sourcecode:: text 

861 

862 Traceback (most recent call last): 

863 File "test.py", line 19, in <module> 

864 [c[0] for c in cursor.description] 

865 AssertionError: ['x.a', 'x.b'] 

866 

867Where above, the driver incorrectly reports the names of the columns

868including the name of the table, which is entirely inconsistent with

869the case where the UNION is not present.

870 

871SQLAlchemy relies upon column names being predictable in how they match 

872to the original statement, so the SQLAlchemy dialect has no choice but 

873to filter these out:: 

874 

875 

876 from sqlalchemy import create_engine 

877 

878 eng = create_engine("sqlite://") 

879 conn = eng.connect() 

880 

881 conn.exec_driver_sql("create table x (a integer, b integer)") 

882 conn.exec_driver_sql("insert into x (a, b) values (1, 1)") 

883 conn.exec_driver_sql("insert into x (a, b) values (2, 2)") 

884 

885 result = conn.exec_driver_sql("select x.a, x.b from x") 

886 assert result.keys() == ["a", "b"] 

887 

888 result = conn.exec_driver_sql( 

889 """ 

890 select x.a, x.b from x where a=1 

891 union 

892 select x.a, x.b from x where a=2 

893 """ 

894 ) 

895 assert result.keys() == ["a", "b"] 

896 

897Note that above, even though SQLAlchemy filters out the dots, *both 

898names are still addressable*:: 

899 

900 >>> row = result.first()

901 >>> row._mapping["a"]

902 1

903 >>> row._mapping["x.a"]

904 1

905 >>> row._mapping["b"]

906 1

907 >>> row._mapping["x.b"]

908 1

909 

910Therefore, the workaround applied by SQLAlchemy only impacts 

911:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In 

912the very specific case where an application is forced to use column names that 

913contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and 

914:meth:`.Row.keys()` is required to return these dotted names unmodified, 

915the ``sqlite_raw_colnames`` execution option may be provided, either on a 

916per-:class:`_engine.Connection` basis:: 

917 

918 result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql( 

919 """ 

920 select x.a, x.b from x where a=1 

921 union 

922 select x.a, x.b from x where a=2 

923 """ 

924 ) 

925 assert result.keys() == ["x.a", "x.b"] 

926 

927or on a per-:class:`_engine.Engine` basis:: 

928 

929 engine = create_engine( 

930 "sqlite://", execution_options={"sqlite_raw_colnames": True} 

931 ) 

932 

933When using the per-:class:`_engine.Engine` execution option, note that 

934**Core and ORM queries that use UNION may not function properly**. 

935 
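The normalization described in this section can be illustrated with a toy function. The helper name ``strip_table_prefix`` is hypothetical and is not part of SQLAlchemy's API; it only mirrors the observable effect on result-set column names:

```python
# Toy illustration of the dot-filtering behavior described above;
# strip_table_prefix() is a hypothetical helper, not SQLAlchemy code.
def strip_table_prefix(colnames):
    # "x.a" -> "a"; names without a dot pass through unchanged
    return [name.split(".")[-1] for name in colnames]


print(strip_table_prefix(["x.a", "x.b"]))  # ['a', 'b']
print(strip_table_prefix(["a", "b"]))  # ['a', 'b']
```

This also shows why the filtering is lossy for genuinely dotted column names, which is what the ``sqlite_raw_colnames`` option exists to work around.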

936SQLite-specific table options 

937----------------------------- 

938 

939Two options for CREATE TABLE are supported directly by the SQLite

940dialect in conjunction with the :class:`_schema.Table` construct: 

941 

942* ``WITHOUT ROWID``:: 

943 

944 Table("some_table", metadata, ..., sqlite_with_rowid=False) 

945 

946* ``STRICT``::

948 

949 Table("some_table", metadata, ..., sqlite_strict=True) 

950 

951 .. versionadded:: 2.0.37 

952 

953.. seealso:: 

954 

955 `SQLite CREATE TABLE options 

956 <https://www.sqlite.org/lang_createtable.html>`_ 

957 
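The ``WITHOUT ROWID`` form can be verified against the standard library ``sqlite3`` driver (a minimal sketch; the table name and columns are illustrative, and such tables require an explicit PRIMARY KEY):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# WITHOUT ROWID tables must declare an explicit PRIMARY KEY
conn.execute(
    "CREATE TABLE some_table (id TEXT PRIMARY KEY, data TEXT) WITHOUT ROWID"
)
(sql,) = conn.execute(
    "SELECT sql FROM sqlite_master WHERE name = 'some_table'"
).fetchone()
print(sql)
```

``STRICT`` tables can be created the same way with a trailing ``STRICT`` keyword, but only on SQLite 3.37 or later.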

958.. _sqlite_include_internal: 

959 

960Reflecting internal schema tables 

961---------------------------------- 

962 

963Reflection methods that return lists of tables will omit so-called 

964"SQLite internal schema object" names, which are considered by SQLite 

965as any object name that is prefixed with ``sqlite_``. An example of 

966such an object is the ``sqlite_sequence`` table that's generated when 

967the ``AUTOINCREMENT`` column parameter is used. In order to return 

968these objects, the parameter ``sqlite_include_internal=True`` may be 

969passed to methods such as :meth:`_schema.MetaData.reflect` or 

970:meth:`.Inspector.get_table_names`. 

971 

972.. versionadded:: 2.0 Added the ``sqlite_include_internal=True`` parameter. 

973 Previously, these tables were not ignored by SQLAlchemy reflection 

974 methods. 

975 

976.. note:: 

977 

978 The ``sqlite_include_internal`` parameter does not refer to the 

979 "system" tables that are present in schemas such as ``sqlite_master``. 

980 

981.. seealso:: 

982 

983 `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite 

984 documentation. 
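The ``sqlite_sequence`` object mentioned above can be observed directly with the standard library ``sqlite3`` driver; creating a table with ``AUTOINCREMENT`` causes SQLite to generate the internal table:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, v TEXT)")
conn.execute("INSERT INTO t (v) VALUES ('x')")

# the internal sqlite_sequence table now appears alongside user tables
names = [
    row[0]
    for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
]
print(names)
```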

985 

986''' # noqa 

987from __future__ import annotations 

988 

989import datetime 

990import numbers 

991import re 

992from typing import Any 

993from typing import Callable 

994from typing import Optional 

995from typing import TYPE_CHECKING 

996 

997from .json import JSON 

998from .json import JSONIndexType 

999from .json import JSONPathType 

1000from ... import exc 

1001from ... import schema as sa_schema 

1002from ... import sql 

1003from ... import text 

1004from ... import types as sqltypes 

1005from ... import util 

1006from ...engine import default 

1007from ...engine import processors 

1008from ...engine import reflection 

1009from ...engine.reflection import ReflectionDefaults 

1010from ...sql import coercions 

1011from ...sql import compiler 

1012from ...sql import ddl as sa_ddl 

1013from ...sql import elements 

1014from ...sql import roles 

1015from ...sql import schema 

1016from ...types import BLOB # noqa 

1017from ...types import BOOLEAN # noqa 

1018from ...types import CHAR # noqa 

1019from ...types import DECIMAL # noqa 

1020from ...types import FLOAT # noqa 

1021from ...types import INTEGER # noqa 

1022from ...types import NUMERIC # noqa 

1023from ...types import REAL # noqa 

1024from ...types import SMALLINT # noqa 

1025from ...types import TEXT # noqa 

1026from ...types import TIMESTAMP # noqa 

1027from ...types import VARCHAR # noqa 

1028 

1029if TYPE_CHECKING: 

1030 from ...engine.interfaces import DBAPIConnection 

1031 from ...engine.interfaces import Dialect 

1032 from ...engine.interfaces import IsolationLevel 

1033 from ...sql.type_api import _BindProcessorType 

1034 from ...sql.type_api import _ResultProcessorType 

1035 

1036 

1037class _SQliteJson(JSON): 

1038 def result_processor(self, dialect, coltype): 

1039 default_processor = super().result_processor(dialect, coltype) 

1040 

1041 def process(value): 

1042 try: 

1043 return default_processor(value) 

1044 except TypeError: 

1045 if isinstance(value, numbers.Number): 

1046 return value 

1047 else: 

1048 raise 

1049 

1050 return process 

1051 

1052 

1053class _DateTimeMixin: 

1054 _reg = None 

1055 _storage_format = None 

1056 

1057 def __init__(self, storage_format=None, regexp=None, **kw): 

1058 super().__init__(**kw) 

1059 if regexp is not None: 

1060 self._reg = re.compile(regexp) 

1061 if storage_format is not None: 

1062 self._storage_format = storage_format 

1063 

1064 @property 

1065 def format_is_text_affinity(self): 

1066 """return True if the storage format will automatically imply 

1067 a TEXT affinity. 

1068 

1069 If the storage format contains no non-numeric characters, 

1070 it will imply a NUMERIC storage format on SQLite; in this case, 

1071 the type will generate its DDL as DATE_CHAR, DATETIME_CHAR, 

1072 TIME_CHAR. 

1073 

1074 """ 

1075 spec = self._storage_format % { 

1076 "year": 0, 

1077 "month": 0, 

1078 "day": 0, 

1079 "hour": 0, 

1080 "minute": 0, 

1081 "second": 0, 

1082 "microsecond": 0, 

1083 } 

1084 return bool(re.search(r"[^0-9]", spec)) 

1085 

1086 def adapt(self, cls, **kw): 

1087 if issubclass(cls, _DateTimeMixin): 

1088 if self._storage_format: 

1089 kw["storage_format"] = self._storage_format 

1090 if self._reg: 

1091 kw["regexp"] = self._reg 

1092 return super().adapt(cls, **kw) 

1093 

1094 def literal_processor(self, dialect): 

1095 bp = self.bind_processor(dialect) 

1096 

1097 def process(value): 

1098 return "'%s'" % bp(value) 

1099 

1100 return process 

1101 

1102 

1103class DATETIME(_DateTimeMixin, sqltypes.DateTime): 

1104 r"""Represent a Python datetime object in SQLite using a string. 

1105 

1106 The default string storage format is:: 

1107 

1108 "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1109 

1110 e.g.: 

1111 

1112 .. sourcecode:: text 

1113 

1114 2021-03-15 12:05:57.105542 

1115 

1116 The incoming storage format is by default parsed using the 

1117 Python ``datetime.fromisoformat()`` function. 

1118 

1119 .. versionchanged:: 2.0 ``datetime.fromisoformat()`` is used for default 

1120 datetime string parsing. 

1121 

1122 The storage format can be customized to some degree using the 

1123 ``storage_format`` and ``regexp`` parameters, such as:: 

1124 

1125 import re 

1126 from sqlalchemy.dialects.sqlite import DATETIME 

1127 

1128 dt = DATETIME( 

1129 storage_format=( 

1130 "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d" 

1131 ), 

1132 regexp=r"(\d+)/(\d+)/(\d+) (\d+):(\d+):(\d+)",

1133 ) 

1134 

1135 :param truncate_microseconds: when ``True`` microseconds will be truncated 

1136 from the datetime. Can't be specified together with ``storage_format`` 

1137 or ``regexp``. 

1138 

1139 :param storage_format: format string which will be applied to the dict 

1140 with keys year, month, day, hour, minute, second, and microsecond. 

1141 

1142 :param regexp: regular expression which will be applied to incoming result 

1143 rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming 

1144 strings. If the regexp contains named groups, the resulting match dict is 

1145 applied to the Python datetime() constructor as keyword arguments. 

1146 Otherwise, if positional groups are used, the datetime() constructor 

1147 is called with positional arguments via 

1148 ``*map(int, match_obj.groups(0))``. 

1149 

1150 """ # noqa 

1151 

1152 _storage_format = ( 

1153 "%(year)04d-%(month)02d-%(day)02d " 

1154 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1155 ) 

1156 

1157 def __init__(self, *args, **kwargs): 

1158 truncate_microseconds = kwargs.pop("truncate_microseconds", False) 

1159 super().__init__(*args, **kwargs) 

1160 if truncate_microseconds: 

1161 assert "storage_format" not in kwargs, ( 

1162 "You can specify only " 

1163 "one of truncate_microseconds or storage_format." 

1164 ) 

1165 assert "regexp" not in kwargs, ( 

1166 "You can specify only one of " 

1167 "truncate_microseconds or regexp." 

1168 ) 

1169 self._storage_format = ( 

1170 "%(year)04d-%(month)02d-%(day)02d " 

1171 "%(hour)02d:%(minute)02d:%(second)02d" 

1172 ) 

1173 

1174 def bind_processor( 

1175 self, dialect: Dialect 

1176 ) -> Optional[_BindProcessorType[Any]]: 

1177 datetime_datetime = datetime.datetime 

1178 datetime_date = datetime.date 

1179 format_ = self._storage_format 

1180 

1181 def process(value): 

1182 if value is None: 

1183 return None 

1184 elif isinstance(value, datetime_datetime): 

1185 return format_ % { 

1186 "year": value.year, 

1187 "month": value.month, 

1188 "day": value.day, 

1189 "hour": value.hour, 

1190 "minute": value.minute, 

1191 "second": value.second, 

1192 "microsecond": value.microsecond, 

1193 } 

1194 elif isinstance(value, datetime_date): 

1195 return format_ % { 

1196 "year": value.year, 

1197 "month": value.month, 

1198 "day": value.day, 

1199 "hour": 0, 

1200 "minute": 0, 

1201 "second": 0, 

1202 "microsecond": 0, 

1203 } 

1204 else: 

1205 raise TypeError( 

1206 "SQLite DateTime type only accepts Python " 

1207 "datetime and date objects as input." 

1208 ) 

1209 

1210 return process 

1211 

1212 def result_processor( 

1213 self, dialect: Dialect, coltype: object 

1214 ) -> Optional[_ResultProcessorType[Any]]: 

1215 if self._reg: 

1216 return processors.str_to_datetime_processor_factory( 

1217 self._reg, datetime.datetime 

1218 ) 

1219 else: 

1220 return processors.str_to_datetime 

1221 

1222 

1223class DATE(_DateTimeMixin, sqltypes.Date): 

1224 r"""Represent a Python date object in SQLite using a string. 

1225 

1226 The default string storage format is:: 

1227 

1228 "%(year)04d-%(month)02d-%(day)02d" 

1229 

1230 e.g.: 

1231 

1232 .. sourcecode:: text 

1233 

1234 2011-03-15 

1235 

1236 The incoming storage format is by default parsed using the 

1237 Python ``date.fromisoformat()`` function. 

1238 

1239 .. versionchanged:: 2.0 ``date.fromisoformat()`` is used for default 

1240 date string parsing. 

1241 

1242 

1243 The storage format can be customized to some degree using the 

1244 ``storage_format`` and ``regexp`` parameters, such as:: 

1245 

1246 import re 

1247 from sqlalchemy.dialects.sqlite import DATE 

1248 

1249 d = DATE( 

1250 storage_format="%(month)02d/%(day)02d/%(year)04d", 

1251 regexp=re.compile(r"(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)"),

1252 ) 

1253 

1254 :param storage_format: format string which will be applied to the 

1255 dict with keys year, month, and day. 

1256 

1257 :param regexp: regular expression which will be applied to 

1258 incoming result rows, replacing the use of ``date.fromisoformat()`` to 

1259 parse incoming strings. If the regexp contains named groups, the resulting 

1260 match dict is applied to the Python date() constructor as keyword 

1261 arguments. Otherwise, if positional groups are used, the date() 

1262 constructor is called with positional arguments via 

1263 ``*map(int, match_obj.groups(0))``. 

1264 

1265 """ 

1266 

1267 _storage_format = "%(year)04d-%(month)02d-%(day)02d" 

1268 

1269 def bind_processor( 

1270 self, dialect: Dialect 

1271 ) -> Optional[_BindProcessorType[Any]]: 

1272 datetime_date = datetime.date 

1273 format_ = self._storage_format 

1274 

1275 def process(value): 

1276 if value is None: 

1277 return None 

1278 elif isinstance(value, datetime_date): 

1279 return format_ % { 

1280 "year": value.year, 

1281 "month": value.month, 

1282 "day": value.day, 

1283 } 

1284 else: 

1285 raise TypeError( 

1286 "SQLite Date type only accepts Python " 

1287 "date objects as input." 

1288 ) 

1289 

1290 return process 

1291 

1292 def result_processor( 

1293 self, dialect: Dialect, coltype: object 

1294 ) -> Optional[_ResultProcessorType[Any]]: 

1295 if self._reg: 

1296 return processors.str_to_datetime_processor_factory( 

1297 self._reg, datetime.date 

1298 ) 

1299 else: 

1300 return processors.str_to_date 

1301 

1302 

1303class TIME(_DateTimeMixin, sqltypes.Time): 

1304 r"""Represent a Python time object in SQLite using a string. 

1305 

1306 The default string storage format is:: 

1307 

1308 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1309 

1310 e.g.: 

1311 

1312 .. sourcecode:: text 

1313 

1314 12:05:57.105542

1315 

1316 The incoming storage format is by default parsed using the 

1317 Python ``time.fromisoformat()`` function. 

1318 

1319 .. versionchanged:: 2.0 ``time.fromisoformat()`` is used for default 

1320 time string parsing. 

1321 

1322 The storage format can be customized to some degree using the 

1323 ``storage_format`` and ``regexp`` parameters, such as:: 

1324 

1325 import re 

1326 from sqlalchemy.dialects.sqlite import TIME 

1327 

1328 t = TIME( 

1329 storage_format="%(hour)02d-%(minute)02d-%(second)02d-%(microsecond)06d", 

1330 regexp=re.compile(r"(\d+)-(\d+)-(\d+)(?:-(\d+))?"),

1331 ) 

1332 

1333 :param truncate_microseconds: when ``True`` microseconds will be truncated 

1334 from the time. Can't be specified together with ``storage_format`` 

1335 or ``regexp``. 

1336 

1337 :param storage_format: format string which will be applied to the dict 

1338 with keys hour, minute, second, and microsecond. 

1339 

1340 :param regexp: regular expression which will be applied to incoming result 

1341 rows, replacing the use of ``time.fromisoformat()`` to parse incoming

1342 strings. If the regexp contains named groups, the resulting match dict is 

1343 applied to the Python time() constructor as keyword arguments. Otherwise, 

1344 if positional groups are used, the time() constructor is called with 

1345 positional arguments via ``*map(int, match_obj.groups(0))``. 

1346 

1347 """ 

1348 

1349 _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1350 

1351 def __init__(self, *args, **kwargs): 

1352 truncate_microseconds = kwargs.pop("truncate_microseconds", False) 

1353 super().__init__(*args, **kwargs) 

1354 if truncate_microseconds: 

1355 assert "storage_format" not in kwargs, ( 

1356 "You can specify only " 

1357 "one of truncate_microseconds or storage_format." 

1358 ) 

1359 assert "regexp" not in kwargs, ( 

1360 "You can specify only one of " 

1361 "truncate_microseconds or regexp." 

1362 ) 

1363 self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d" 

1364 

1365 def bind_processor(self, dialect): 

1366 datetime_time = datetime.time 

1367 format_ = self._storage_format 

1368 

1369 def process(value): 

1370 if value is None: 

1371 return None 

1372 elif isinstance(value, datetime_time): 

1373 return format_ % { 

1374 "hour": value.hour, 

1375 "minute": value.minute, 

1376 "second": value.second, 

1377 "microsecond": value.microsecond, 

1378 } 

1379 else: 

1380 raise TypeError( 

1381 "SQLite Time type only accepts Python " 

1382 "time objects as input." 

1383 ) 

1384 

1385 return process 

1386 

1387 def result_processor(self, dialect, coltype): 

1388 if self._reg: 

1389 return processors.str_to_datetime_processor_factory( 

1390 self._reg, datetime.time 

1391 ) 

1392 else: 

1393 return processors.str_to_time 

1394 

1395 

1396colspecs = { 

1397 sqltypes.Date: DATE, 

1398 sqltypes.DateTime: DATETIME, 

1399 sqltypes.JSON: _SQliteJson, 

1400 sqltypes.JSON.JSONIndexType: JSONIndexType, 

1401 sqltypes.JSON.JSONPathType: JSONPathType, 

1402 sqltypes.Time: TIME, 

1403} 

1404 

1405ischema_names = { 

1406 "BIGINT": sqltypes.BIGINT, 

1407 "BLOB": sqltypes.BLOB, 

1408 "BOOL": sqltypes.BOOLEAN, 

1409 "BOOLEAN": sqltypes.BOOLEAN, 

1410 "CHAR": sqltypes.CHAR, 

1411 "DATE": sqltypes.DATE, 

1412 "DATE_CHAR": sqltypes.DATE, 

1413 "DATETIME": sqltypes.DATETIME, 

1414 "DATETIME_CHAR": sqltypes.DATETIME, 

1415 "DOUBLE": sqltypes.DOUBLE, 

1416 "DECIMAL": sqltypes.DECIMAL, 

1417 "FLOAT": sqltypes.FLOAT, 

1418 "INT": sqltypes.INTEGER, 

1419 "INTEGER": sqltypes.INTEGER, 

1420 "JSON": JSON, 

1421 "NUMERIC": sqltypes.NUMERIC, 

1422 "REAL": sqltypes.REAL, 

1423 "SMALLINT": sqltypes.SMALLINT, 

1424 "TEXT": sqltypes.TEXT, 

1425 "TIME": sqltypes.TIME, 

1426 "TIME_CHAR": sqltypes.TIME, 

1427 "TIMESTAMP": sqltypes.TIMESTAMP, 

1428 "VARCHAR": sqltypes.VARCHAR, 

1429 "NVARCHAR": sqltypes.NVARCHAR, 

1430 "NCHAR": sqltypes.NCHAR, 

1431} 

1432 

1433 

1434class SQLiteCompiler(compiler.SQLCompiler): 

1435 extract_map = util.update_copy( 

1436 compiler.SQLCompiler.extract_map, 

1437 { 

1438 "month": "%m", 

1439 "day": "%d", 

1440 "year": "%Y", 

1441 "second": "%S", 

1442 "hour": "%H", 

1443 "doy": "%j", 

1444 "minute": "%M", 

1445 "epoch": "%s", 

1446 "dow": "%w", 

1447 "week": "%W", 

1448 }, 

1449 ) 

1450 

1451 def visit_truediv_binary(self, binary, operator, **kw): 

1452 return ( 

1453 self.process(binary.left, **kw) 

1454 + " / " 

1455 + "(%s + 0.0)" % self.process(binary.right, **kw) 

1456 ) 

1457 

1458 def visit_now_func(self, fn, **kw): 

1459 return "CURRENT_TIMESTAMP" 

1460 

1461 def visit_localtimestamp_func(self, func, **kw): 

1462 return "DATETIME(CURRENT_TIMESTAMP, 'localtime')" 

1463 

1464 def visit_true(self, expr, **kw): 

1465 return "1" 

1466 

1467 def visit_false(self, expr, **kw): 

1468 return "0" 

1469 

1470 def visit_char_length_func(self, fn, **kw): 

1471 return "length%s" % self.function_argspec(fn) 

1472 

1473 def visit_aggregate_strings_func(self, fn, **kw): 

1474 return super().visit_aggregate_strings_func( 

1475 fn, use_function_name="group_concat", **kw 

1476 ) 

1477 

1478 def visit_cast(self, cast, **kwargs): 

1479 if self.dialect.supports_cast: 

1480 return super().visit_cast(cast, **kwargs) 

1481 else: 

1482 return self.process(cast.clause, **kwargs) 

1483 

1484 def visit_extract(self, extract, **kw): 

1485 try: 

1486 return "CAST(STRFTIME('%s', %s) AS INTEGER)" % ( 

1487 self.extract_map[extract.field], 

1488 self.process(extract.expr, **kw), 

1489 ) 

1490 except KeyError as err: 

1491 raise exc.CompileError( 

1492 "%s is not a valid extract argument." % extract.field 

1493 ) from err 

1494 

1495 def returning_clause( 

1496 self, 

1497 stmt, 

1498 returning_cols, 

1499 *, 

1500 populate_result_map, 

1501 **kw, 

1502 ): 

1503 kw["include_table"] = False 

1504 return super().returning_clause( 

1505 stmt, returning_cols, populate_result_map=populate_result_map, **kw 

1506 ) 

1507 

1508 def limit_clause(self, select, **kw): 

1509 text = "" 

1510 if select._limit_clause is not None: 

1511 text += "\n LIMIT " + self.process(select._limit_clause, **kw) 

1512 if select._offset_clause is not None: 

1513 if select._limit_clause is None: 

1514 text += "\n LIMIT " + self.process(sql.literal(-1)) 

1515 text += " OFFSET " + self.process(select._offset_clause, **kw) 

1516 else: 

1517 text += " OFFSET " + self.process(sql.literal(0), **kw) 

1518 return text 

1519 

1520 def for_update_clause(self, select, **kw): 

1521 # sqlite has no "FOR UPDATE" AFAICT 

1522 return "" 

1523 

1524 def update_from_clause( 

1525 self, update_stmt, from_table, extra_froms, from_hints, **kw 

1526 ): 

1527 kw["asfrom"] = True 

1528 return "FROM " + ", ".join( 

1529 t._compiler_dispatch(self, fromhints=from_hints, **kw) 

1530 for t in extra_froms 

1531 ) 

1532 

1533 def visit_is_distinct_from_binary(self, binary, operator, **kw): 

1534 return "%s IS NOT %s" % ( 

1535 self.process(binary.left), 

1536 self.process(binary.right), 

1537 ) 

1538 

1539 def visit_is_not_distinct_from_binary(self, binary, operator, **kw): 

1540 return "%s IS %s" % ( 

1541 self.process(binary.left), 

1542 self.process(binary.right), 

1543 ) 

1544 

1545 def visit_json_getitem_op_binary( 

1546 self, binary, operator, _cast_applied=False, **kw 

1547 ): 

1548 if ( 

1549 not _cast_applied 

1550 and binary.type._type_affinity is not sqltypes.JSON 

1551 ): 

1552 kw["_cast_applied"] = True 

1553 return self.process(sql.cast(binary, binary.type), **kw) 

1554 

1555 if binary.type._type_affinity is sqltypes.JSON: 

1556 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))" 

1557 else: 

1558 expr = "JSON_EXTRACT(%s, %s)" 

1559 

1560 return expr % ( 

1561 self.process(binary.left, **kw), 

1562 self.process(binary.right, **kw), 

1563 ) 

1564 

1565 def visit_json_path_getitem_op_binary( 

1566 self, binary, operator, _cast_applied=False, **kw 

1567 ): 

1568 if ( 

1569 not _cast_applied 

1570 and binary.type._type_affinity is not sqltypes.JSON 

1571 ): 

1572 kw["_cast_applied"] = True 

1573 return self.process(sql.cast(binary, binary.type), **kw) 

1574 

1575 if binary.type._type_affinity is sqltypes.JSON: 

1576 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))" 

1577 else: 

1578 expr = "JSON_EXTRACT(%s, %s)" 

1579 

1580 return expr % ( 

1581 self.process(binary.left, **kw), 

1582 self.process(binary.right, **kw), 

1583 ) 

1584 

1585 def visit_empty_set_op_expr(self, type_, expand_op, **kw): 

1586 # slightly old SQLite versions don't seem to be able to handle 

1587 # the empty set impl 

1588 return self.visit_empty_set_expr(type_) 

1589 

1590 def visit_empty_set_expr(self, element_types, **kw): 

1591 return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % ( 

1592 ", ".join("1" for type_ in element_types or [INTEGER()]), 

1593 ", ".join("1" for type_ in element_types or [INTEGER()]), 

1594 ) 

1595 

1596 def visit_regexp_match_op_binary(self, binary, operator, **kw): 

1597 return self._generate_generic_binary(binary, " REGEXP ", **kw) 

1598 

1599 def visit_not_regexp_match_op_binary(self, binary, operator, **kw): 

1600 return self._generate_generic_binary(binary, " NOT REGEXP ", **kw) 

1601 

1602 def _on_conflict_target(self, clause, **kw): 

1603 if clause.inferred_target_elements is not None: 

1604 target_text = "(%s)" % ", ".join( 

1605 ( 

1606 self.preparer.quote(c) 

1607 if isinstance(c, str) 

1608 else self.process(c, include_table=False, use_schema=False) 

1609 ) 

1610 for c in clause.inferred_target_elements 

1611 ) 

1612 if clause.inferred_target_whereclause is not None: 

1613 whereclause_kw = dict(kw) 

1614 whereclause_kw.update( 

1615 include_table=False, 

1616 use_schema=False, 

1617 literal_execute=True, 

1618 ) 

1619 target_text += " WHERE %s" % self.process( 

1620 clause.inferred_target_whereclause, 

1621 **whereclause_kw, 

1622 ) 

1623 

1624 else: 

1625 target_text = "" 

1626 

1627 return target_text 

1628 

1629 def visit_on_conflict_do_nothing(self, on_conflict, **kw): 

1630 target_text = self._on_conflict_target(on_conflict, **kw) 

1631 

1632 if target_text: 

1633 return "ON CONFLICT %s DO NOTHING" % target_text 

1634 else: 

1635 return "ON CONFLICT DO NOTHING" 

1636 

1637 def visit_on_conflict_do_update(self, on_conflict, **kw): 

1638 clause = on_conflict 

1639 

1640 target_text = self._on_conflict_target(on_conflict, **kw) 

1641 

1642 action_set_ops = [] 

1643 

1644 set_parameters = dict(clause.update_values_to_set) 

1645 # create a list of column assignment clauses as tuples 

1646 

1647 insert_statement = self.stack[-1]["selectable"] 

1648 cols = insert_statement.table.c 

1649 set_kw = dict(kw) 

1650 set_kw.update(use_schema=False) 

1651 for c in cols: 

1652 col_key = c.key 

1653 

1654 if col_key in set_parameters: 

1655 value = set_parameters.pop(col_key) 

1656 elif c in set_parameters: 

1657 value = set_parameters.pop(c) 

1658 else: 

1659 continue 

1660 

1661 if ( 

1662 isinstance(value, elements.BindParameter) 

1663 and value.type._isnull 

1664 ): 

1665 value = value._with_binary_element_type(c.type) 

1666 

1667 value_text = self.process( 

1668 value.self_group(), is_upsert_set=True, **set_kw 

1669 ) 

1670 

1671 key_text = self.preparer.quote(c.name) 

1672 action_set_ops.append("%s = %s" % (key_text, value_text)) 

1673 

1674 # check for names that don't match columns 

1675 if set_parameters: 

1676 util.warn( 

1677 "Additional column names not matching " 

1678 "any column keys in table '%s': %s" 

1679 % ( 

1680 self.current_executable.table.name, 

1681 (", ".join("'%s'" % c for c in set_parameters)), 

1682 ) 

1683 ) 

1684 for k, v in set_parameters.items(): 

1685 key_text = ( 

1686 self.preparer.quote(k) 

1687 if isinstance(k, str) 

1688 else self.process(k, **set_kw) 

1689 ) 

1690 value_text = self.process( 

1691 coercions.expect(roles.ExpressionElementRole, v), 

1692 is_upsert_set=True, 

1693 **set_kw, 

1694 ) 

1695 action_set_ops.append("%s = %s" % (key_text, value_text)) 

1696 

1697 action_text = ", ".join(action_set_ops) 

1698 if clause.update_whereclause is not None: 

1699 where_kw = dict(kw) 

1700 where_kw.update(include_table=True, use_schema=False) 

1701 action_text += " WHERE %s" % self.process( 

1702 clause.update_whereclause, **where_kw 

1703 ) 

1704 

1705 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) 

1706 

1707 def visit_bitwise_xor_op_binary(self, binary, operator, **kw): 

1708 # sqlite has no xor. Use "a XOR b" = "(a | b) - (a & b)". 

1709 kw["eager_grouping"] = True 

1710 or_ = self._generate_generic_binary(binary, " | ", **kw) 

1711 and_ = self._generate_generic_binary(binary, " & ", **kw) 

1712 return f"({or_} - {and_})" 


class SQLiteDDLCompiler(compiler.DDLCompiler):
    def get_column_specification(self, column, **kwargs):
        coltype = self.dialect.type_compiler_instance.process(
            column.type, type_expression=column
        )
        colspec = self.preparer.format_column(column) + " " + coltype
        default = self.get_column_default_string(column)
        if default is not None:
            if not re.match(r"""^\s*[\'\"\(]""", default) and re.match(
                r".*\W.*", default
            ):
                colspec += f" DEFAULT ({default})"
            else:
                colspec += f" DEFAULT {default}"

        if not column.nullable:
            colspec += " NOT NULL"

            on_conflict_clause = column.dialect_options["sqlite"][
                "on_conflict_not_null"
            ]
            if on_conflict_clause is not None:
                colspec += " ON CONFLICT " + on_conflict_clause

        if column.primary_key:
            if (
                column.autoincrement is True
                and len(column.table.primary_key.columns) != 1
            ):
                raise exc.CompileError(
                    "SQLite does not support autoincrement for "
                    "composite primary keys"
                )

            if (
                column.table.dialect_options["sqlite"]["autoincrement"]
                and len(column.table.primary_key.columns) == 1
                and issubclass(column.type._type_affinity, sqltypes.Integer)
                and not column.foreign_keys
            ):
                colspec += " PRIMARY KEY"

                on_conflict_clause = column.dialect_options["sqlite"][
                    "on_conflict_primary_key"
                ]
                if on_conflict_clause is not None:
                    colspec += " ON CONFLICT " + on_conflict_clause

                colspec += " AUTOINCREMENT"

        if column.computed is not None:
            colspec += " " + self.process(column.computed)

        return colspec
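The DEFAULT handling above parenthesizes a server default only when it is unquoted, unparenthesized, and contains a non-word character (e.g. a function-call expression), matching SQLite's requirement that expression defaults be wrapped in parentheses. A minimal sketch of that decision (the helper name ``needs_parens`` is hypothetical, not part of the dialect):

```python
import re

def needs_parens(default: str) -> bool:
    # same two regexes as get_column_specification: not already
    # quoted/parenthesized, and containing a non-word character
    return (
        not re.match(r"""^\s*[\'\"\(]""", default)
        and re.match(r".*\W.*", default) is not None
    )

print(needs_parens("datetime('now')"))  # True: bare expression default
print(needs_parens("'fixed'"))          # False: already a quoted literal
print(needs_parens("42"))               # False: word characters only
```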

    def visit_primary_key_constraint(self, constraint, **kw):
        # for columns with sqlite_autoincrement=True,
        # the PRIMARY KEY constraint can only be inline
        # with the column itself.
        if len(constraint.columns) == 1:
            c = list(constraint)[0]
            if (
                c.primary_key
                and c.table.dialect_options["sqlite"]["autoincrement"]
                and issubclass(c.type._type_affinity, sqltypes.Integer)
                and not c.foreign_keys
            ):
                return None

        text = super().visit_primary_key_constraint(constraint)

        on_conflict_clause = constraint.dialect_options["sqlite"][
            "on_conflict"
        ]
        if on_conflict_clause is None and len(constraint.columns) == 1:
            on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][
                "on_conflict_primary_key"
            ]

        if on_conflict_clause is not None:
            text += " ON CONFLICT " + on_conflict_clause

        return text

    def visit_unique_constraint(self, constraint, **kw):
        text = super().visit_unique_constraint(constraint)

        on_conflict_clause = constraint.dialect_options["sqlite"][
            "on_conflict"
        ]
        if on_conflict_clause is None and len(constraint.columns) == 1:
            col1 = list(constraint)[0]
            if isinstance(col1, schema.SchemaItem):
                on_conflict_clause = list(constraint)[0].dialect_options[
                    "sqlite"
                ]["on_conflict_unique"]

        if on_conflict_clause is not None:
            text += " ON CONFLICT " + on_conflict_clause

        return text

    def visit_check_constraint(self, constraint, **kw):
        text = super().visit_check_constraint(constraint)

        on_conflict_clause = constraint.dialect_options["sqlite"][
            "on_conflict"
        ]

        if on_conflict_clause is not None:
            text += " ON CONFLICT " + on_conflict_clause

        return text

    def visit_column_check_constraint(self, constraint, **kw):
        text = super().visit_column_check_constraint(constraint)

        if constraint.dialect_options["sqlite"]["on_conflict"] is not None:
            raise exc.CompileError(
                "SQLite does not support on conflict clause for "
                "column check constraint"
            )

        return text

    def visit_foreign_key_constraint(self, constraint, **kw):
        local_table = constraint.elements[0].parent.table
        remote_table = constraint.elements[0].column.table

        if local_table.schema != remote_table.schema:
            return None
        else:
            return super().visit_foreign_key_constraint(constraint)

    def define_constraint_remote_table(self, constraint, table, preparer):
        """Format the remote table clause of a CREATE CONSTRAINT clause."""

        return preparer.format_table(table, use_schema=False)

    def visit_create_index(
        self, create, include_schema=False, include_table_schema=True, **kw
    ):
        index = create.element
        self._verify_index_table(index)
        preparer = self.preparer
        text = "CREATE "
        if index.unique:
            text += "UNIQUE "

        text += "INDEX "

        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += "%s ON %s (%s)" % (
            self._prepared_index_name(index, include_schema=True),
            preparer.format_table(index.table, use_schema=False),
            ", ".join(
                self.sql_compiler.process(
                    expr, include_table=False, literal_binds=True
                )
                for expr in index.expressions
            ),
        )

        whereclause = index.dialect_options["sqlite"]["where"]
        if whereclause is not None:
            where_compiled = self.sql_compiler.process(
                whereclause, include_table=False, literal_binds=True
            )
            text += " WHERE " + where_compiled

        return text

    def post_create_table(self, table):
        table_options = []

        if not table.dialect_options["sqlite"]["with_rowid"]:
            table_options.append("WITHOUT ROWID")

        if table.dialect_options["sqlite"]["strict"]:
            table_options.append("STRICT")

        if table_options:
            return "\n " + ",\n ".join(table_options)
        else:
            return ""

    def visit_create_view(self, create, **kw):
        """Handle SQLite if_not_exists dialect option for CREATE VIEW."""
        # Get the if_not_exists dialect option from the CreateView object
        if_not_exists = create.dialect_options["sqlite"].get(
            "if_not_exists", False
        )

        # Pass if_not_exists through kw to the parent implementation
        kw["if_not_exists"] = if_not_exists
        return super().visit_create_view(create, **kw)


class SQLiteTypeCompiler(compiler.GenericTypeCompiler):
    def visit_large_binary(self, type_, **kw):
        return self.visit_BLOB(type_)

    def visit_DATETIME(self, type_, **kw):
        if (
            not isinstance(type_, _DateTimeMixin)
            or type_.format_is_text_affinity
        ):
            return super().visit_DATETIME(type_)
        else:
            return "DATETIME_CHAR"

    def visit_DATE(self, type_, **kw):
        if (
            not isinstance(type_, _DateTimeMixin)
            or type_.format_is_text_affinity
        ):
            return super().visit_DATE(type_)
        else:
            return "DATE_CHAR"

    def visit_TIME(self, type_, **kw):
        if (
            not isinstance(type_, _DateTimeMixin)
            or type_.format_is_text_affinity
        ):
            return super().visit_TIME(type_)
        else:
            return "TIME_CHAR"

    def visit_JSON(self, type_, **kw):
        # note this name provides NUMERIC affinity, not TEXT.
        # should not be an issue unless the JSON value consists of a single
        # numeric value.  JSONTEXT can be used if this case is required.
        return "JSON"


class SQLiteIdentifierPreparer(compiler.IdentifierPreparer):
    reserved_words = {
        "add",
        "after",
        "all",
        "alter",
        "analyze",
        "and",
        "as",
        "asc",
        "attach",
        "autoincrement",
        "before",
        "begin",
        "between",
        "by",
        "cascade",
        "case",
        "cast",
        "check",
        "collate",
        "column",
        "commit",
        "conflict",
        "constraint",
        "create",
        "cross",
        "current_date",
        "current_time",
        "current_timestamp",
        "database",
        "default",
        "deferrable",
        "deferred",
        "delete",
        "desc",
        "detach",
        "distinct",
        "drop",
        "each",
        "else",
        "end",
        "escape",
        "except",
        "exclusive",
        "exists",
        "explain",
        "fail",
        "false",
        "for",
        "foreign",
        "from",
        "full",
        "glob",
        "group",
        "having",
        "if",
        "ignore",
        "immediate",
        "in",
        "index",
        "indexed",
        "initially",
        "inner",
        "insert",
        "instead",
        "intersect",
        "into",
        "is",
        "isnull",
        "join",
        "key",
        "left",
        "like",
        "limit",
        "match",
        "natural",
        "not",
        "notnull",
        "null",
        "of",
        "offset",
        "on",
        "or",
        "order",
        "outer",
        "plan",
        "pragma",
        "primary",
        "query",
        "raise",
        "references",
        "reindex",
        "rename",
        "replace",
        "restrict",
        "right",
        "rollback",
        "row",
        "select",
        "set",
        "table",
        "temp",
        "temporary",
        "then",
        "to",
        "transaction",
        "trigger",
        "true",
        "union",
        "unique",
        "update",
        "using",
        "vacuum",
        "values",
        "view",
        "virtual",
        "when",
        "where",
    }


class SQLiteExecutionContext(default.DefaultExecutionContext):
    @util.memoized_property
    def _preserve_raw_colnames(self):
        return (
            not self.dialect._broken_dotted_colnames
            or self.execution_options.get("sqlite_raw_colnames", False)
        )

    def _translate_colname(self, colname):
        # TODO: detect SQLite version 3.10.0 or greater;
        # see [ticket:3633]

        # adjust for dotted column names.  SQLite
        # in the case of UNION may store col names as
        # "tablename.colname", or if using an attached database,
        # "database.tablename.colname", in cursor.description
        if not self._preserve_raw_colnames and "." in colname:
            return colname.split(".")[-1], colname
        else:
            return colname, None


class SQLiteDialect(default.DefaultDialect):
    name = "sqlite"
    supports_alter = False

    # SQLite supports "DEFAULT VALUES" but *does not* support
    # "VALUES (DEFAULT)"
    supports_default_values = True
    supports_default_metavalue = False

    # sqlite issue:
    # https://github.com/python/cpython/issues/93421
    # note this parameter is no longer used by the ORM or default dialect
    # see #9414
    supports_sane_rowcount_returning = False

    supports_empty_insert = False
    supports_cast = True
    supports_multivalues_insert = True
    use_insertmanyvalues = True
    tuple_in_values = True
    supports_statement_cache = True
    insert_null_pk_still_autoincrements = True
    insert_returning = True
    update_returning = True
    update_returning_multifrom = True
    delete_returning = True

    supports_default_metavalue = True
    """dialect supports INSERT... VALUES (DEFAULT) syntax"""

    default_metavalue_token = "NULL"
    """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
    parenthesis."""

    default_paramstyle = "qmark"
    execution_ctx_cls = SQLiteExecutionContext
    statement_compiler = SQLiteCompiler
    ddl_compiler = SQLiteDDLCompiler
    type_compiler_cls = SQLiteTypeCompiler
    preparer = SQLiteIdentifierPreparer
    ischema_names = ischema_names
    colspecs = colspecs

    construct_arguments = [
        (
            sa_schema.Table,
            {
                "autoincrement": False,
                "with_rowid": True,
                "strict": False,
            },
        ),
        (sa_schema.Index, {"where": None}),
        (
            sa_schema.Column,
            {
                "on_conflict_primary_key": None,
                "on_conflict_not_null": None,
                "on_conflict_unique": None,
            },
        ),
        (sa_schema.Constraint, {"on_conflict": None}),
        (sa_ddl.CreateView, {"if_not_exists": False}),
    ]

    _broken_fk_pragma_quotes = False
    _broken_dotted_colnames = False

    def __init__(
        self,
        native_datetime: bool = False,
        json_serializer: Optional[Callable[..., Any]] = None,
        json_deserializer: Optional[Callable[..., Any]] = None,
        **kwargs: Any,
    ) -> None:
        default.DefaultDialect.__init__(self, **kwargs)

        self._json_serializer = json_serializer
        self._json_deserializer = json_deserializer

        # this flag used by pysqlite dialect, and perhaps others in the
        # future, to indicate the driver is handling date/timestamp
        # conversions (and perhaps datetime/time as well on some hypothetical
        # driver ?)
        self.native_datetime = native_datetime

        if self.dbapi is not None:
            if self.dbapi.sqlite_version_info < (3, 7, 16):
                util.warn(
                    "SQLite version %s is older than 3.7.16, and will not "
                    "support right nested joins, as are sometimes used in "
                    "more complex ORM scenarios.  SQLAlchemy 1.4 and above "
                    "no longer tries to rewrite these joins."
                    % (self.dbapi.sqlite_version_info,)
                )

            # NOTE: python 3.7 on fedora for me has SQLite 3.34.1.  These
            # version checks are getting very stale.
            self._broken_dotted_colnames = self.dbapi.sqlite_version_info < (
                3,
                10,
                0,
            )
            self.supports_default_values = self.dbapi.sqlite_version_info >= (
                3,
                3,
                8,
            )
            self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3)
            self.supports_multivalues_insert = (
                # https://www.sqlite.org/releaselog/3_7_11.html
                self.dbapi.sqlite_version_info
                >= (3, 7, 11)
            )
            # see https://www.sqlalchemy.org/trac/ticket/2568
            # as well as https://www.sqlite.org/src/info/600482d161
            self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < (
                3,
                6,
                14,
            )

            if self.dbapi.sqlite_version_info < (3, 35) or util.pypy:
                self.update_returning = self.delete_returning = (
                    self.insert_returning
                ) = False

            if self.dbapi.sqlite_version_info < (3, 32, 0):
                # https://www.sqlite.org/limits.html
                self.insertmanyvalues_max_parameters = 999

    _isolation_lookup = util.immutabledict(
        {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
    )

    def get_isolation_level_values(self, dbapi_connection):
        return list(self._isolation_lookup)

    def set_isolation_level(
        self, dbapi_connection: DBAPIConnection, level: IsolationLevel
    ) -> None:
        isolation_level = self._isolation_lookup[level]

        cursor = dbapi_connection.cursor()
        cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}")
        cursor.close()

    def get_isolation_level(self, dbapi_connection):
        cursor = dbapi_connection.cursor()
        cursor.execute("PRAGMA read_uncommitted")
        res = cursor.fetchone()
        if res:
            value = res[0]
        else:
            # https://www.sqlite.org/changes.html#version_3_3_3
            # "Optional READ UNCOMMITTED isolation (instead of the
            # default isolation level of SERIALIZABLE) and
            # table level locking when database connections
            # share a common cache."
            # pre-SQLite 3.3.0 defaults to 0
            value = 0
        cursor.close()
        if value == 0:
            return "SERIALIZABLE"
        elif value == 1:
            return "READ UNCOMMITTED"
        else:
            assert False, "Unknown isolation level %s" % value
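``get_isolation_level`` above reads SQLite's ``PRAGMA read_uncommitted`` flag and maps 0/1 onto the two levels the dialect supports. The same round trip can be observed directly with the stdlib ``sqlite3`` driver (a standalone sketch mirroring the logic above):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("PRAGMA read_uncommitted")
value = cur.fetchone()[0]
cur.close()

# 0 -> SERIALIZABLE (the SQLite default), 1 -> READ UNCOMMITTED
level = "READ UNCOMMITTED" if value == 1 else "SERIALIZABLE"
print(level)  # SERIALIZABLE
```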

    @reflection.cache
    def get_schema_names(self, connection, **kw):
        s = "PRAGMA database_list"
        dl = connection.exec_driver_sql(s)

        return [db[1] for db in dl if db[1] != "temp"]

    def _format_schema(self, schema, table_name):
        if schema is not None:
            qschema = self.identifier_preparer.quote_identifier(schema)
            name = f"{qschema}.{table_name}"
        else:
            name = table_name
        return name

    def _sqlite_main_query(
        self,
        table: str,
        type_: str,
        schema: Optional[str],
        sqlite_include_internal: bool,
    ):
        main = self._format_schema(schema, table)
        if not sqlite_include_internal:
            filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'"
        else:
            filter_table = ""
        query = (
            f"SELECT name FROM {main} "
            f"WHERE type='{type_}'{filter_table} "
            "ORDER BY name"
        )
        return query
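``_sqlite_main_query`` above assembles the reflection query against ``sqlite_master`` (or ``sqlite_temp_master``), optionally filtering out internal ``sqlite_*`` tables via a ``NOT LIKE ... ESCAPE`` clause. The query it builds for the common case can be run as-is through the stdlib ``sqlite3`` driver (illustrative sketch, not part of the dialect):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)")

# same shape of query _sqlite_main_query builds for table names,
# with the internal-table filter applied ('~' escapes the '_')
query = (
    "SELECT name FROM sqlite_master "
    "WHERE type='table' AND name NOT LIKE 'sqlite~_%' ESCAPE '~' "
    "ORDER BY name"
)
names = [row[0] for row in conn.execute(query)]
print(names)  # ['users']
```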

    @reflection.cache
    def get_table_names(
        self, connection, schema=None, sqlite_include_internal=False, **kw
    ):
        query = self._sqlite_main_query(
            "sqlite_master", "table", schema, sqlite_include_internal
        )
        names = connection.exec_driver_sql(query).scalars().all()
        return names

    @reflection.cache
    def get_temp_table_names(
        self, connection, sqlite_include_internal=False, **kw
    ):
        query = self._sqlite_main_query(
            "sqlite_temp_master", "table", None, sqlite_include_internal
        )
        names = connection.exec_driver_sql(query).scalars().all()
        return names

    @reflection.cache
    def get_temp_view_names(
        self, connection, sqlite_include_internal=False, **kw
    ):
        query = self._sqlite_main_query(
            "sqlite_temp_master", "view", None, sqlite_include_internal
        )
        names = connection.exec_driver_sql(query).scalars().all()
        return names

    @reflection.cache
    def has_table(self, connection, table_name, schema=None, **kw):
        self._ensure_has_table_connection(connection)

        if schema is not None and schema not in self.get_schema_names(
            connection, **kw
        ):
            return False

        info = self._get_table_pragma(
            connection, "table_info", table_name, schema=schema
        )
        return bool(info)

    def _get_default_schema_name(self, connection):
        return "main"

    @reflection.cache
    def get_view_names(
        self, connection, schema=None, sqlite_include_internal=False, **kw
    ):
        query = self._sqlite_main_query(
            "sqlite_master", "view", schema, sqlite_include_internal
        )
        names = connection.exec_driver_sql(query).scalars().all()
        return names

    @reflection.cache
    def get_view_definition(self, connection, view_name, schema=None, **kw):
        if schema is not None:
            qschema = self.identifier_preparer.quote_identifier(schema)
            master = f"{qschema}.sqlite_master"
            s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % (
                master,
            )
            rs = connection.exec_driver_sql(s, (view_name,))
        else:
            try:
                s = (
                    "SELECT sql FROM "
                    " (SELECT * FROM sqlite_master UNION ALL "
                    "  SELECT * FROM sqlite_temp_master) "
                    "WHERE name = ? "
                    "AND type='view'"
                )
                rs = connection.exec_driver_sql(s, (view_name,))
            except exc.DBAPIError:
                s = (
                    "SELECT sql FROM sqlite_master WHERE name = ? "
                    "AND type='view'"
                )
                rs = connection.exec_driver_sql(s, (view_name,))

        result = rs.fetchall()
        if result:
            return result[0].sql
        else:
            raise exc.NoSuchTableError(
                f"{schema}.{view_name}" if schema else view_name
            )

    @reflection.cache
    def get_columns(self, connection, table_name, schema=None, **kw):
        pragma = "table_info"
        # computed columns are treated as hidden; they require table_xinfo
        if self.server_version_info >= (3, 31):
            pragma = "table_xinfo"
        info = self._get_table_pragma(
            connection, pragma, table_name, schema=schema
        )
        columns = []
        tablesql = None
        for row in info:
            name = row[1]
            type_ = row[2].upper()
            nullable = not row[3]
            default = row[4]
            primary_key = row[5]
            hidden = row[6] if pragma == "table_xinfo" else 0

            # hidden has value 0 for normal columns, 1 for hidden columns,
            # 2 for computed virtual columns and 3 for computed stored columns
            # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b
            if hidden == 1:
                continue

            generated = bool(hidden)
            persisted = hidden == 3

            if tablesql is None and generated:
                tablesql = self._get_table_sql(
                    connection, table_name, schema, **kw
                )
                # remove create table
                match = re.match(
                    (
                        r"create table .*?\((.*)\)"
                        r"(?:\s*,?\s*(?:WITHOUT\s+ROWID|STRICT))*$"
                    ),
                    tablesql.strip(),
                    re.DOTALL | re.IGNORECASE,
                )
                assert match, f"create table not found in {tablesql}"
                tablesql = match.group(1).strip()

            columns.append(
                self._get_column_info(
                    name,
                    type_,
                    nullable,
                    default,
                    primary_key,
                    generated,
                    persisted,
                    tablesql,
                )
            )
        if columns:
            return columns
        elif not self.has_table(connection, table_name, schema):
            raise exc.NoSuchTableError(
                f"{schema}.{table_name}" if schema else table_name
            )
        else:
            return ReflectionDefaults.columns()

    def _get_column_info(
        self,
        name,
        type_,
        nullable,
        default,
        primary_key,
        generated,
        persisted,
        tablesql,
    ):
        if generated:
            # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)"
            # somehow is "INTEGER GENERATED ALWAYS"
            type_ = re.sub("generated", "", type_, flags=re.IGNORECASE)
            type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip()

        coltype = self._resolve_type_affinity(type_)

        if default is not None:
            default = str(default)

        colspec = {
            "name": name,
            "type": coltype,
            "nullable": nullable,
            "default": default,
            "primary_key": primary_key,
        }
        if generated:
            sqltext = ""
            if tablesql:
                pattern = (
                    r"[^,]*\s+GENERATED\s+ALWAYS\s+AS"
                    r"\s+\((.*)\)\s*(?:virtual|stored)?"
                )
                match = re.search(
                    re.escape(name) + pattern, tablesql, re.IGNORECASE
                )
                if match:
                    sqltext = match.group(1)
            colspec["computed"] = {"sqltext": sqltext, "persisted": persisted}
        return colspec

    def _resolve_type_affinity(self, type_):
        """Return a data type from a reflected column, using affinity rules.

        SQLite's goal for universal compatibility introduces some complexity
        during reflection, as a column's defined type might not actually be a
        type that SQLite understands - or indeed, may not be defined *at all*.
        Internally, SQLite handles this with a 'data type affinity' for each
        column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER',
        'REAL', or 'NONE' (raw bits). The algorithm that determines this is
        listed in https://www.sqlite.org/datatype3.html section 2.1.

        This method allows SQLAlchemy to support that algorithm, while still
        providing access to smarter reflection utilities by recognizing
        column definitions that SQLite only supports through affinity (like
        DATE and DOUBLE).

        """
        match = re.match(r"([\w ]+)(\(.*?\))?", type_)
        if match:
            coltype = match.group(1)
            args = match.group(2)
        else:
            coltype = ""
            args = ""

        if coltype in self.ischema_names:
            coltype = self.ischema_names[coltype]
        elif "INT" in coltype:
            coltype = sqltypes.INTEGER
        elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype:
            coltype = sqltypes.TEXT
        elif "BLOB" in coltype or not coltype:
            coltype = sqltypes.NullType
        elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype:
            coltype = sqltypes.REAL
        else:
            coltype = sqltypes.NUMERIC

        if args is not None:
            args = re.findall(r"(\d+)", args)
            try:
                coltype = coltype(*[int(a) for a in args])
            except TypeError:
                util.warn(
                    "Could not instantiate type %s with "
                    "reflected arguments %s; using no arguments."
                    % (coltype, args)
                )
                coltype = coltype()
        else:
            coltype = coltype()

        return coltype
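The affinity fallback chain in ``_resolve_type_affinity`` (the branch taken when the type name is not in ``ischema_names``) can be restated compactly. This sketch (the ``affinity`` helper is hypothetical; it expects an uppercased type string, as ``get_columns`` passes in, and returns the affinity bucket name rather than a SQLAlchemy type):

```python
import re

def affinity(type_str: str) -> str:
    # strip any "(...)" argument list, keep the bare type name,
    # then apply the same substring rules as the method above
    m = re.match(r"([\w ]+)(\(.*?\))?", type_str)
    coltype = m.group(1) if m else ""
    if "INT" in coltype:
        return "INTEGER"
    elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype:
        return "TEXT"
    elif "BLOB" in coltype or not coltype:
        return "NONE"
    elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype:
        return "REAL"
    else:
        return "NUMERIC"

print(affinity("VARCHAR(30)"))    # TEXT
print(affinity("BIGINT"))         # INTEGER
print(affinity("DOUBLE"))         # REAL
print(affinity("DECIMAL(10,2)"))  # NUMERIC
```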

    @reflection.cache
    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
        constraint_name = None
        table_data = self._get_table_sql(connection, table_name, schema=schema)
        if table_data:
            PK_PATTERN = r'CONSTRAINT +(?:"(.+?)"|(\w+)) +PRIMARY KEY'
            result = re.search(PK_PATTERN, table_data, re.I)
            if result:
                constraint_name = result.group(1) or result.group(2)
            else:
                constraint_name = None

        cols = self.get_columns(connection, table_name, schema, **kw)
        # consider only pk columns.  This also avoids sorting the cached
        # value returned by get_columns
        cols = [col for col in cols if col.get("primary_key", 0) > 0]
        cols.sort(key=lambda col: col.get("primary_key"))
        pkeys = [col["name"] for col in cols]

        if pkeys:
            return {"constrained_columns": pkeys, "name": constraint_name}
        else:
            return ReflectionDefaults.pk_constraint()

    @reflection.cache
    def get_foreign_keys(self, connection, table_name, schema=None, **kw):
        # sqlite makes this *extremely difficult*.
        # First, use the pragma to get the actual FKs.
        pragma_fks = self._get_table_pragma(
            connection, "foreign_key_list", table_name, schema=schema
        )

        fks = {}

        for row in pragma_fks:
            (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])

            if not rcol:
                # no referred column, which means it was not named in the
                # original DDL.  The referred columns of the foreign key
                # constraint are therefore the primary key of the referred
                # table.
                try:
                    referred_pk = self.get_pk_constraint(
                        connection, rtbl, schema=schema, **kw
                    )
                    referred_columns = referred_pk["constrained_columns"]
                except exc.NoSuchTableError:
                    # ignore not existing parents
                    referred_columns = []
            else:
                # note we use this list only if this is the first column
                # in the constraint.  for subsequent columns we ignore the
                # list and append "rcol" if present.
                referred_columns = []

            if self._broken_fk_pragma_quotes:
                rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl)

            if numerical_id in fks:
                fk = fks[numerical_id]
            else:
                fk = fks[numerical_id] = {
                    "name": None,
                    "constrained_columns": [],
                    "referred_schema": schema,
                    "referred_table": rtbl,
                    "referred_columns": referred_columns,
                    "options": {},
                }

            fk["constrained_columns"].append(lcol)

            if rcol:
                fk["referred_columns"].append(rcol)

        def fk_sig(constrained_columns, referred_table, referred_columns):
            return (
                tuple(constrained_columns)
                + (referred_table,)
                + tuple(referred_columns)
            )

        # then, parse the actual SQL and attempt to find DDL that matches
        # the names as well.  SQLite saves the DDL in whatever format
        # it was typed in as, so we need to be liberal here.

        keys_by_signature = {
            fk_sig(
                fk["constrained_columns"],
                fk["referred_table"],
                fk["referred_columns"],
            ): fk
            for fk in fks.values()
        }

        table_data = self._get_table_sql(connection, table_name, schema=schema)

        def parse_fks():
            if table_data is None:
                # system tables, etc.
                return

            # note that we already have the FKs from PRAGMA above.  This whole
            # regexp thing is trying to locate additional detail about the
            # FKs, namely the name of the constraint and other options.
            # so parsing the columns is really about matching it up to what
            # we already have.
            FK_PATTERN = (
                r'(?:CONSTRAINT +(?:"(.+?)"|(\w+)) +)?'
                r"FOREIGN KEY *\( *(.+?) *\) +"
                r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *'  # noqa: E501
                r"((?:ON (?:DELETE|UPDATE) "
                r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)"
                r"((?:NOT +)?DEFERRABLE)?"
                r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?"
            )
            for match in re.finditer(FK_PATTERN, table_data, re.I):
                (
                    constraint_quoted_name,
                    constraint_name,
                    constrained_columns,
                    referred_quoted_name,
                    referred_name,
                    referred_columns,
                    onupdatedelete,
                    deferrable,
                    initially,
                ) = match.group(1, 2, 3, 4, 5, 6, 7, 8, 9)
                constraint_name = constraint_quoted_name or constraint_name
                constrained_columns = list(
                    self._find_cols_in_sig(constrained_columns)
                )
                if not referred_columns:
                    referred_columns = constrained_columns
                else:
                    referred_columns = list(
                        self._find_cols_in_sig(referred_columns)
                    )
                referred_name = referred_quoted_name or referred_name
                options = {}

                for token in re.split(r" *\bON\b *", onupdatedelete.upper()):
                    if token.startswith("DELETE"):
                        ondelete = token[6:].strip()
                        if ondelete and ondelete != "NO ACTION":
                            options["ondelete"] = ondelete
                    elif token.startswith("UPDATE"):
                        onupdate = token[6:].strip()
                        if onupdate and onupdate != "NO ACTION":
                            options["onupdate"] = onupdate

                if deferrable:
                    options["deferrable"] = "NOT" not in deferrable.upper()
                if initially:
                    options["initially"] = initially.upper()

                yield (
                    constraint_name,
                    constrained_columns,
                    referred_name,
                    referred_columns,
                    options,
                )

        fkeys = []

        for (
            constraint_name,
            constrained_columns,
            referred_name,
            referred_columns,
            options,
        ) in parse_fks():
            sig = fk_sig(constrained_columns, referred_name, referred_columns)
            if sig not in keys_by_signature:
                util.warn(
                    "WARNING: SQL-parsed foreign key constraint "
                    "'%s' could not be located in PRAGMA "
                    "foreign_keys for table %s" % (sig, table_name)
                )
                continue
            key = keys_by_signature.pop(sig)
            key["name"] = constraint_name
            key["options"] = options
            fkeys.append(key)
        # assume the remainders are the unnamed, inline constraints, just
        # use them as is as it's extremely difficult to parse inline
        # constraints
        fkeys.extend(keys_by_signature.values())
        if fkeys:
            return fkeys
        else:
            return ReflectionDefaults.foreign_keys()

2752 def _find_cols_in_sig(self, sig): 

2753 for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I): 

2754 yield match.group(1) or match.group(2) 

2755 

    @reflection.cache
    def get_unique_constraints(
        self, connection, table_name, schema=None, **kw
    ):
        auto_index_by_sig = {}
        for idx in self.get_indexes(
            connection,
            table_name,
            schema=schema,
            include_auto_indexes=True,
            **kw,
        ):
            if not idx["name"].startswith("sqlite_autoindex"):
                continue
            sig = tuple(idx["column_names"])
            auto_index_by_sig[sig] = idx

        table_data = self._get_table_sql(
            connection, table_name, schema=schema, **kw
        )
        unique_constraints = []

        def parse_uqs():
            if table_data is None:
                return
            UNIQUE_PATTERN = (
                r'(?:CONSTRAINT +(?:"(.+?)"|(\w+)) +)?UNIQUE *\((.+?)\)'
            )
            INLINE_UNIQUE_PATTERN = (
                r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]'
                r"+[a-z0-9_ ]+?[\t ]+UNIQUE"
            )

            for match in re.finditer(UNIQUE_PATTERN, table_data, re.I):
                quoted_name, unquoted_name, cols = match.group(1, 2, 3)
                name = quoted_name or unquoted_name
                yield name, list(self._find_cols_in_sig(cols))

            # we need to match inlines as well, as we seek to differentiate
            # a UNIQUE constraint from a UNIQUE INDEX, even though these
            # are kind of the same thing :)
            for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I):
                cols = list(
                    self._find_cols_in_sig(match.group(1) or match.group(2))
                )
                yield None, cols

        for name, cols in parse_uqs():
            sig = tuple(cols)
            if sig in auto_index_by_sig:
                auto_index_by_sig.pop(sig)
                parsed_constraint = {"name": name, "column_names": cols}
                unique_constraints.append(parsed_constraint)
        # NOTE: auto_index_by_sig might not be empty here,
        # the PRIMARY KEY may have an entry.
        if unique_constraints:
            return unique_constraints
        else:
            return ReflectionDefaults.unique_constraints()
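
To see what ``UNIQUE_PATTERN`` extracts from a ``CREATE TABLE`` string, here is an isolated sketch with an illustrative DDL string; both the named table-level constraint and the anonymous one are found, with group 3 carrying the raw column list that ``_find_cols_in_sig`` then splits:

```python
import re

# The same top-level pattern used in parse_uqs() above.
UNIQUE_PATTERN = r'(?:CONSTRAINT +(?:"(.+?)"|(\w+)) +)?UNIQUE *\((.+?)\)'
ddl = (
    "CREATE TABLE t (a INTEGER, b INTEGER, c INTEGER, "
    "CONSTRAINT uq_ab UNIQUE (a, b), UNIQUE (c))"
)
found = []
for match in re.finditer(UNIQUE_PATTERN, ddl, re.I):
    quoted, unquoted, cols = match.group(1, 2, 3)
    found.append((quoted or unquoted, cols))
print(found)  # [('uq_ab', 'a, b'), (None, 'c')]
```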

    @reflection.cache
    def get_check_constraints(self, connection, table_name, schema=None, **kw):
        table_data = self._get_table_sql(
            connection, table_name, schema=schema, **kw
        )

        # Extract CHECK constraints by properly handling balanced parentheses
        # and avoiding false matches when CHECK/CONSTRAINT appear in table
        # names. See #12924 for context.
        #
        # SQLite supports 4 identifier quote styles (see
        # sqlite.org/lang_keywords.html):
        # - Double quotes "..." (standard SQL)
        # - Brackets [...] (MS Access/SQL Server compatibility)
        # - Backticks `...` (MySQL compatibility)
        # - Single quotes '...' (SQLite extension)
        #
        # NOTE: there is not currently a way to parse CHECK constraints that
        # contain newlines as the approach here relies upon each individual
        # CHECK constraint being on a single line by itself. This necessarily
        # makes assumptions as to how the CREATE TABLE was emitted.
        CHECK_PATTERN = re.compile(
            r"""
            (?<![A-Za-z0-9_])   # Negative lookbehind: ensure CHECK is not
                                # part of an identifier (e.g., table name
                                # like "tableCHECK")

            (?:                 # Optional CONSTRAINT clause
                CONSTRAINT\s+
                (               # Group 1: Constraint name (quoted or unquoted)
                    "(?:[^"]|"")+"          # Double-quoted: "name" or "na""me"
                    |'(?:[^']|'')+'         # Single-quoted: 'name' or 'na''me'
                    |\[(?:[^\]]|\]\])+\]    # Bracket-quoted: [name] or [na]]me]
                    |`(?:[^`]|``)+`         # Backtick-quoted: `name` or `na``me`
                    |\S+                    # Unquoted: simple_name
                )
                \s+
            )?

            CHECK\s*\(          # CHECK keyword followed by opening paren
            """,
            re.VERBOSE | re.IGNORECASE,
        )
        cks = []

        for match in re.finditer(CHECK_PATTERN, table_data or ""):
            constraint_name = match.group(1)

            if constraint_name:
                # Remove surrounding quotes if present
                # Double quotes: "name" -> name
                # Single quotes: 'name' -> name
                # Brackets: [name] -> name
                # Backticks: `name` -> name
                constraint_name = re.sub(
                    r'^(["\'`])(.+)\1$|^\[(.+)\]$',
                    lambda m: m.group(2) or m.group(3),
                    constraint_name,
                    flags=re.DOTALL,
                )

            # Find the matching closing parenthesis by counting balanced
            # parens; must track string context to ignore parens inside
            # string literals
            start = match.end()  # Position after 'CHECK ('
            paren_count = 1
            in_single_quote = False
            in_double_quote = False

            for pos, char in enumerate(table_data[start:], start):
                # Track string literal context
                if char == "'" and not in_double_quote:
                    in_single_quote = not in_single_quote
                elif char == '"' and not in_single_quote:
                    in_double_quote = not in_double_quote
                # Only count parens when not inside a string literal
                elif not in_single_quote and not in_double_quote:
                    if char == "(":
                        paren_count += 1
                    elif char == ")":
                        paren_count -= 1
                        if paren_count == 0:
                            # Successfully found matching closing parenthesis
                            sqltext = table_data[start:pos].strip()
                            cks.append(
                                {"sqltext": sqltext, "name": constraint_name}
                            )
                            break

        cks.sort(key=lambda d: d["name"] or "~")  # sort None as last
        if cks:
            return cks
        else:
            return ReflectionDefaults.check_constraints()
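
The balanced-parenthesis scan is the part of the method above that a plain regex cannot do: the CHECK body may itself contain parentheses, including ones inside string literals that must not be counted. A self-contained sketch of that scan, with an illustrative DDL string whose literal ``':-)'`` contains an unbalanced close-paren:

```python
def check_body(sql, start):
    # Same technique as above: begin just past "CHECK (", count balanced
    # parens, and ignore parens inside '...' / "..." literals.
    depth = 1
    in_sq = in_dq = False
    for pos, ch in enumerate(sql[start:], start):
        if ch == "'" and not in_dq:
            in_sq = not in_sq
        elif ch == '"' and not in_sq:
            in_dq = not in_dq
        elif not in_sq and not in_dq:
            if ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
                if depth == 0:
                    return sql[start:pos].strip()
    return None  # unbalanced input


ddl = "CREATE TABLE t (x TEXT CHECK (x <> ':-)' AND length(x) > 1))"
start = ddl.index("CHECK (") + len("CHECK (")
body = check_body(ddl, start)
print(body)  # x <> ':-)' AND length(x) > 1
```

A naive ``CHECK \((.+?)\)`` match would have stopped at the ``)`` inside the string literal and returned a truncated expression.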

    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        pragma_indexes = self._get_table_pragma(
            connection, "index_list", table_name, schema=schema
        )
        indexes = []

        # regular expression to extract the filter predicate of a partial
        # index. this could fail to extract the predicate correctly on
        # indexes created like
        # CREATE INDEX i ON t (col || ') where') WHERE col <> ''
        # but as this function does not support expression-based indexes
        # this case does not occur.
        partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)

        if schema:
            schema_expr = "%s." % self.identifier_preparer.quote_identifier(
                schema
            )
        else:
            schema_expr = ""

        include_auto_indexes = kw.pop("include_auto_indexes", False)
        for row in pragma_indexes:
            # ignore implicit primary key index.
            # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
            if not include_auto_indexes and row[1].startswith(
                "sqlite_autoindex"
            ):
                continue
            indexes.append(
                dict(
                    name=row[1],
                    column_names=[],
                    unique=row[2],
                    dialect_options={},
                )
            )

            # check partial indexes
            if len(row) >= 5 and row[4]:
                s = (
                    "SELECT sql FROM %(schema)ssqlite_master "
                    "WHERE name = ? "
                    "AND type = 'index'" % {"schema": schema_expr}
                )
                rs = connection.exec_driver_sql(s, (row[1],))
                index_sql = rs.scalar()
                predicate_match = partial_pred_re.search(index_sql)
                if predicate_match is None:
                    # unless the regex is broken this case shouldn't happen
                    # because we know this is a partial index, so the
                    # definition sql should match the regex
                    util.warn(
                        "Failed to look up filter predicate of "
                        "partial index %s" % row[1]
                    )
                else:
                    predicate = predicate_match.group(1)
                    indexes[-1]["dialect_options"]["sqlite_where"] = text(
                        predicate
                    )

        # loop through the indexes to get the column names.
        for idx in list(indexes):
            pragma_index = self._get_table_pragma(
                connection, "index_info", idx["name"], schema=schema
            )

            for row in pragma_index:
                if row[2] is None:
                    util.warn(
                        "Skipped unsupported reflection of "
                        "expression-based index %s" % idx["name"]
                    )
                    indexes.remove(idx)
                    break
                else:
                    idx["column_names"].append(row[2])

        indexes.sort(key=lambda d: d["name"] or "~")  # sort None as last
        if indexes:
            return indexes
        elif not self.has_table(connection, table_name, schema):
            raise exc.NoSuchTableError(
                f"{schema}.{table_name}" if schema else table_name
            )
        else:
            return ReflectionDefaults.indexes()
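
The partial-index predicate above has to be scraped from the index's own ``CREATE INDEX`` text, since no PRAGMA reports it. A small sketch of the regex doing that, against an illustrative index definition as it would come back from ``sqlite_master``:

```python
import re

# The same predicate-extraction pattern used in get_indexes() above.
partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)

index_sql = "CREATE INDEX ix_t_col ON t (col) WHERE col <> ''"
match = partial_pred_re.search(index_sql)
predicate = match.group(1)
print(predicate)  # col <> ''
```

As the source comment notes, anchoring on the closing paren of the column list is only safe because expression-based indexes (where a ``') where'`` literal could appear in the column list) are skipped elsewhere.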

    def _is_sys_table(self, table_name):
        return table_name in {
            "sqlite_schema",
            "sqlite_master",
            "sqlite_temp_schema",
            "sqlite_temp_master",
        }

    @reflection.cache
    def _get_table_sql(self, connection, table_name, schema=None, **kw):
        if schema:
            schema_expr = "%s." % (
                self.identifier_preparer.quote_identifier(schema)
            )
        else:
            schema_expr = ""
        try:
            s = (
                "SELECT sql FROM "
                " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
                "  SELECT * FROM %(schema)ssqlite_temp_master) "
                "WHERE name = ? "
                "AND type in ('table', 'view')" % {"schema": schema_expr}
            )
            rs = connection.exec_driver_sql(s, (table_name,))
        except exc.DBAPIError:
            s = (
                "SELECT sql FROM %(schema)ssqlite_master "
                "WHERE name = ? "
                "AND type in ('table', 'view')" % {"schema": schema_expr}
            )
            rs = connection.exec_driver_sql(s, (table_name,))
        value = rs.scalar()
        if value is None and not self._is_sys_table(table_name):
            raise exc.NoSuchTableError(f"{schema_expr}{table_name}")
        return value
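
All of the string-parsing reflection above works because SQLite stores the original ``CREATE`` statement verbatim in ``sqlite_master``. A minimal stdlib sketch of the lookup (without the temp-table ``UNION ALL`` fallback, and with an illustrative table):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
# sqlite_master.sql holds the DDL text exactly as it was issued.
table_sql = conn.execute(
    "SELECT sql FROM sqlite_master "
    "WHERE name = ? AND type in ('table', 'view')",
    ("t",),
).fetchone()[0]
print(table_sql)  # CREATE TABLE t (x INTEGER)
```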

    def _get_table_pragma(self, connection, pragma, table_name, schema=None):
        quote = self.identifier_preparer.quote_identifier
        if schema is not None:
            statements = [f"PRAGMA {quote(schema)}."]
        else:
            # because PRAGMA looks in all attached databases if no schema
            # given, need to specify "main" schema, however since we want
            # 'temp' tables in the same namespace as 'main', need to run
            # the PRAGMA twice
            statements = ["PRAGMA main.", "PRAGMA temp."]

        qtable = quote(table_name)
        for statement in statements:
            statement = f"{statement}{pragma}({qtable})"
            cursor = connection.exec_driver_sql(statement)
            if not cursor._soft_closed:
                # work around SQLite issue whereby cursor.description
                # is blank when PRAGMA returns no rows:
                # https://www.sqlite.org/cvstrac/tktview?tn=1884
                result = cursor.fetchall()
            else:
                result = []
            if result:
                return result
        else:
            return []
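
The main-then-temp fallback above can be sketched outside SQLAlchemy with the stdlib ``sqlite3`` module; the temporary table here is illustrative. ``PRAGMA main.table_info`` returns no rows for it, so the second statement against ``temp.`` finds it:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TEMPORARY TABLE t (id INTEGER, name TEXT)")

# An unqualified PRAGMA would search every attached database, so try
# the "main" schema first, then fall back to "temp", as the method
# above does.
rows = []
for prefix in ("main", "temp"):
    rows = conn.execute(f'PRAGMA {prefix}.table_info("t")').fetchall()
    if rows:
        break
print([row[1] for row in rows])  # ['id', 'name']
```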