Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/base.py: 33%


# dialects/sqlite/base.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors


r'''
.. dialect:: sqlite
    :name: SQLite
    :normal_support: 3.12+
    :best_effort: 3.7.16+

.. _sqlite_datetime:

Date and Time Types
-------------------

SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does
not provide out of the box functionality for translating values between Python
`datetime` objects and a SQLite-supported format. SQLAlchemy's own
:class:`~sqlalchemy.types.DateTime` and related types provide date formatting
and parsing functionality when SQLite is used. The implementation classes are
:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`.
These types represent dates and times as ISO formatted strings, which also
nicely support ordering. There's no reliance on typical "libc" internals for
these functions so historical dates are fully supported.
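As a brief illustration of the above (a minimal sketch using an in-memory
database; the table and values are illustrative, not part of the dialect), a
``DateTime`` column on SQLite round-trips ``datetime`` objects while storing
them as ISO-formatted strings, including historical dates:

```python
import datetime

from sqlalchemy import (
    Column,
    DateTime,
    Integer,
    MetaData,
    Table,
    create_engine,
    select,
)

engine = create_engine("sqlite://")
metadata = MetaData()
events = Table(
    "events",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("created_at", DateTime),
)
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(
        events.insert().values(
            created_at=datetime.datetime(1776, 7, 4, 12, 0)
        )
    )
    # Round-trips as a datetime, even for a historical date
    value = conn.execute(select(events.c.created_at)).scalar()
    # The raw stored value is an ISO-formatted string
    raw = conn.exec_driver_sql("SELECT created_at FROM events").scalar()
```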

Ensuring Text affinity
^^^^^^^^^^^^^^^^^^^^^^

The DDL rendered for these types is the standard ``DATE``, ``TIME``
and ``DATETIME`` indicators. However, custom storage formats can also be
applied to these types. When the
storage format is detected as containing no alpha characters, the DDL for
these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``,
so that the column continues to have textual affinity.

.. seealso::

    `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ -
    in the SQLite documentation

.. _sqlite_autoincrement:

SQLite Auto Incrementing Behavior
----------------------------------

Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html

Key concepts:

* SQLite has an implicit "auto increment" feature that takes place for any
  non-composite primary-key column that is specifically created using
  "INTEGER PRIMARY KEY" for the type + primary key.

* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not**
  equivalent to the implicit autoincrement feature; this keyword is not
  recommended for general use. SQLAlchemy does not render this keyword
  unless a special SQLite-specific directive is used (see below). However,
  it still requires that the column's type is named "INTEGER".

Using the AUTOINCREMENT Keyword
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To specifically render the AUTOINCREMENT keyword on the primary key column
when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table
construct::

    Table(
        "sometable",
        metadata,
        Column("id", Integer, primary_key=True),
        sqlite_autoincrement=True,
    )

Allowing autoincrement behavior with SQLAlchemy types other than Integer/INTEGER
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

SQLite's typing model is based on naming conventions. Among other things, this
means that any type name which contains the substring ``"INT"`` will be
determined to be of "integer affinity". A type named ``"BIGINT"``,
``"SPECIAL_INT"`` or even ``"XYZINTQPR"`` will be considered by SQLite to be
of "integer" affinity. However, **the SQLite autoincrement feature, whether
implicitly or explicitly enabled, requires that the name of the column's type
is exactly the string "INTEGER"**. Therefore, if an application uses a type
like :class:`.BigInteger` for a primary key, on SQLite this type will need to
be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE
TABLE`` statement in order for the autoincrement behavior to be available.

One approach to achieve this is to use :class:`.Integer` on SQLite
only using :meth:`.TypeEngine.with_variant`::

    table = Table(
        "my_table",
        metadata,
        Column(
            "id",
            BigInteger().with_variant(Integer, "sqlite"),
            primary_key=True,
        ),
    )

Another is to use a subclass of :class:`.BigInteger` that overrides its DDL
name to be ``INTEGER`` when compiled against SQLite::

    from sqlalchemy import BigInteger
    from sqlalchemy.ext.compiler import compiles


    class SLBigInteger(BigInteger):
        pass


    @compiles(SLBigInteger, "sqlite")
    def bi_c_sqlite(element, compiler, **kw):
        return "INTEGER"


    @compiles(SLBigInteger)
    def bi_c(element, compiler, **kw):
        return compiler.visit_BIGINT(element, **kw)


    table = Table(
        "my_table", metadata, Column("id", SLBigInteger(), primary_key=True)
    )

.. seealso::

    :meth:`.TypeEngine.with_variant`

    :ref:`sqlalchemy.ext.compiler_toplevel`

    `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_

139.. _sqlite_transactions: 

140 

141Transactions with SQLite and the sqlite3 driver 

142----------------------------------------------- 

143 

144As a file-based database, SQLite's approach to transactions differs from 

145traditional databases in many ways. Additionally, the ``sqlite3`` driver 

146standard with Python (as well as the async version ``aiosqlite`` which builds 

147on top of it) has several quirks, workarounds, and API features in the 

148area of transaction control, all of which generally need to be addressed when 

149constructing a SQLAlchemy application that uses SQLite. 

150 

Legacy Transaction Mode with the sqlite3 driver
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The most important aspect of transaction handling with the sqlite3 driver is
that it defaults (which will continue through Python 3.15 before being
removed in Python 3.16) to legacy transactional behavior which does
not strictly follow :pep:`249`. The way in which the driver diverges from the
PEP is that it does not "begin" a transaction automatically as dictated by
:pep:`249` except in the case of DML statements, e.g. INSERT, UPDATE, and
DELETE. Normally, :pep:`249` dictates that a BEGIN must be emitted upon
the first SQL statement of any kind, so that all subsequent operations will
be established within a transaction until ``connection.commit()`` has been
called. The ``sqlite3`` driver, in an effort to be easier to use in
highly concurrent environments, skips this step for DQL (e.g. SELECT)
statements, and also skips it for DDL (e.g. CREATE TABLE etc.) statements for
more legacy reasons. Statements such as SAVEPOINT are also skipped.

In modern versions of the ``sqlite3`` driver as of Python 3.12, this legacy
mode of operation is referred to as
`"legacy transaction control" <https://docs.python.org/3/library/sqlite3.html#sqlite3-transaction-control-isolation-level>`_, and is in
effect by default due to the ``Connection.autocommit`` parameter being set to
the constant ``sqlite3.LEGACY_TRANSACTION_CONTROL``. Prior to Python 3.12,
the ``Connection.autocommit`` attribute did not exist.

The implications of legacy transaction mode include:

* **Incorrect support for transactional DDL** - statements like CREATE TABLE,
  ALTER TABLE, CREATE INDEX etc. will not automatically BEGIN a transaction if
  one were not started already, leading to the changes by each statement being
  "autocommitted" immediately unless BEGIN were otherwise emitted first. Very
  old (pre Python 3.6) versions of the ``sqlite3`` driver would also force a
  COMMIT for these operations even if a transaction were present, however this
  is no longer the case.
* **SERIALIZABLE behavior not fully functional** - SQLite's transaction
  isolation behavior is normally consistent with SERIALIZABLE isolation, as it
  is a file-based system that locks the database file entirely for write
  operations, preventing COMMIT until all reader transactions (and associated
  file locks) have completed. However, sqlite3's legacy transaction mode fails
  to emit BEGIN for SELECT statements, which causes these SELECT statements to
  no longer be "repeatable", failing one of the consistency guarantees of
  SERIALIZABLE.
* **Incorrect behavior for SAVEPOINT** - as the SAVEPOINT statement does not
  imply a BEGIN, a new SAVEPOINT emitted before a BEGIN will function on its
  own but fails to participate in the enclosing transaction, meaning a ROLLBACK
  of the transaction will not rollback elements that were part of a released
  savepoint.
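The DQL and DDL aspects of legacy mode can be observed with the stdlib driver
directly (a minimal sketch using an in-memory database; behavior shown is that
of Python 3.6 and later under the default legacy transaction control):

```python
import sqlite3

# Under legacy transaction control (the default), only DML statements
# implicitly emit BEGIN; DDL and SELECT do not.
conn = sqlite3.connect(":memory:")

conn.execute("CREATE TABLE t (x INTEGER)")
in_txn_after_ddl = conn.in_transaction  # DDL did not BEGIN

conn.execute("SELECT 1")
in_txn_after_select = conn.in_transaction  # SELECT did not BEGIN

conn.execute("INSERT INTO t (x) VALUES (1)")
in_txn_after_dml = conn.in_transaction  # DML implicitly emitted BEGIN
conn.commit()
```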

Legacy transaction mode first existed in order to facilitate working around
SQLite's file locks. Because SQLite relies upon whole-file locks, it is easy to
get "database is locked" errors, particularly when newer features like "write
ahead logging" are disabled. This is a key reason why ``sqlite3``'s legacy
transaction mode is still the default mode of operation; disabling it will
produce behavior that is more susceptible to locked database errors. However
note that **legacy transaction mode will no longer be the default** in a future
Python version (3.16 as of this writing).

.. _sqlite_enabling_transactions:

Enabling Non-Legacy SQLite Transactional Modes with the sqlite3 or aiosqlite driver
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Current SQLAlchemy support allows either for setting the
``Connection.autocommit`` attribute, most directly by using a
:func:`_sa.create_engine` parameter, or if on an older version of Python where
the attribute is not available, using event hooks to control the behavior of
BEGIN.

* **Enabling modern sqlite3 transaction control via the autocommit connect parameter** (Python 3.12 and above)

  To use SQLite in the mode described at `Transaction control via the autocommit attribute <https://docs.python.org/3/library/sqlite3.html#transaction-control-via-the-autocommit-attribute>`_,
  the most straightforward approach is to set the attribute to its recommended
  value of ``False`` at the connect level using
  :paramref:`_sa.create_engine.connect_args`::

      from sqlalchemy import create_engine

      engine = create_engine(
          "sqlite:///myfile.db", connect_args={"autocommit": False}
      )

  This parameter is also passed through when using the aiosqlite driver::

      from sqlalchemy.ext.asyncio import create_async_engine

      engine = create_async_engine(
          "sqlite+aiosqlite:///myfile.db", connect_args={"autocommit": False}
      )

  The parameter can also be set at the attribute level using the
  :meth:`.PoolEvents.connect` event hook, however this will only work for
  sqlite3, as aiosqlite does not yet expose this attribute on its
  ``Connection`` object::

      from sqlalchemy import create_engine, event

      engine = create_engine("sqlite:///myfile.db")


      @event.listens_for(engine, "connect")
      def do_connect(dbapi_connection, connection_record):
          # enable autocommit=False mode
          dbapi_connection.autocommit = False

* **Using SQLAlchemy to emit BEGIN in lieu of SQLite's transaction control** (all Python versions, sqlite3 and aiosqlite)

  For older versions of ``sqlite3`` or for cross-compatibility with older and
  newer versions, SQLAlchemy can also take over the job of transaction control.
  This is achieved by using the :meth:`.ConnectionEvents.begin` hook
  to emit the "BEGIN" command directly, while also disabling SQLite's control
  of this command using the :meth:`.PoolEvents.connect` event hook to set the
  ``Connection.isolation_level`` attribute to ``None``::

      from sqlalchemy import create_engine, event

      engine = create_engine("sqlite:///myfile.db")


      @event.listens_for(engine, "connect")
      def do_connect(dbapi_connection, connection_record):
          # disable sqlite3's emitting of the BEGIN statement entirely.
          dbapi_connection.isolation_level = None


      @event.listens_for(engine, "begin")
      def do_begin(conn):
          # emit our own BEGIN. sqlite3 still emits COMMIT/ROLLBACK correctly
          conn.exec_driver_sql("BEGIN")

  When using the asyncio variant ``aiosqlite``, refer to ``engine.sync_engine``
  as in the example below::

      from sqlalchemy import event
      from sqlalchemy.ext.asyncio import create_async_engine

      engine = create_async_engine("sqlite+aiosqlite:///myfile.db")


      @event.listens_for(engine.sync_engine, "connect")
      def do_connect(dbapi_connection, connection_record):
          # disable aiosqlite's emitting of the BEGIN statement entirely.
          dbapi_connection.isolation_level = None


      @event.listens_for(engine.sync_engine, "begin")
      def do_begin(conn):
          # emit our own BEGIN. aiosqlite still emits COMMIT/ROLLBACK correctly
          conn.exec_driver_sql("BEGIN")
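The effect of setting ``isolation_level`` to ``None`` can be seen with the
stdlib driver alone (a minimal sketch using an in-memory database): the driver
never emits BEGIN implicitly, leaving transaction demarcation entirely to
explicitly emitted statements, which is what the hooks above rely upon:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.isolation_level = None  # driver-level autocommit: no implicit BEGIN

conn.execute("CREATE TABLE t (x INTEGER)")
conn.execute("INSERT INTO t (x) VALUES (1)")
committed_immediately = not conn.in_transaction  # DML was autocommitted

# transactions now work only via explicit BEGIN / COMMIT / ROLLBACK
conn.execute("BEGIN")
conn.execute("INSERT INTO t (x) VALUES (2)")
conn.execute("ROLLBACK")
row_count = conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
```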

.. _sqlite_isolation_level:

Using SQLAlchemy's Driver Level AUTOCOMMIT Feature with SQLite
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

SQLAlchemy has a comprehensive database isolation feature with optional
autocommit support that is introduced in the section :ref:`dbapi_autocommit`.

For the ``sqlite3`` and ``aiosqlite`` drivers, SQLAlchemy only includes
built-in support for "AUTOCOMMIT". Note that this mode is currently
incompatible with the non-legacy isolation mode hooks documented in the
previous section at :ref:`sqlite_enabling_transactions`.

To use the ``sqlite3`` driver with SQLAlchemy driver-level autocommit,
create an engine setting the :paramref:`_sa.create_engine.isolation_level`
parameter to "AUTOCOMMIT"::

    eng = create_engine("sqlite:///myfile.db", isolation_level="AUTOCOMMIT")

When using the above mode, any event hooks that set the sqlite3
``Connection.autocommit`` parameter away from its default of
``sqlite3.LEGACY_TRANSACTION_CONTROL``, as well as hooks that emit ``BEGIN``,
should be disabled.

Additional Reading for SQLite / sqlite3 transaction control
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Links with important information on SQLite, the sqlite3 driver,
as well as long historical conversations on how things got to their current
state:

* `Isolation in SQLite <https://www.sqlite.org/isolation.html>`_ - on the SQLite website
* `Transaction control <https://docs.python.org/3/library/sqlite3.html#transaction-control>`_ - describes the sqlite3 autocommit attribute as well
  as the legacy isolation_level attribute.
* `sqlite3 SELECT does not BEGIN a transaction, but should according to spec <https://github.com/python/cpython/issues/54133>`_ - imported Python standard library issue on github
* `sqlite3 module breaks transactions and potentially corrupts data <https://github.com/python/cpython/issues/54949>`_ - imported Python standard library issue on github

INSERT/UPDATE/DELETE...RETURNING
---------------------------------

The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING``
syntax. ``INSERT..RETURNING`` may be used
automatically in some cases in order to fetch newly generated identifiers in
place of the traditional approach of using ``cursor.lastrowid``, however
``cursor.lastrowid`` is currently still preferred for simple single-statement
cases for its better performance.

To specify an explicit ``RETURNING`` clause, use the
:meth:`._UpdateBase.returning` method on a per-statement basis::

    # INSERT..RETURNING
    result = connection.execute(
        table.insert().values(name="foo").returning(table.c.col1, table.c.col2)
    )
    print(result.all())

    # UPDATE..RETURNING
    result = connection.execute(
        table.update()
        .where(table.c.name == "foo")
        .values(name="bar")
        .returning(table.c.col1, table.c.col2)
    )
    print(result.all())

    # DELETE..RETURNING
    result = connection.execute(
        table.delete()
        .where(table.c.name == "foo")
        .returning(table.c.col1, table.c.col2)
    )
    print(result.all())

.. versionadded:: 2.0 Added support for SQLite RETURNING

.. _sqlite_foreign_keys:

Foreign Key Support
-------------------

SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables,
however by default these constraints have no effect on the operation of the
table.

Constraint checking on SQLite has three prerequisites:

* At least version 3.6.19 of SQLite must be in use
* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY
  or SQLITE_OMIT_TRIGGER symbols enabled.
* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all
  connections before use -- including the initial call to
  :meth:`sqlalchemy.schema.MetaData.create_all`.

SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for
new connections through the usage of events::

    from sqlalchemy.engine import Engine
    from sqlalchemy import event


    @event.listens_for(Engine, "connect")
    def set_sqlite_pragma(dbapi_connection, connection_record):
        cursor = dbapi_connection.cursor()
        cursor.execute("PRAGMA foreign_keys=ON")
        cursor.close()
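The effect of the pragma can be verified with the stdlib driver alone (a
minimal sketch using an in-memory database; the table names are illustrative):
with the pragma enabled, a dangling reference raises an integrity error rather
than being silently accepted:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys=ON")
conn.execute("CREATE TABLE parent (id INTEGER PRIMARY KEY)")
conn.execute(
    "CREATE TABLE child ("
    "id INTEGER PRIMARY KEY, "
    "parent_id INTEGER REFERENCES parent(id))"
)
try:
    # no parent row with id=42 exists, so this insert must fail
    conn.execute("INSERT INTO child (parent_id) VALUES (42)")
    enforced = False
except sqlite3.IntegrityError:
    enforced = True
```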

.. warning::

    When SQLite foreign keys are enabled, it is **not possible**
    to emit CREATE or DROP statements for tables that contain
    mutually-dependent foreign key constraints;
    to emit the DDL for these tables requires that ALTER TABLE be used to
    create or drop these constraints separately, for which SQLite has
    no support.

.. seealso::

    `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_
    - on the SQLite web site.

    :ref:`event_toplevel` - SQLAlchemy event API.

    :ref:`use_alter` - more information on SQLAlchemy's facilities for handling
    mutually-dependent foreign key constraints.

.. _sqlite_on_conflict_ddl:

ON CONFLICT support for constraints
-----------------------------------

.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT"
   for SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT"
   as applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`.

SQLite supports a non-standard DDL clause known as ON CONFLICT which can be
applied to primary key, unique, check, and not null constraints. In DDL, it is
rendered either within the "CONSTRAINT" clause or within the column definition
itself depending on the location of the target constraint. To render this
clause within DDL, the extension parameter ``sqlite_on_conflict`` can be
specified with a string conflict resolution algorithm within the
:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`, and
:class:`.CheckConstraint` objects. Within the :class:`_schema.Column` object,
there are individual parameters ``sqlite_on_conflict_not_null``,
``sqlite_on_conflict_primary_key``, and ``sqlite_on_conflict_unique``, which
each correspond to the three types of relevant constraint types that can be
indicated from a :class:`_schema.Column` object.

.. seealso::

    `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite
    documentation

The ``sqlite_on_conflict`` parameters accept a string argument which is just
the resolution name to be chosen, which on SQLite can be one of ROLLBACK,
ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint
that specifies the IGNORE algorithm::

    some_table = Table(
        "some_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("data", Integer),
        UniqueConstraint("id", "data", sqlite_on_conflict="IGNORE"),
    )

The above renders CREATE TABLE DDL as:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER,
        PRIMARY KEY (id),
        UNIQUE (id, data) ON CONFLICT IGNORE
    )

When using the :paramref:`_schema.Column.unique`
flag to add a UNIQUE constraint
to a single column, the ``sqlite_on_conflict_unique`` parameter can
be added to the :class:`_schema.Column` as well, which will be added to the
UNIQUE constraint in the DDL::

    some_table = Table(
        "some_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column(
            "data", Integer, unique=True, sqlite_on_conflict_unique="IGNORE"
        ),
    )

rendering:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER,
        PRIMARY KEY (id),
        UNIQUE (data) ON CONFLICT IGNORE
    )

To apply the FAIL algorithm for a NOT NULL constraint,
``sqlite_on_conflict_not_null`` is used::

    some_table = Table(
        "some_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column(
            "data", Integer, nullable=False, sqlite_on_conflict_not_null="FAIL"
        ),
    )

this renders the column inline ON CONFLICT phrase:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER NOT NULL ON CONFLICT FAIL,
        PRIMARY KEY (id)
    )

Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``::

    some_table = Table(
        "some_table",
        metadata,
        Column(
            "id",
            Integer,
            primary_key=True,
            sqlite_on_conflict_primary_key="FAIL",
        ),
    )

SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict
resolution algorithm is applied to the constraint itself:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        PRIMARY KEY (id) ON CONFLICT FAIL
    )

.. _sqlite_on_conflict_insert:

INSERT...ON CONFLICT (Upsert)
-----------------------------

.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT"
   for SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as
   applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`.

From version 3.24.0 onwards, SQLite supports "upserts" (update or insert)
of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT``
statement. A candidate row will only be inserted if that row does not violate
any unique or primary key constraints. In the case of a unique constraint
violation, a secondary action can occur which can be either "DO UPDATE",
indicating that the data in the target row should be updated, or "DO NOTHING",
which indicates to silently skip this row.

Conflicts are determined using columns that are part of existing unique
constraints and indexes. These constraints are identified by stating the
columns and conditions that comprise the indexes.

SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific
:func:`_sqlite.insert()` function, which provides
the generative methods :meth:`_sqlite.Insert.on_conflict_do_update`
and :meth:`_sqlite.Insert.on_conflict_do_nothing`:

.. sourcecode:: pycon+sql

    >>> from sqlalchemy.dialects.sqlite import insert

    >>> insert_stmt = insert(my_table).values(
    ...     id="some_existing_id", data="inserted value"
    ... )

    >>> do_update_stmt = insert_stmt.on_conflict_do_update(
    ...     index_elements=["id"], set_=dict(data="updated value")
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?{stop}

    >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])

    >>> print(do_nothing_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO NOTHING

.. versionadded:: 1.4

.. seealso::

    `Upsert
    <https://sqlite.org/lang_UPSERT.html>`_
    - in the SQLite documentation.

Specifying the Target
^^^^^^^^^^^^^^^^^^^^^

Both methods supply the "target" of the conflict using column inference:

* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument
  specifies a sequence containing string column names, :class:`_schema.Column`
  objects, and/or SQL expression elements, which would identify a unique index
  or unique constraint.

* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements`
  to infer an index, a partial index can be inferred by also specifying the
  :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter:

  .. sourcecode:: pycon+sql

      >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")

      >>> do_update_stmt = stmt.on_conflict_do_update(
      ...     index_elements=[my_table.c.user_email],
      ...     index_where=my_table.c.user_email.like("%@gmail.com"),
      ...     set_=dict(data=stmt.excluded.data),
      ... )

      >>> print(do_update_stmt)
      {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?)
      ON CONFLICT (user_email)
      WHERE user_email LIKE '%@gmail.com'
      DO UPDATE SET data = excluded.data

The SET Clause
^^^^^^^^^^^^^^^

``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
existing row, using any combination of new values as well as values
from the proposed insertion. These values are specified using the
:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This
parameter accepts a dictionary which consists of direct values
for UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")

    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"], set_=dict(data="updated value")
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?

.. warning::

    The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take
    into account Python-side default UPDATE values or generation functions,
    e.g. those specified using :paramref:`_schema.Column.onupdate`. These
    values will not be exercised for an ON CONFLICT style of UPDATE, unless
    they are manually specified in the
    :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary.

Updating using the Excluded INSERT Values
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

In order to refer to the proposed insertion row, the special alias
:attr:`~.sqlite.Insert.excluded` is available as an attribute on
the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix
on a column, that informs the DO UPDATE to update the row with the value that
would have been inserted had the constraint not failed:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id="some_id", data="inserted value", author="jlh"
    ... )

    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"],
    ...     set_=dict(data="updated value", author=stmt.excluded.author),
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author

Additional WHERE Criteria
^^^^^^^^^^^^^^^^^^^^^^^^^

The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts
a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where`
parameter, which will limit those rows which receive an UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id="some_id", data="inserted value", author="jlh"
    ... )

    >>> on_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"],
    ...     set_=dict(data="updated value", author=stmt.excluded.author),
    ...     where=(my_table.c.status == 2),
    ... )
    >>> print(on_update_stmt)
    {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
    WHERE my_table.status = ?

Skipping Rows with DO NOTHING
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

``ON CONFLICT`` may be used to skip inserting a row entirely
if any conflict with a unique constraint occurs; below this is illustrated
using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
    >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
    >>> print(stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING

If ``DO NOTHING`` is used without specifying any columns or constraint,
it has the effect of skipping the INSERT for any unique violation which
occurs:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
    >>> stmt = stmt.on_conflict_do_nothing()
    >>> print(stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING

739.. _sqlite_type_reflection: 

740 

741Type Reflection 

742--------------- 

743 

744SQLite types are unlike those of most other database backends, in that 

745the string name of the type usually does not correspond to a "type" in a 

746one-to-one fashion. Instead, SQLite links per-column typing behavior 

747to one of five so-called "type affinities" based on a string matching 

748pattern for the type. 

749 

750SQLAlchemy's reflection process, when inspecting types, uses a simple 

751lookup table to link the keywords returned to provided SQLAlchemy types. 

752This lookup table is present within the SQLite dialect as it is for all 

753other dialects. However, the SQLite dialect has a different "fallback" 

754routine for when a particular type name is not located in the lookup map; 

755it instead implements the SQLite "type affinity" scheme located at 

756https://www.sqlite.org/datatype3.html section 2.1. 

757 

758The provided typemap will make direct associations from an exact string 

759name match for the following types: 

760 

761:class:`_types.BIGINT`, :class:`_types.BLOB`,

762:class:`_types.BOOLEAN`, :class:`_types.CHAR`,

763:class:`_types.DATE`, :class:`_types.DATETIME`,

764:class:`_types.DECIMAL`, :class:`_types.FLOAT`,

765:class:`_types.INTEGER`, :class:`_types.NUMERIC`,

766:class:`_types.REAL`, :class:`_types.SMALLINT`,

767:class:`_types.TEXT`, :class:`_types.TIME`,

768:class:`_types.TIMESTAMP`, :class:`_types.VARCHAR`,

769:class:`_types.NVARCHAR`, :class:`_types.NCHAR`

772 

773When a type name does not match one of the above types, the "type affinity" 

774lookup is used instead: 

775 

776* :class:`_types.INTEGER` is returned if the type name includes the 

777 string ``INT`` 

778* :class:`_types.TEXT` is returned if the type name includes the 

779 string ``CHAR``, ``CLOB`` or ``TEXT`` 

780* :class:`_types.NullType` is returned if the type name includes the 

781 string ``BLOB`` 

782* :class:`_types.REAL` is returned if the type name includes the string 

783 ``REAL``, ``FLOA`` or ``DOUB``. 

784* Otherwise, the :class:`_types.NUMERIC` type is used. 
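The affinity rules above can be sketched as a plain Python function (a simplified illustration, not the dialect's actual implementation; note that the checks are applied in order, as in SQLite itself):

```python
def affinity_fallback(type_name: str) -> str:
    """Map an unrecognized SQLite type name to a reflected type name,
    following SQLite's ordered type-affinity substring rules."""
    name = type_name.upper()
    if "INT" in name:
        return "INTEGER"
    if "CHAR" in name or "CLOB" in name or "TEXT" in name:
        return "TEXT"
    if "BLOB" in name:
        return "NullType"  # SQLAlchemy returns NullType for BLOB affinity
    if "REAL" in name or "FLOA" in name or "DOUB" in name:
        return "REAL"
    return "NUMERIC"
```

For example, ``affinity_fallback("MEDIUMINT")`` yields ``"INTEGER"`` and ``affinity_fallback("DOUBLE PRECISION")`` yields ``"REAL"``.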

785 

786.. _sqlite_partial_index: 

787 

788Partial Indexes 

789--------------- 

790 

791A partial index, e.g. one which uses a WHERE clause, can be specified 

792with the DDL system using the argument ``sqlite_where``:: 

793 

794 tbl = Table("testtbl", m, Column("data", Integer)) 

795 idx = Index( 

796 "test_idx1", 

797 tbl.c.data, 

798 sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10), 

799 ) 

800 

801The index will be rendered at create time as: 

802 

803.. sourcecode:: sql 

804 

805 CREATE INDEX test_idx1 ON testtbl (data) 

806 WHERE data > 5 AND data < 10 
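The rendered DDL can be verified against the stdlib ``sqlite3`` driver; the WHERE clause of a partial index is preserved verbatim in ``sqlite_master``:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE testtbl (data INTEGER)")
conn.execute(
    "CREATE INDEX test_idx1 ON testtbl (data) WHERE data > 5 AND data < 10"
)

# the index definition, including its WHERE clause, is stored as given
(index_sql,) = conn.execute(
    "SELECT sql FROM sqlite_master WHERE type = 'index' AND name = 'test_idx1'"
).fetchone()
```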

807 

808.. _sqlite_dotted_column_names: 

809 

810Dotted Column Names 

811------------------- 

812 

813Using table or column names that explicitly have periods in them is

814**not recommended**. While this is a bad idea for relational databases

815in general, as the dot is a syntactically significant character, the

816SQLite driver prior to version **3.10.0** of SQLite has a bug which

817requires that SQLAlchemy filter out these dots in result sets.

818 

819The bug, entirely outside of SQLAlchemy, can be illustrated thusly:: 

820 

821 import sqlite3 

822 

823 assert sqlite3.sqlite_version_info < ( 

824 3, 

825 10, 

826 0, 

827 ), "bug is fixed in this version" 

828 

829 conn = sqlite3.connect(":memory:") 

830 cursor = conn.cursor() 

831 

832 cursor.execute("create table x (a integer, b integer)") 

833 cursor.execute("insert into x (a, b) values (1, 1)") 

834 cursor.execute("insert into x (a, b) values (2, 2)") 

835 

836 cursor.execute("select x.a, x.b from x") 

837 assert [c[0] for c in cursor.description] == ["a", "b"] 

838 

839 cursor.execute( 

840 """ 

841 select x.a, x.b from x where a=1 

842 union 

843 select x.a, x.b from x where a=2 

844 """ 

845 ) 

846 assert [c[0] for c in cursor.description] == ["a", "b"], [ 

847 c[0] for c in cursor.description 

848 ] 

849 

850The second assertion fails: 

851 

852.. sourcecode:: text 

853 

854 Traceback (most recent call last): 

855 File "test.py", line 19, in <module> 

856 [c[0] for c in cursor.description] 

857 AssertionError: ['x.a', 'x.b'] 

858 

859Where above, the driver incorrectly reports the names of the columns

860including the name of the table, which is entirely inconsistent with

861its behavior when the UNION is not present.

862 
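The translation SQLAlchemy applies can be sketched as a split on the rightmost dot (``filter_dotted`` is a hypothetical name used for illustration only):

```python
def filter_dotted(colname: str) -> str:
    # keep only the portion after the last dot, so that "x.a" and "a"
    # address the same column key in the result set
    return colname.rpartition(".")[2]
```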

863SQLAlchemy relies upon column names being predictable in how they match 

864to the original statement, so the SQLAlchemy dialect has no choice but 

865to filter these out:: 

866 

867 

868 from sqlalchemy import create_engine 

869 

870 eng = create_engine("sqlite://") 

871 conn = eng.connect() 

872 

873 conn.exec_driver_sql("create table x (a integer, b integer)") 

874 conn.exec_driver_sql("insert into x (a, b) values (1, 1)") 

875 conn.exec_driver_sql("insert into x (a, b) values (2, 2)") 

876 

877 result = conn.exec_driver_sql("select x.a, x.b from x") 

878 assert result.keys() == ["a", "b"] 

879 

880 result = conn.exec_driver_sql( 

881 """ 

882 select x.a, x.b from x where a=1 

883 union 

884 select x.a, x.b from x where a=2 

885 """ 

886 ) 

887 assert result.keys() == ["a", "b"] 

888 

889Note that above, even though SQLAlchemy filters out the dots, *both 

890names are still addressable*:: 

891 

892 >>> row = result.first() 

893 >>> row["a"] 

894 1 

895 >>> row["x.a"] 

896 1 

897 >>> row["b"] 

898 1 

899 >>> row["x.b"] 

900 1 

901 

902Therefore, the workaround applied by SQLAlchemy only impacts 

903:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In 

904the very specific case where an application is forced to use column names that 

905contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and 

906:meth:`.Row.keys()` is required to return these dotted names unmodified, 

907the ``sqlite_raw_colnames`` execution option may be provided, either on a 

908per-:class:`_engine.Connection` basis:: 

909 

910 result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql( 

911 """ 

912 select x.a, x.b from x where a=1 

913 union 

914 select x.a, x.b from x where a=2 

915 """ 

916 ) 

917 assert result.keys() == ["x.a", "x.b"] 

918 

919or on a per-:class:`_engine.Engine` basis:: 

920 

921 engine = create_engine( 

922 "sqlite://", execution_options={"sqlite_raw_colnames": True} 

923 ) 

924 

925When using the per-:class:`_engine.Engine` execution option, note that 

926**Core and ORM queries that use UNION may not function properly**. 

927 

928SQLite-specific table options 

929----------------------------- 

930 

931One option for CREATE TABLE is supported directly by the SQLite 

932dialect in conjunction with the :class:`_schema.Table` construct: 

933 

934* ``WITHOUT ROWID``:: 

935 

936 Table("some_table", metadata, ..., sqlite_with_rowid=False) 

937 

938* ``STRICT``::

940 

941 Table("some_table", metadata, ..., sqlite_strict=True) 

942 

943 .. versionadded:: 2.0.37 

944 

945.. seealso:: 

946 

947 `SQLite CREATE TABLE options 

948 <https://www.sqlite.org/lang_createtable.html>`_ 
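Both options emit plain SQLite table suffixes; the ``WITHOUT ROWID`` form can be exercised with the stdlib driver (``STRICT`` additionally requires SQLite 3.37+):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# equivalent to Table(..., sqlite_with_rowid=False); note that
# WITHOUT ROWID tables must declare an explicit PRIMARY KEY
conn.execute(
    "CREATE TABLE some_table (id TEXT PRIMARY KEY, value INTEGER) WITHOUT ROWID"
)
(table_sql,) = conn.execute(
    "SELECT sql FROM sqlite_master WHERE name = 'some_table'"
).fetchone()
```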

949 

950.. _sqlite_include_internal: 

951 

952Reflecting internal schema tables 

953---------------------------------- 

954 

955Reflection methods that return lists of tables will omit so-called 

956"SQLite internal schema object" names, which SQLite defines as

957any object name that is prefixed with ``sqlite_``. An example of

958such an object is the ``sqlite_sequence`` table that's generated when 

959the ``AUTOINCREMENT`` column parameter is used. In order to return 

960these objects, the parameter ``sqlite_include_internal=True`` may be 

961passed to methods such as :meth:`_schema.MetaData.reflect` or 

962:meth:`.Inspector.get_table_names`. 
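The ``sqlite_sequence`` table described above can be observed directly with the stdlib driver:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, x TEXT)")
conn.execute("INSERT INTO t (x) VALUES ('a')")

# the internal bookkeeping table appears alongside the user table
names = sorted(
    row[0]
    for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
```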

963 

964.. versionadded:: 2.0 Added the ``sqlite_include_internal=True`` parameter. 

965 Previously, these tables were not ignored by SQLAlchemy reflection 

966 methods. 

967 

968.. note:: 

969 

970 The ``sqlite_include_internal`` parameter does not refer to the 

971 "system" tables that are present in schemas such as ``sqlite_master``. 

972 

973.. seealso:: 

974 

975 `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite 

976 documentation. 

977 

978''' # noqa 

979from __future__ import annotations 

980 

981import datetime 

982import numbers 

983import re 

984from typing import Optional 

985 

986from .json import JSON 

987from .json import JSONIndexType 

988from .json import JSONPathType 

989from ... import exc 

990from ... import schema as sa_schema 

991from ... import sql 

992from ... import text 

993from ... import types as sqltypes 

994from ... import util 

995from ...engine import default 

996from ...engine import processors 

997from ...engine import reflection 

998from ...engine.reflection import ReflectionDefaults 

999from ...sql import coercions 

1000from ...sql import compiler 

1001from ...sql import elements 

1002from ...sql import roles 

1003from ...sql import schema 

1004from ...types import BLOB # noqa 

1005from ...types import BOOLEAN # noqa 

1006from ...types import CHAR # noqa 

1007from ...types import DECIMAL # noqa 

1008from ...types import FLOAT # noqa 

1009from ...types import INTEGER # noqa 

1010from ...types import NUMERIC # noqa 

1011from ...types import REAL # noqa 

1012from ...types import SMALLINT # noqa 

1013from ...types import TEXT # noqa 

1014from ...types import TIMESTAMP # noqa 

1015from ...types import VARCHAR # noqa 

1016 

1017 

1018class _SQliteJson(JSON): 

1019 def result_processor(self, dialect, coltype): 

1020 default_processor = super().result_processor(dialect, coltype) 

1021 

1022 def process(value): 

1023 try: 

1024 return default_processor(value) 

1025 except TypeError: 

1026 if isinstance(value, numbers.Number): 

1027 return value 

1028 else: 

1029 raise 

1030 

1031 return process 

1032 

1033 

1034class _DateTimeMixin: 

1035 _reg = None 

1036 _storage_format = None 

1037 

1038 def __init__(self, storage_format=None, regexp=None, **kw): 

1039 super().__init__(**kw) 

1040 if regexp is not None: 

1041 self._reg = re.compile(regexp) 

1042 if storage_format is not None: 

1043 self._storage_format = storage_format 

1044 

1045 @property 

1046 def format_is_text_affinity(self): 

1047 """return True if the storage format will automatically imply 

1048 a TEXT affinity. 

1049 

1050    If the storage format contains only numeric characters,

1051    it will imply a NUMERIC affinity on SQLite; in this case,

1052 the type will generate its DDL as DATE_CHAR, DATETIME_CHAR, 

1053 TIME_CHAR. 

1054 

1055 """ 

1056 spec = self._storage_format % { 

1057 "year": 0, 

1058 "month": 0, 

1059 "day": 0, 

1060 "hour": 0, 

1061 "minute": 0, 

1062 "second": 0, 

1063 "microsecond": 0, 

1064 } 

1065 return bool(re.search(r"[^0-9]", spec)) 
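For example, substituting zeros into the default DATE storage format leaves the ``-`` separators in place, so the check reports text affinity (a standalone re-creation of the logic, assuming the default format):

```python
import re

storage_format = "%(year)04d-%(month)02d-%(day)02d"
spec = storage_format % {"year": 0, "month": 0, "day": 0}

# "0000-00-00" still contains non-digit separators -> text affinity
is_text_affinity = bool(re.search(r"[^0-9]", spec))
```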

1066 

1067 def adapt(self, cls, **kw): 

1068 if issubclass(cls, _DateTimeMixin): 

1069 if self._storage_format: 

1070 kw["storage_format"] = self._storage_format 

1071 if self._reg: 

1072 kw["regexp"] = self._reg 

1073 return super().adapt(cls, **kw) 

1074 

1075 def literal_processor(self, dialect): 

1076 bp = self.bind_processor(dialect) 

1077 

1078 def process(value): 

1079 return "'%s'" % bp(value) 

1080 

1081 return process 

1082 

1083 

1084class DATETIME(_DateTimeMixin, sqltypes.DateTime): 

1085 r"""Represent a Python datetime object in SQLite using a string. 

1086 

1087 The default string storage format is:: 

1088 

1089 "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1090 

1091 e.g.: 

1092 

1093 .. sourcecode:: text 

1094 

1095 2021-03-15 12:05:57.105542 

1096 

1097 The incoming storage format is by default parsed using the 

1098 Python ``datetime.fromisoformat()`` function. 

1099 

1100 .. versionchanged:: 2.0 ``datetime.fromisoformat()`` is used for default 

1101 datetime string parsing. 

1102 

1103 The storage format can be customized to some degree using the 

1104 ``storage_format`` and ``regexp`` parameters, such as:: 

1105 

1106 import re 

1107 from sqlalchemy.dialects.sqlite import DATETIME 

1108 

1109 dt = DATETIME( 

1110 storage_format=( 

1111 "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d" 

1112 ), 

1113        regexp=r"(\d+)/(\d+)/(\d+) (\d+):(\d+):(\d+)",

1114 ) 

1115 

1116 :param truncate_microseconds: when ``True`` microseconds will be truncated 

1117 from the datetime. Can't be specified together with ``storage_format`` 

1118 or ``regexp``. 

1119 

1120 :param storage_format: format string which will be applied to the dict 

1121 with keys year, month, day, hour, minute, second, and microsecond. 

1122 

1123 :param regexp: regular expression which will be applied to incoming result 

1124 rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming 

1125 strings. If the regexp contains named groups, the resulting match dict is 

1126 applied to the Python datetime() constructor as keyword arguments. 

1127 Otherwise, if positional groups are used, the datetime() constructor 

1128 is called with positional arguments via 

1129 ``*map(int, match_obj.groups(0))``. 

1130 

1131 """ # noqa 

1132 

1133 _storage_format = ( 

1134 "%(year)04d-%(month)02d-%(day)02d " 

1135 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1136 ) 

1137 

1138 def __init__(self, *args, **kwargs): 

1139 truncate_microseconds = kwargs.pop("truncate_microseconds", False) 

1140 super().__init__(*args, **kwargs) 

1141 if truncate_microseconds: 

1142 assert "storage_format" not in kwargs, ( 

1143 "You can specify only " 

1144 "one of truncate_microseconds or storage_format." 

1145 ) 

1146 assert "regexp" not in kwargs, ( 

1147 "You can specify only one of " 

1148 "truncate_microseconds or regexp." 

1149 ) 

1150 self._storage_format = ( 

1151 "%(year)04d-%(month)02d-%(day)02d " 

1152 "%(hour)02d:%(minute)02d:%(second)02d" 

1153 ) 

1154 

1155 def bind_processor(self, dialect): 

1156 datetime_datetime = datetime.datetime 

1157 datetime_date = datetime.date 

1158 format_ = self._storage_format 

1159 

1160 def process(value): 

1161 if value is None: 

1162 return None 

1163 elif isinstance(value, datetime_datetime): 

1164 return format_ % { 

1165 "year": value.year, 

1166 "month": value.month, 

1167 "day": value.day, 

1168 "hour": value.hour, 

1169 "minute": value.minute, 

1170 "second": value.second, 

1171 "microsecond": value.microsecond, 

1172 } 

1173 elif isinstance(value, datetime_date): 

1174 return format_ % { 

1175 "year": value.year, 

1176 "month": value.month, 

1177 "day": value.day, 

1178 "hour": 0, 

1179 "minute": 0, 

1180 "second": 0, 

1181 "microsecond": 0, 

1182 } 

1183 else: 

1184 raise TypeError( 

1185 "SQLite DateTime type only accepts Python " 

1186 "datetime and date objects as input." 

1187 ) 

1188 

1189 return process 

1190 

1191 def result_processor(self, dialect, coltype): 

1192 if self._reg: 

1193 return processors.str_to_datetime_processor_factory( 

1194 self._reg, datetime.datetime 

1195 ) 

1196 else: 

1197 return processors.str_to_datetime 

1198 

1199 

1200class DATE(_DateTimeMixin, sqltypes.Date): 

1201 r"""Represent a Python date object in SQLite using a string. 

1202 

1203 The default string storage format is:: 

1204 

1205 "%(year)04d-%(month)02d-%(day)02d" 

1206 

1207 e.g.: 

1208 

1209 .. sourcecode:: text 

1210 

1211 2011-03-15 

1212 

1213 The incoming storage format is by default parsed using the 

1214 Python ``date.fromisoformat()`` function. 

1215 

1216 .. versionchanged:: 2.0 ``date.fromisoformat()`` is used for default 

1217 date string parsing. 

1218 

1219 

1220 The storage format can be customized to some degree using the 

1221 ``storage_format`` and ``regexp`` parameters, such as:: 

1222 

1223 import re 

1224 from sqlalchemy.dialects.sqlite import DATE 

1225 

1226 d = DATE( 

1227 storage_format="%(month)02d/%(day)02d/%(year)04d", 

1228        regexp=re.compile(r"(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)"),

1229 ) 

1230 

1231 :param storage_format: format string which will be applied to the 

1232 dict with keys year, month, and day. 

1233 

1234 :param regexp: regular expression which will be applied to 

1235 incoming result rows, replacing the use of ``date.fromisoformat()`` to 

1236 parse incoming strings. If the regexp contains named groups, the resulting 

1237 match dict is applied to the Python date() constructor as keyword 

1238 arguments. Otherwise, if positional groups are used, the date() 

1239 constructor is called with positional arguments via 

1240 ``*map(int, match_obj.groups(0))``. 

1241 

1242 """ 

1243 

1244 _storage_format = "%(year)04d-%(month)02d-%(day)02d" 

1245 

1246 def bind_processor(self, dialect): 

1247 datetime_date = datetime.date 

1248 format_ = self._storage_format 

1249 

1250 def process(value): 

1251 if value is None: 

1252 return None 

1253 elif isinstance(value, datetime_date): 

1254 return format_ % { 

1255 "year": value.year, 

1256 "month": value.month, 

1257 "day": value.day, 

1258 } 

1259 else: 

1260 raise TypeError( 

1261 "SQLite Date type only accepts Python " 

1262 "date objects as input." 

1263 ) 

1264 

1265 return process 

1266 

1267 def result_processor(self, dialect, coltype): 

1268 if self._reg: 

1269 return processors.str_to_datetime_processor_factory( 

1270 self._reg, datetime.date 

1271 ) 

1272 else: 

1273 return processors.str_to_date 

1274 

1275 

1276class TIME(_DateTimeMixin, sqltypes.Time): 

1277 r"""Represent a Python time object in SQLite using a string. 

1278 

1279 The default string storage format is:: 

1280 

1281 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1282 

1283 e.g.: 

1284 

1285 .. sourcecode:: text 

1286 

1287        12:05:57.105542

1288 

1289 The incoming storage format is by default parsed using the 

1290 Python ``time.fromisoformat()`` function. 

1291 

1292 .. versionchanged:: 2.0 ``time.fromisoformat()`` is used for default 

1293 time string parsing. 

1294 

1295 The storage format can be customized to some degree using the 

1296 ``storage_format`` and ``regexp`` parameters, such as:: 

1297 

1298 import re 

1299 from sqlalchemy.dialects.sqlite import TIME 

1300 

1301 t = TIME( 

1302 storage_format="%(hour)02d-%(minute)02d-%(second)02d-%(microsecond)06d", 

1303        regexp=re.compile(r"(\d+)-(\d+)-(\d+)(?:-(\d+))?"),

1304 ) 

1305 

1306 :param truncate_microseconds: when ``True`` microseconds will be truncated 

1307 from the time. Can't be specified together with ``storage_format`` 

1308 or ``regexp``. 

1309 

1310 :param storage_format: format string which will be applied to the dict 

1311 with keys hour, minute, second, and microsecond. 

1312 

1313 :param regexp: regular expression which will be applied to incoming result 

1314    rows, replacing the use of ``time.fromisoformat()`` to parse incoming

1315 strings. If the regexp contains named groups, the resulting match dict is 

1316 applied to the Python time() constructor as keyword arguments. Otherwise, 

1317 if positional groups are used, the time() constructor is called with 

1318 positional arguments via ``*map(int, match_obj.groups(0))``. 

1319 

1320 """ 

1321 

1322 _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1323 

1324 def __init__(self, *args, **kwargs): 

1325 truncate_microseconds = kwargs.pop("truncate_microseconds", False) 

1326 super().__init__(*args, **kwargs) 

1327 if truncate_microseconds: 

1328 assert "storage_format" not in kwargs, ( 

1329 "You can specify only " 

1330 "one of truncate_microseconds or storage_format." 

1331 ) 

1332 assert "regexp" not in kwargs, ( 

1333 "You can specify only one of " 

1334 "truncate_microseconds or regexp." 

1335 ) 

1336 self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d" 

1337 

1338 def bind_processor(self, dialect): 

1339 datetime_time = datetime.time 

1340 format_ = self._storage_format 

1341 

1342 def process(value): 

1343 if value is None: 

1344 return None 

1345 elif isinstance(value, datetime_time): 

1346 return format_ % { 

1347 "hour": value.hour, 

1348 "minute": value.minute, 

1349 "second": value.second, 

1350 "microsecond": value.microsecond, 

1351 } 

1352 else: 

1353 raise TypeError( 

1354 "SQLite Time type only accepts Python " 

1355 "time objects as input." 

1356 ) 

1357 

1358 return process 

1359 

1360 def result_processor(self, dialect, coltype): 

1361 if self._reg: 

1362 return processors.str_to_datetime_processor_factory( 

1363 self._reg, datetime.time 

1364 ) 

1365 else: 

1366 return processors.str_to_time 

1367 

1368 

1369colspecs = { 

1370 sqltypes.Date: DATE, 

1371 sqltypes.DateTime: DATETIME, 

1372 sqltypes.JSON: _SQliteJson, 

1373 sqltypes.JSON.JSONIndexType: JSONIndexType, 

1374 sqltypes.JSON.JSONPathType: JSONPathType, 

1375 sqltypes.Time: TIME, 

1376} 

1377 

1378ischema_names = { 

1379 "BIGINT": sqltypes.BIGINT, 

1380 "BLOB": sqltypes.BLOB, 

1381 "BOOL": sqltypes.BOOLEAN, 

1382 "BOOLEAN": sqltypes.BOOLEAN, 

1383 "CHAR": sqltypes.CHAR, 

1384 "DATE": sqltypes.DATE, 

1385 "DATE_CHAR": sqltypes.DATE, 

1386 "DATETIME": sqltypes.DATETIME, 

1387 "DATETIME_CHAR": sqltypes.DATETIME, 

1388 "DOUBLE": sqltypes.DOUBLE, 

1389 "DECIMAL": sqltypes.DECIMAL, 

1390 "FLOAT": sqltypes.FLOAT, 

1391 "INT": sqltypes.INTEGER, 

1392 "INTEGER": sqltypes.INTEGER, 

1393 "JSON": JSON, 

1394 "NUMERIC": sqltypes.NUMERIC, 

1395 "REAL": sqltypes.REAL, 

1396 "SMALLINT": sqltypes.SMALLINT, 

1397 "TEXT": sqltypes.TEXT, 

1398 "TIME": sqltypes.TIME, 

1399 "TIME_CHAR": sqltypes.TIME, 

1400 "TIMESTAMP": sqltypes.TIMESTAMP, 

1401 "VARCHAR": sqltypes.VARCHAR, 

1402 "NVARCHAR": sqltypes.NVARCHAR, 

1403 "NCHAR": sqltypes.NCHAR, 

1404} 

1405 

1406 

1407class SQLiteCompiler(compiler.SQLCompiler): 

1408 extract_map = util.update_copy( 

1409 compiler.SQLCompiler.extract_map, 

1410 { 

1411 "month": "%m", 

1412 "day": "%d", 

1413 "year": "%Y", 

1414 "second": "%S", 

1415 "hour": "%H", 

1416 "doy": "%j", 

1417 "minute": "%M", 

1418 "epoch": "%s", 

1419 "dow": "%w", 

1420 "week": "%W", 

1421 }, 

1422 ) 

1423 

1424 def visit_truediv_binary(self, binary, operator, **kw): 

1425 return ( 

1426 self.process(binary.left, **kw) 

1427 + " / " 

1428 + "(%s + 0.0)" % self.process(binary.right, **kw) 

1429 ) 
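SQLite performs integer (truncating) division when both operands are integers; the ``+ 0.0`` coercion produced above restores Python's true-division semantics:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# integer / integer truncates...
truncated = conn.execute("SELECT 7 / 2").fetchone()[0]
# ...while coercing the divisor to float gives true division
true_division = conn.execute("SELECT 7 / (2 + 0.0)").fetchone()[0]
```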

1430 

1431 def visit_now_func(self, fn, **kw): 

1432 return "CURRENT_TIMESTAMP" 

1433 

1434 def visit_localtimestamp_func(self, func, **kw): 

1435 return "DATETIME(CURRENT_TIMESTAMP, 'localtime')" 

1436 

1437 def visit_true(self, expr, **kw): 

1438 return "1" 

1439 

1440 def visit_false(self, expr, **kw): 

1441 return "0" 

1442 

1443 def visit_char_length_func(self, fn, **kw): 

1444 return "length%s" % self.function_argspec(fn) 

1445 

1446 def visit_aggregate_strings_func(self, fn, **kw): 

1447 return "group_concat%s" % self.function_argspec(fn) 

1448 

1449 def visit_cast(self, cast, **kwargs): 

1450 if self.dialect.supports_cast: 

1451 return super().visit_cast(cast, **kwargs) 

1452 else: 

1453 return self.process(cast.clause, **kwargs) 

1454 

1455 def visit_extract(self, extract, **kw): 

1456 try: 

1457 return "CAST(STRFTIME('%s', %s) AS INTEGER)" % ( 

1458 self.extract_map[extract.field], 

1459 self.process(extract.expr, **kw), 

1460 ) 

1461 except KeyError as err: 

1462 raise exc.CompileError( 

1463 "%s is not a valid extract argument." % extract.field 

1464 ) from err 
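The generated ``CAST(STRFTIME(...))`` expression can be exercised directly; for instance, extracting the day-of-year (``%j``) from a date:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# 2021-03-15 is the 74th day of the year (31 + 28 + 15)
doy = conn.execute(
    "SELECT CAST(STRFTIME('%j', '2021-03-15') AS INTEGER)"
).fetchone()[0]
```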

1465 

1466 def returning_clause( 

1467 self, 

1468 stmt, 

1469 returning_cols, 

1470 *, 

1471 populate_result_map, 

1472 **kw, 

1473 ): 

1474 kw["include_table"] = False 

1475 return super().returning_clause( 

1476 stmt, returning_cols, populate_result_map=populate_result_map, **kw 

1477 ) 

1478 

1479 def limit_clause(self, select, **kw): 

1480 text = "" 

1481 if select._limit_clause is not None: 

1482 text += "\n LIMIT " + self.process(select._limit_clause, **kw) 

1483 if select._offset_clause is not None: 

1484 if select._limit_clause is None: 

1485 text += "\n LIMIT " + self.process(sql.literal(-1)) 

1486 text += " OFFSET " + self.process(select._offset_clause, **kw) 

1487 else: 

1488 text += " OFFSET " + self.process(sql.literal(0), **kw) 

1489 return text 
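SQLite rejects ``OFFSET`` without a ``LIMIT`` clause, which is why a ``LIMIT -1`` (meaning "no limit") is synthesized above when only an offset is present:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (n INTEGER)")
conn.executemany("INSERT INTO t (n) VALUES (?)", [(i,) for i in range(5)])

# LIMIT -1 keeps the statement syntactically valid while imposing no cap
rows = [
    n
    for (n,) in conn.execute("SELECT n FROM t ORDER BY n LIMIT -1 OFFSET 3")
]
```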

1490 

1491 def for_update_clause(self, select, **kw): 

1492 # sqlite has no "FOR UPDATE" AFAICT 

1493 return "" 

1494 

1495 def update_from_clause( 

1496 self, update_stmt, from_table, extra_froms, from_hints, **kw 

1497 ): 

1498 kw["asfrom"] = True 

1499 return "FROM " + ", ".join( 

1500 t._compiler_dispatch(self, fromhints=from_hints, **kw) 

1501 for t in extra_froms 

1502 ) 

1503 

1504 def visit_is_distinct_from_binary(self, binary, operator, **kw): 

1505 return "%s IS NOT %s" % ( 

1506 self.process(binary.left), 

1507 self.process(binary.right), 

1508 ) 

1509 

1510 def visit_is_not_distinct_from_binary(self, binary, operator, **kw): 

1511 return "%s IS %s" % ( 

1512 self.process(binary.left), 

1513 self.process(binary.right), 

1514 ) 

1515 

1516 def visit_json_getitem_op_binary(self, binary, operator, **kw): 

1517 if binary.type._type_affinity is sqltypes.JSON: 

1518 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))" 

1519 else: 

1520 expr = "JSON_EXTRACT(%s, %s)" 

1521 

1522 return expr % ( 

1523 self.process(binary.left, **kw), 

1524 self.process(binary.right, **kw), 

1525 ) 

1526 

1527 def visit_json_path_getitem_op_binary(self, binary, operator, **kw): 

1528 if binary.type._type_affinity is sqltypes.JSON: 

1529 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))" 

1530 else: 

1531 expr = "JSON_EXTRACT(%s, %s)" 

1532 

1533 return expr % ( 

1534 self.process(binary.left, **kw), 

1535 self.process(binary.right, **kw), 

1536 ) 

1537 

1538 def visit_empty_set_op_expr(self, type_, expand_op, **kw): 

1539 # slightly old SQLite versions don't seem to be able to handle 

1540 # the empty set impl 

1541 return self.visit_empty_set_expr(type_) 

1542 

1543 def visit_empty_set_expr(self, element_types, **kw): 

1544 return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % ( 

1545 ", ".join("1" for type_ in element_types or [INTEGER()]), 

1546 ", ".join("1" for type_ in element_types or [INTEGER()]), 

1547 ) 

1548 

1549 def visit_regexp_match_op_binary(self, binary, operator, **kw): 

1550 return self._generate_generic_binary(binary, " REGEXP ", **kw) 

1551 

1552 def visit_not_regexp_match_op_binary(self, binary, operator, **kw): 

1553 return self._generate_generic_binary(binary, " NOT REGEXP ", **kw) 

1554 

1555 def _on_conflict_target(self, clause, **kw): 

1556 if clause.inferred_target_elements is not None: 

1557 target_text = "(%s)" % ", ".join( 

1558 ( 

1559 self.preparer.quote(c) 

1560 if isinstance(c, str) 

1561 else self.process(c, include_table=False, use_schema=False) 

1562 ) 

1563 for c in clause.inferred_target_elements 

1564 ) 

1565 if clause.inferred_target_whereclause is not None: 

1566 target_text += " WHERE %s" % self.process( 

1567 clause.inferred_target_whereclause, 

1568 include_table=False, 

1569 use_schema=False, 

1570 literal_execute=True, 

1571 ) 

1572 

1573 else: 

1574 target_text = "" 

1575 

1576 return target_text 

1577 

1578 def visit_on_conflict_do_nothing(self, on_conflict, **kw): 

1579 target_text = self._on_conflict_target(on_conflict, **kw) 

1580 

1581 if target_text: 

1582 return "ON CONFLICT %s DO NOTHING" % target_text 

1583 else: 

1584 return "ON CONFLICT DO NOTHING" 

1585 

1586 def visit_on_conflict_do_update(self, on_conflict, **kw): 

1587 clause = on_conflict 

1588 

1589 target_text = self._on_conflict_target(on_conflict, **kw) 

1590 

1591 action_set_ops = [] 

1592 

1593 set_parameters = dict(clause.update_values_to_set) 

1594 # create a list of column assignment clauses as tuples 

1595 

1596 insert_statement = self.stack[-1]["selectable"] 

1597 cols = insert_statement.table.c 

1598 for c in cols: 

1599 col_key = c.key 

1600 

1601 if col_key in set_parameters: 

1602 value = set_parameters.pop(col_key) 

1603 elif c in set_parameters: 

1604 value = set_parameters.pop(c) 

1605 else: 

1606 continue 

1607 

1608 if ( 

1609 isinstance(value, elements.BindParameter) 

1610 and value.type._isnull 

1611 ): 

1612 value = value._with_binary_element_type(c.type) 

1613 value_text = self.process(value.self_group(), use_schema=False) 

1614 

1615 key_text = self.preparer.quote(c.name) 

1616 action_set_ops.append("%s = %s" % (key_text, value_text)) 

1617 

1618 # check for names that don't match columns 

1619 if set_parameters: 

1620 util.warn( 

1621 "Additional column names not matching " 

1622 "any column keys in table '%s': %s" 

1623 % ( 

1624 self.current_executable.table.name, 

1625 (", ".join("'%s'" % c for c in set_parameters)), 

1626 ) 

1627 ) 

1628 for k, v in set_parameters.items(): 

1629 key_text = ( 

1630 self.preparer.quote(k) 

1631 if isinstance(k, str) 

1632 else self.process(k, use_schema=False) 

1633 ) 

1634 value_text = self.process( 

1635 coercions.expect(roles.ExpressionElementRole, v), 

1636 use_schema=False, 

1637 ) 

1638 action_set_ops.append("%s = %s" % (key_text, value_text)) 

1639 

1640 action_text = ", ".join(action_set_ops) 

1641 if clause.update_whereclause is not None: 

1642 action_text += " WHERE %s" % self.process( 

1643 clause.update_whereclause, include_table=True, use_schema=False 

1644 ) 

1645 

1646 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) 

1647 

1648 def visit_bitwise_xor_op_binary(self, binary, operator, **kw): 

1649 # sqlite has no xor. Use "a XOR b" = "(a | b) - (a & b)". 

1650 kw["eager_grouping"] = True 

1651 or_ = self._generate_generic_binary(binary, " | ", **kw) 

1652 and_ = self._generate_generic_binary(binary, " & ", **kw) 

1653 return f"({or_} - {and_})" 
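The substitution relies on the identity ``a ^ b == (a | b) - (a & b)``, which holds because ``a | b`` sets exactly the bits of ``a ^ b`` plus those of ``a & b``:

```python
def xor_via_or_and(a: int, b: int) -> int:
    # emulate XOR using only OR, AND and subtraction, as the
    # SQLite compiler above does
    return (a | b) - (a & b)
```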

1654 

1655 

1656class SQLiteDDLCompiler(compiler.DDLCompiler): 

1657 def get_column_specification(self, column, **kwargs): 

1658 coltype = self.dialect.type_compiler_instance.process( 

1659 column.type, type_expression=column 

1660 ) 

1661 colspec = self.preparer.format_column(column) + " " + coltype 

1662 default = self.get_column_default_string(column) 

1663 if default is not None: 

1664 

1665 if not re.match(r"""^\s*[\'\"\(]""", default) and re.match( 

1666 r".*\W.*", default 

1667 ): 

1668 colspec += f" DEFAULT ({default})" 

1669 else: 

1670 colspec += f" DEFAULT {default}" 

1671 

1672 if not column.nullable: 

1673 colspec += " NOT NULL" 

1674 

1675 on_conflict_clause = column.dialect_options["sqlite"][ 

1676 "on_conflict_not_null" 

1677 ] 

1678 if on_conflict_clause is not None: 

1679 colspec += " ON CONFLICT " + on_conflict_clause 

1680 

1681 if column.primary_key: 

1682 if ( 

1683 column.autoincrement is True 

1684 and len(column.table.primary_key.columns) != 1 

1685 ): 

1686 raise exc.CompileError( 

1687 "SQLite does not support autoincrement for " 

1688 "composite primary keys" 

1689 ) 

1690 

1691 if ( 

1692 column.table.dialect_options["sqlite"]["autoincrement"] 

1693 and len(column.table.primary_key.columns) == 1 

1694 and issubclass(column.type._type_affinity, sqltypes.Integer) 

1695 and not column.foreign_keys 

1696 ): 

1697 colspec += " PRIMARY KEY" 

1698 

1699 on_conflict_clause = column.dialect_options["sqlite"][ 

1700 "on_conflict_primary_key" 

1701 ] 

1702 if on_conflict_clause is not None: 

1703 colspec += " ON CONFLICT " + on_conflict_clause 

1704 

1705 colspec += " AUTOINCREMENT" 

1706 

1707 if column.computed is not None: 

1708 colspec += " " + self.process(column.computed) 

1709 

1710 return colspec 

1711 

1712 def visit_primary_key_constraint(self, constraint, **kw): 

1713 # for columns with sqlite_autoincrement=True, 

1714 # the PRIMARY KEY constraint can only be inline 

1715 # with the column itself. 

1716 if len(constraint.columns) == 1: 

1717 c = list(constraint)[0] 

1718 if ( 

1719 c.primary_key 

1720 and c.table.dialect_options["sqlite"]["autoincrement"] 

1721 and issubclass(c.type._type_affinity, sqltypes.Integer) 

1722 and not c.foreign_keys 

1723 ): 

1724 return None 

1725 

1726 text = super().visit_primary_key_constraint(constraint) 

1727 

1728 on_conflict_clause = constraint.dialect_options["sqlite"][ 

1729 "on_conflict" 

1730 ] 

1731 if on_conflict_clause is None and len(constraint.columns) == 1: 

1732 on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][ 

1733 "on_conflict_primary_key" 

1734 ] 

1735 

1736 if on_conflict_clause is not None: 

1737 text += " ON CONFLICT " + on_conflict_clause 

1738 

1739 return text 

1740 

1741 def visit_unique_constraint(self, constraint, **kw): 

1742 text = super().visit_unique_constraint(constraint) 

1743 

1744 on_conflict_clause = constraint.dialect_options["sqlite"][ 

1745 "on_conflict" 

1746 ] 

1747 if on_conflict_clause is None and len(constraint.columns) == 1: 

1748 col1 = list(constraint)[0] 

1749 if isinstance(col1, schema.SchemaItem): 

1750 on_conflict_clause = list(constraint)[0].dialect_options[ 

1751 "sqlite" 

1752 ]["on_conflict_unique"] 

1753 

1754 if on_conflict_clause is not None: 

1755 text += " ON CONFLICT " + on_conflict_clause 

1756 

1757 return text 

1758 

1759 def visit_check_constraint(self, constraint, **kw): 

1760 text = super().visit_check_constraint(constraint) 

1761 

1762 on_conflict_clause = constraint.dialect_options["sqlite"][ 

1763 "on_conflict" 

1764 ] 

1765 

1766 if on_conflict_clause is not None: 

1767 text += " ON CONFLICT " + on_conflict_clause 

1768 

1769 return text 

1770 

1771 def visit_column_check_constraint(self, constraint, **kw): 

1772 text = super().visit_column_check_constraint(constraint) 

1773 

1774 if constraint.dialect_options["sqlite"]["on_conflict"] is not None: 

1775 raise exc.CompileError( 

1776 "SQLite does not support on conflict clause for " 

1777 "column check constraint" 

1778 ) 

1779 

1780 return text 

1781 

1782 def visit_foreign_key_constraint(self, constraint, **kw): 

1783 local_table = constraint.elements[0].parent.table 

1784 remote_table = constraint.elements[0].column.table 

1785 

1786 if local_table.schema != remote_table.schema: 

1787 return None 

1788 else: 

1789 return super().visit_foreign_key_constraint(constraint) 

1790 

1791 def define_constraint_remote_table(self, constraint, table, preparer): 

1792 """Format the remote table clause of a CREATE CONSTRAINT clause.""" 

1793 

1794 return preparer.format_table(table, use_schema=False) 

1795 

1796 def visit_create_index( 

1797 self, create, include_schema=False, include_table_schema=True, **kw 

1798 ): 

1799 index = create.element 

1800 self._verify_index_table(index) 

1801 preparer = self.preparer 

1802 text = "CREATE " 

1803 if index.unique: 

1804 text += "UNIQUE " 

1805 

1806 text += "INDEX " 

1807 

1808 if create.if_not_exists: 

1809 text += "IF NOT EXISTS " 

1810 

1811 text += "%s ON %s (%s)" % ( 

1812 self._prepared_index_name(index, include_schema=True), 

1813 preparer.format_table(index.table, use_schema=False), 

1814 ", ".join( 

1815 self.sql_compiler.process( 

1816 expr, include_table=False, literal_binds=True 

1817 ) 

1818 for expr in index.expressions 

1819 ), 

1820 ) 

1821 

1822 whereclause = index.dialect_options["sqlite"]["where"] 

1823 if whereclause is not None: 

1824 where_compiled = self.sql_compiler.process( 

1825 whereclause, include_table=False, literal_binds=True 

1826 ) 

1827 text += " WHERE " + where_compiled 

1828 

1829 return text 
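The `sqlite_where` dialect option handled above renders a partial index. A minimal stdlib `sqlite3` sketch (names invented) shows the resulting DDL, and that SQLite preserves the WHERE predicate verbatim in `sqlite_master`, which is what the reflection code later parses back out.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (a INTEGER, b INTEGER)")
# a partial unique index, as produced when an Index carries a
# sqlite_where option
conn.execute("CREATE UNIQUE INDEX ix_t_a ON t (a) WHERE b > 0")
# SQLite stores the CREATE INDEX statement as typed
sql = conn.execute(
    "SELECT sql FROM sqlite_master WHERE name = 'ix_t_a'"
).fetchone()[0]
```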

1830 

1831 def post_create_table(self, table): 

1832 table_options = [] 

1833 

1834 if not table.dialect_options["sqlite"]["with_rowid"]: 

1835 table_options.append("WITHOUT ROWID") 

1836 

1837 if table.dialect_options["sqlite"]["strict"]: 

1838 table_options.append("STRICT") 

1839 

1840 if table_options: 

1841 return "\n " + ",\n ".join(table_options) 

1842 else: 

1843 return "" 
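The table options appended by `post_create_table` land after the closing parenthesis of CREATE TABLE. A minimal stdlib `sqlite3` sketch (table name invented) of the `WITHOUT ROWID` option: such a table has no implicit `rowid` column, so selecting it fails.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# WITHOUT ROWID requires an explicit PRIMARY KEY
conn.execute("CREATE TABLE t (a INTEGER PRIMARY KEY, b TEXT) WITHOUT ROWID")
try:
    conn.execute("SELECT rowid FROM t")
    has_rowid = True
except sqlite3.OperationalError:
    # "no such column: rowid" on a WITHOUT ROWID table
    has_rowid = False
```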

1844 

1845 

1846class SQLiteTypeCompiler(compiler.GenericTypeCompiler): 

1847 def visit_large_binary(self, type_, **kw): 

1848 return self.visit_BLOB(type_) 

1849 

1850 def visit_DATETIME(self, type_, **kw): 

1851 if ( 

1852 not isinstance(type_, _DateTimeMixin) 

1853 or type_.format_is_text_affinity 

1854 ): 

1855 return super().visit_DATETIME(type_) 

1856 else: 

1857 return "DATETIME_CHAR" 

1858 

1859 def visit_DATE(self, type_, **kw): 

1860 if ( 

1861 not isinstance(type_, _DateTimeMixin) 

1862 or type_.format_is_text_affinity 

1863 ): 

1864 return super().visit_DATE(type_) 

1865 else: 

1866 return "DATE_CHAR" 

1867 

1868 def visit_TIME(self, type_, **kw): 

1869 if ( 

1870 not isinstance(type_, _DateTimeMixin) 

1871 or type_.format_is_text_affinity 

1872 ): 

1873 return super().visit_TIME(type_) 

1874 else: 

1875 return "TIME_CHAR" 

1876 

1877 def visit_JSON(self, type_, **kw): 

1878 # note this name provides NUMERIC affinity, not TEXT. 

1879 # should not be an issue unless the JSON value consists of a single 

1880 # numeric value. JSONTEXT can be used if this case is required. 

1881 return "JSON" 

1882 

1883 

1884class SQLiteIdentifierPreparer(compiler.IdentifierPreparer): 

1885 reserved_words = { 

1886 "add", 

1887 "after", 

1888 "all", 

1889 "alter", 

1890 "analyze", 

1891 "and", 

1892 "as", 

1893 "asc", 

1894 "attach", 

1895 "autoincrement", 

1896 "before", 

1897 "begin", 

1898 "between", 

1899 "by", 

1900 "cascade", 

1901 "case", 

1902 "cast", 

1903 "check", 

1904 "collate", 

1905 "column", 

1906 "commit", 

1907 "conflict", 

1908 "constraint", 

1909 "create", 

1910 "cross", 

1911 "current_date", 

1912 "current_time", 

1913 "current_timestamp", 

1914 "database", 

1915 "default", 

1916 "deferrable", 

1917 "deferred", 

1918 "delete", 

1919 "desc", 

1920 "detach", 

1921 "distinct", 

1922 "drop", 

1923 "each", 

1924 "else", 

1925 "end", 

1926 "escape", 

1927 "except", 

1928 "exclusive", 

1929 "exists", 

1930 "explain", 

1931 "false", 

1932 "fail", 

1933 "for", 

1934 "foreign", 

1935 "from", 

1936 "full", 

1937 "glob", 

1938 "group", 

1939 "having", 

1940 "if", 

1941 "ignore", 

1942 "immediate", 

1943 "in", 

1944 "index", 

1945 "indexed", 

1946 "initially", 

1947 "inner", 

1948 "insert", 

1949 "instead", 

1950 "intersect", 

1951 "into", 

1952 "is", 

1953 "isnull", 

1954 "join", 

1955 "key", 

1956 "left", 

1957 "like", 

1958 "limit", 

1959 "match", 

1960 "natural", 

1961 "not", 

1962 "notnull", 

1963 "null", 

1964 "of", 

1965 "offset", 

1966 "on", 

1967 "or", 

1968 "order", 

1969 "outer", 

1970 "plan", 

1971 "pragma", 

1972 "primary", 

1973 "query", 

1974 "raise", 

1975 "references", 

1976 "reindex", 

1977 "rename", 

1978 "replace", 

1979 "restrict", 

1980 "right", 

1981 "rollback", 

1982 "row", 

1983 "select", 

1984 "set", 

1985 "table", 

1986 "temp", 

1987 "temporary", 

1988 "then", 

1989 "to", 

1990 "transaction", 

1991 "trigger", 

1992 "true", 

1993 "union", 

1994 "unique", 

1995 "update", 

1996 "using", 

1997 "vacuum", 

1998 "values", 

1999 "view", 

2000 "virtual", 

2001 "when", 

2002 "where", 

2003 } 

2004 

2005 

2006class SQLiteExecutionContext(default.DefaultExecutionContext): 

2007 @util.memoized_property 

2008 def _preserve_raw_colnames(self): 

2009 return ( 

2010 not self.dialect._broken_dotted_colnames 

2011 or self.execution_options.get("sqlite_raw_colnames", False) 

2012 ) 

2013 

2014 def _translate_colname(self, colname): 

2015 # TODO: detect SQLite version 3.10.0 or greater; 

2016 # see [ticket:3633] 

2017 

2018 # adjust for dotted column names. SQLite 

2019 # in the case of UNION may store col names as 

2020 # "tablename.colname", or if using an attached database, 

2021 # "database.tablename.colname", in cursor.description 

2022 if not self._preserve_raw_colnames and "." in colname: 

2023 return colname.split(".")[-1], colname 

2024 else: 

2025 return colname, None 

2026 

2027 

2028class SQLiteDialect(default.DefaultDialect): 

2029 name = "sqlite" 

2030 supports_alter = False 

2031 

2032 # SQLite supports "DEFAULT VALUES" but *does not* support 

2033 # "VALUES (DEFAULT)" 

2034 supports_default_values = True 

2035 supports_default_metavalue = False 

2036 

2037 # sqlite issue: 

2038 # https://github.com/python/cpython/issues/93421 

2039 # note this parameter is no longer used by the ORM or default dialect 

2040 # see #9414 

2041 supports_sane_rowcount_returning = False 

2042 

2043 supports_empty_insert = False 

2044 supports_cast = True 

2045 supports_multivalues_insert = True 

2046 use_insertmanyvalues = True 

2047 tuple_in_values = True 

2048 supports_statement_cache = True 

2049 insert_null_pk_still_autoincrements = True 

2050 insert_returning = True 

2051 update_returning = True 

2052 update_returning_multifrom = True 

2053 delete_returning = True 

2055 

2056 supports_default_metavalue = True 

2057 """dialect supports INSERT... VALUES (DEFAULT) syntax""" 

2058 

2059 default_metavalue_token = "NULL" 

2060 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the 

2061 parenthesis.""" 

2062 

2063 default_paramstyle = "qmark" 

2064 execution_ctx_cls = SQLiteExecutionContext 

2065 statement_compiler = SQLiteCompiler 

2066 ddl_compiler = SQLiteDDLCompiler 

2067 type_compiler_cls = SQLiteTypeCompiler 

2068 preparer = SQLiteIdentifierPreparer 

2069 ischema_names = ischema_names 

2070 colspecs = colspecs 

2071 

2072 construct_arguments = [ 

2073 ( 

2074 sa_schema.Table, 

2075 { 

2076 "autoincrement": False, 

2077 "with_rowid": True, 

2078 "strict": False, 

2079 }, 

2080 ), 

2081 (sa_schema.Index, {"where": None}), 

2082 ( 

2083 sa_schema.Column, 

2084 { 

2085 "on_conflict_primary_key": None, 

2086 "on_conflict_not_null": None, 

2087 "on_conflict_unique": None, 

2088 }, 

2089 ), 

2090 (sa_schema.Constraint, {"on_conflict": None}), 

2091 ] 

2092 

2093 _broken_fk_pragma_quotes = False 

2094 _broken_dotted_colnames = False 

2095 

2096 def __init__( 

2097 self, 

2098 native_datetime=False, 

2099 json_serializer=None, 

2100 json_deserializer=None, 

2101 **kwargs, 

2102 ): 

2103 default.DefaultDialect.__init__(self, **kwargs) 

2104 

2105 self._json_serializer = json_serializer 

2106 self._json_deserializer = json_deserializer 

2107 

2108 # this flag is used by the pysqlite dialect, and perhaps others in the 

2109 # future, to indicate the driver is handling date/timestamp 

2110 # conversions (and perhaps datetime/time as well on some hypothetical 

2111 # driver ?) 

2112 self.native_datetime = native_datetime 

2113 

2114 if self.dbapi is not None: 

2115 if self.dbapi.sqlite_version_info < (3, 7, 16): 

2116 util.warn( 

2117 "SQLite version %s is older than 3.7.16, and will not " 

2118 "support right nested joins, as are sometimes used in " 

2119 "more complex ORM scenarios. SQLAlchemy 1.4 and above " 

2120 "no longer tries to rewrite these joins." 

2121 % (self.dbapi.sqlite_version_info,) 

2122 ) 

2123 

2124 # NOTE: python 3.7 on fedora for me has SQLite 3.34.1. These 

2125 # version checks are getting very stale. 

2126 self._broken_dotted_colnames = self.dbapi.sqlite_version_info < ( 

2127 3, 

2128 10, 

2129 0, 

2130 ) 

2131 self.supports_default_values = self.dbapi.sqlite_version_info >= ( 

2132 3, 

2133 3, 

2134 8, 

2135 ) 

2136 self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3) 

2137 self.supports_multivalues_insert = ( 

2138 # https://www.sqlite.org/releaselog/3_7_11.html 

2139 self.dbapi.sqlite_version_info 

2140 >= (3, 7, 11) 

2141 ) 

2142 # see https://www.sqlalchemy.org/trac/ticket/2568 

2143 # as well as https://www.sqlite.org/src/info/600482d161 

2144 self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < ( 

2145 3, 

2146 6, 

2147 14, 

2148 ) 

2149 

2150 if self.dbapi.sqlite_version_info < (3, 35) or util.pypy: 

2151 self.update_returning = self.delete_returning = ( 

2152 self.insert_returning 

2153 ) = False 

2154 

2155 if self.dbapi.sqlite_version_info < (3, 32, 0): 

2156 # https://www.sqlite.org/limits.html 

2157 self.insertmanyvalues_max_parameters = 999 

2158 

2159 _isolation_lookup = util.immutabledict( 

2160 {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0} 

2161 ) 

2162 

2163 def get_isolation_level_values(self, dbapi_connection): 

2164 return list(self._isolation_lookup) 

2165 

2166 def set_isolation_level(self, dbapi_connection, level): 

2167 isolation_level = self._isolation_lookup[level] 

2168 

2169 cursor = dbapi_connection.cursor() 

2170 cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}") 

2171 cursor.close() 

2172 

2173 def get_isolation_level(self, dbapi_connection): 

2174 cursor = dbapi_connection.cursor() 

2175 cursor.execute("PRAGMA read_uncommitted") 

2176 res = cursor.fetchone() 

2177 if res: 

2178 value = res[0] 

2179 else: 

2180 # https://www.sqlite.org/changes.html#version_3_3_3 

2181 # "Optional READ UNCOMMITTED isolation (instead of the 

2182 # default isolation level of SERIALIZABLE) and 

2183 # table level locking when database connections 

2184 # share a common cache."" 

2185 # pre-SQLite 3.3.0 default to 0 

2186 value = 0 

2187 cursor.close() 

2188 if value == 0: 

2189 return "SERIALIZABLE" 

2190 elif value == 1: 

2191 return "READ UNCOMMITTED" 

2192 else: 

2193 assert False, "Unknown isolation level %s" % value 

2194 

2195 @reflection.cache 

2196 def get_schema_names(self, connection, **kw): 

2197 s = "PRAGMA database_list" 

2198 dl = connection.exec_driver_sql(s) 

2199 

2200 return [db[1] for db in dl if db[1] != "temp"] 
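`get_schema_names` leans on `PRAGMA database_list`, which returns one `(seq, name, file)` row per database on the connection. A minimal stdlib `sqlite3` sketch: `"main"` is always present, with an extra row per ATTACHed database.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# (seq, name, file) tuples; position 1 is the schema name
rows = conn.execute("PRAGMA database_list").fetchall()
names = [row[1] for row in rows]
```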

2201 

2202 def _format_schema(self, schema, table_name): 

2203 if schema is not None: 

2204 qschema = self.identifier_preparer.quote_identifier(schema) 

2205 name = f"{qschema}.{table_name}" 

2206 else: 

2207 name = table_name 

2208 return name 

2209 

2210 def _sqlite_main_query( 

2211 self, 

2212 table: str, 

2213 type_: str, 

2214 schema: Optional[str], 

2215 sqlite_include_internal: bool, 

2216 ): 

2217 main = self._format_schema(schema, table) 

2218 if not sqlite_include_internal: 

2219 filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'" 

2220 else: 

2221 filter_table = "" 

2222 query = ( 

2223 f"SELECT name FROM {main} " 

2224 f"WHERE type='{type_}'{filter_table} " 

2225 "ORDER BY name" 

2226 ) 

2227 return query 

2228 

2229 @reflection.cache 

2230 def get_table_names( 

2231 self, connection, schema=None, sqlite_include_internal=False, **kw 

2232 ): 

2233 query = self._sqlite_main_query( 

2234 "sqlite_master", "table", schema, sqlite_include_internal 

2235 ) 

2236 names = connection.exec_driver_sql(query).scalars().all() 

2237 return names 

2238 

2239 @reflection.cache 

2240 def get_temp_table_names( 

2241 self, connection, sqlite_include_internal=False, **kw 

2242 ): 

2243 query = self._sqlite_main_query( 

2244 "sqlite_temp_master", "table", None, sqlite_include_internal 

2245 ) 

2246 names = connection.exec_driver_sql(query).scalars().all() 

2247 return names 

2248 

2249 @reflection.cache 

2250 def get_temp_view_names( 

2251 self, connection, sqlite_include_internal=False, **kw 

2252 ): 

2253 query = self._sqlite_main_query( 

2254 "sqlite_temp_master", "view", None, sqlite_include_internal 

2255 ) 

2256 names = connection.exec_driver_sql(query).scalars().all() 

2257 return names 

2258 

2259 @reflection.cache 

2260 def has_table(self, connection, table_name, schema=None, **kw): 

2261 self._ensure_has_table_connection(connection) 

2262 

2263 if schema is not None and schema not in self.get_schema_names( 

2264 connection, **kw 

2265 ): 

2266 return False 

2267 

2268 info = self._get_table_pragma( 

2269 connection, "table_info", table_name, schema=schema 

2270 ) 

2271 return bool(info) 

2272 

2273 def _get_default_schema_name(self, connection): 

2274 return "main" 

2275 

2276 @reflection.cache 

2277 def get_view_names( 

2278 self, connection, schema=None, sqlite_include_internal=False, **kw 

2279 ): 

2280 query = self._sqlite_main_query( 

2281 "sqlite_master", "view", schema, sqlite_include_internal 

2282 ) 

2283 names = connection.exec_driver_sql(query).scalars().all() 

2284 return names 

2285 

2286 @reflection.cache 

2287 def get_view_definition(self, connection, view_name, schema=None, **kw): 

2288 if schema is not None: 

2289 qschema = self.identifier_preparer.quote_identifier(schema) 

2290 master = f"{qschema}.sqlite_master" 

2291 s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % ( 

2292 master, 

2293 ) 

2294 rs = connection.exec_driver_sql(s, (view_name,)) 

2295 else: 

2296 try: 

2297 s = ( 

2298 "SELECT sql FROM " 

2299 " (SELECT * FROM sqlite_master UNION ALL " 

2300 " SELECT * FROM sqlite_temp_master) " 

2301 "WHERE name = ? " 

2302 "AND type='view'" 

2303 ) 

2304 rs = connection.exec_driver_sql(s, (view_name,)) 

2305 except exc.DBAPIError: 

2306 s = ( 

2307 "SELECT sql FROM sqlite_master WHERE name = ? " 

2308 "AND type='view'" 

2309 ) 

2310 rs = connection.exec_driver_sql(s, (view_name,)) 

2311 

2312 result = rs.fetchall() 

2313 if result: 

2314 return result[0].sql 

2315 else: 

2316 raise exc.NoSuchTableError( 

2317 f"{schema}.{view_name}" if schema else view_name 

2318 ) 
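The lookup above works because SQLite stores each CREATE VIEW statement verbatim in `sqlite_master`. A minimal stdlib `sqlite3` sketch (view and table names invented):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (a INTEGER)")
conn.execute("CREATE VIEW v AS SELECT a FROM t")
# the DDL text is preserved exactly as typed
sql = conn.execute(
    "SELECT sql FROM sqlite_master WHERE name = ? AND type = 'view'",
    ("v",),
).fetchone()[0]
```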

2319 

2320 @reflection.cache 

2321 def get_columns(self, connection, table_name, schema=None, **kw): 

2322 pragma = "table_info" 

2323 # computed columns are treated as hidden; they require table_xinfo 

2324 if self.server_version_info >= (3, 31): 

2325 pragma = "table_xinfo" 

2326 info = self._get_table_pragma( 

2327 connection, pragma, table_name, schema=schema 

2328 ) 

2329 columns = [] 

2330 tablesql = None 

2331 for row in info: 

2332 name = row[1] 

2333 type_ = row[2].upper() 

2334 nullable = not row[3] 

2335 default = row[4] 

2336 primary_key = row[5] 

2337 hidden = row[6] if pragma == "table_xinfo" else 0 

2338 

2339 # hidden has value 0 for normal columns, 1 for hidden columns, 

2340 # 2 for computed virtual columns and 3 for computed stored columns 

2341 # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b 

2342 if hidden == 1: 

2343 continue 

2344 

2345 generated = bool(hidden) 

2346 persisted = hidden == 3 

2347 

2348 if tablesql is None and generated: 

2349 tablesql = self._get_table_sql( 

2350 connection, table_name, schema, **kw 

2351 ) 

2352 # remove create table 

2353 match = re.match( 

2354 r"create table .*?\((.*)\)$", 

2355 tablesql.strip(), 

2356 re.DOTALL | re.IGNORECASE, 

2357 ) 

2358 assert match, f"create table not found in {tablesql}" 

2359 tablesql = match.group(1).strip() 

2360 

2361 columns.append( 

2362 self._get_column_info( 

2363 name, 

2364 type_, 

2365 nullable, 

2366 default, 

2367 primary_key, 

2368 generated, 

2369 persisted, 

2370 tablesql, 

2371 ) 

2372 ) 

2373 if columns: 

2374 return columns 

2375 elif not self.has_table(connection, table_name, schema): 

2376 raise exc.NoSuchTableError( 

2377 f"{schema}.{table_name}" if schema else table_name 

2378 ) 

2379 else: 

2380 return ReflectionDefaults.columns() 

2381 

2382 def _get_column_info( 

2383 self, 

2384 name, 

2385 type_, 

2386 nullable, 

2387 default, 

2388 primary_key, 

2389 generated, 

2390 persisted, 

2391 tablesql, 

2392 ): 

2393 if generated: 

2394 # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)" 

2395 # somehow is "INTEGER GENERATED ALWAYS" 

2396 type_ = re.sub("generated", "", type_, flags=re.IGNORECASE) 

2397 type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip() 

2398 

2399 coltype = self._resolve_type_affinity(type_) 

2400 

2401 if default is not None: 

2402 default = str(default) 

2403 

2404 colspec = { 

2405 "name": name, 

2406 "type": coltype, 

2407 "nullable": nullable, 

2408 "default": default, 

2409 "primary_key": primary_key, 

2410 } 

2411 if generated: 

2412 sqltext = "" 

2413 if tablesql: 

2414 pattern = ( 

2415 r"[^,]*\s+GENERATED\s+ALWAYS\s+AS" 

2416 r"\s+\((.*)\)\s*(?:virtual|stored)?" 

2417 ) 

2418 match = re.search( 

2419 re.escape(name) + pattern, tablesql, re.IGNORECASE 

2420 ) 

2421 if match: 

2422 sqltext = match.group(1) 

2423 colspec["computed"] = {"sqltext": sqltext, "persisted": persisted} 

2424 return colspec 

2425 

2426 def _resolve_type_affinity(self, type_): 

2427 """Return a data type from a reflected column, using affinity rules. 

2428 

2429 SQLite's goal for universal compatibility introduces some complexity 

2430 during reflection, as a column's defined type might not actually be a 

2431 type that SQLite understands - or indeed, may not be defined *at all*. 

2432 Internally, SQLite handles this with a 'data type affinity' for each 

2433 column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER', 

2434 'REAL', or 'NONE' (raw bits). The algorithm that determines this is 

2435 listed in https://www.sqlite.org/datatype3.html section 2.1. 

2436 

2437 This method allows SQLAlchemy to support that algorithm, while still 

2438 providing access to smarter reflection utilities by recognizing 

2439 column definitions that SQLite only supports through affinity (like 

2440 DATE and DOUBLE). 

2441 

2442 """ 

2443 match = re.match(r"([\w ]+)(\(.*?\))?", type_) 

2444 if match: 

2445 coltype = match.group(1) 

2446 args = match.group(2) 

2447 else: 

2448 coltype = "" 

2449 args = "" 

2450 

2451 if coltype in self.ischema_names: 

2452 coltype = self.ischema_names[coltype] 

2453 elif "INT" in coltype: 

2454 coltype = sqltypes.INTEGER 

2455 elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype: 

2456 coltype = sqltypes.TEXT 

2457 elif "BLOB" in coltype or not coltype: 

2458 coltype = sqltypes.NullType 

2459 elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype: 

2460 coltype = sqltypes.REAL 

2461 else: 

2462 coltype = sqltypes.NUMERIC 

2463 

2464 if args is not None: 

2465 args = re.findall(r"(\d+)", args) 

2466 try: 

2467 coltype = coltype(*[int(a) for a in args]) 

2468 except TypeError: 

2469 util.warn( 

2470 "Could not instantiate type %s with " 

2471 "reflected arguments %s; using no arguments." 

2472 % (coltype, args) 

2473 ) 

2474 coltype = coltype() 

2475 else: 

2476 coltype = coltype() 

2477 

2478 return coltype 
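The fallback chain above mirrors the affinity rules in section 3 of the SQLite datatype documentation. The canonical example from that documentation, runnable with the stdlib `sqlite3` module, shows how the same text value `'500.0'` is stored under each affinity:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE t1 (t TEXT, nu NUMERIC, i INTEGER, r REAL, no BLOB)"
)
conn.execute(
    "INSERT INTO t1 VALUES ('500.0', '500.0', '500.0', '500.0', '500.0')"
)
# TEXT keeps text, NUMERIC/INTEGER coerce to integer, REAL to real,
# BLOB (NONE affinity) keeps the text as given
row = conn.execute(
    "SELECT typeof(t), typeof(nu), typeof(i), typeof(r), typeof(no) FROM t1"
).fetchone()
```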

2479 

2480 @reflection.cache 

2481 def get_pk_constraint(self, connection, table_name, schema=None, **kw): 

2482 constraint_name = None 

2483 table_data = self._get_table_sql(connection, table_name, schema=schema) 

2484 if table_data: 

2485 PK_PATTERN = r"CONSTRAINT (\w+) PRIMARY KEY" 

2486 result = re.search(PK_PATTERN, table_data, re.I) 

2487 constraint_name = result.group(1) if result else None 

2488 

2489 cols = self.get_columns(connection, table_name, schema, **kw) 

2490 # consider only pk columns. This also avoids sorting the cached 

2491 # value returned by get_columns 

2492 cols = [col for col in cols if col.get("primary_key", 0) > 0] 

2493 cols.sort(key=lambda col: col.get("primary_key")) 

2494 pkeys = [col["name"] for col in cols] 

2495 

2496 if pkeys: 

2497 return {"constrained_columns": pkeys, "name": constraint_name} 

2498 else: 

2499 return ReflectionDefaults.pk_constraint() 
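The sort on `primary_key` above relies on `PRAGMA table_info` reporting, in its last column, the 1-based position of each column within the primary key (0 for non-key columns). A minimal stdlib `sqlite3` sketch (names invented) with a composite key declared out of column order:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE t (a INTEGER, b INTEGER, c INTEGER, PRIMARY KEY (b, a))"
)
# rows are (cid, name, type, notnull, dflt_value, pk)
info = conn.execute("PRAGMA table_info('t')").fetchall()
pk_cols = [r[1] for r in sorted(info, key=lambda r: r[5]) if r[5] > 0]
```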

2500 

2501 @reflection.cache 

2502 def get_foreign_keys(self, connection, table_name, schema=None, **kw): 

2503 # sqlite makes this *extremely difficult*. 

2504 # First, use the pragma to get the actual FKs. 

2505 pragma_fks = self._get_table_pragma( 

2506 connection, "foreign_key_list", table_name, schema=schema 

2507 ) 

2508 

2509 fks = {} 

2510 

2511 for row in pragma_fks: 

2512 (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4]) 

2513 

2514 if not rcol: 

2515 # no referred column, which means it was not named in the 

2516 # original DDL. The referred columns of the foreign key 

2517 # constraint are therefore the primary key of the referred 

2518 # table. 

2519 try: 

2520 referred_pk = self.get_pk_constraint( 

2521 connection, rtbl, schema=schema, **kw 

2522 ) 

2523 referred_columns = referred_pk["constrained_columns"] 

2524 except exc.NoSuchTableError: 

2525 # ignore not existing parents 

2526 referred_columns = [] 

2527 else: 

2528 # note we use this list only if this is the first column 

2529 # in the constraint. for subsequent columns we ignore the 

2530 # list and append "rcol" if present. 

2531 referred_columns = [] 

2532 

2533 if self._broken_fk_pragma_quotes: 

2534 rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl) 

2535 

2536 if numerical_id in fks: 

2537 fk = fks[numerical_id] 

2538 else: 

2539 fk = fks[numerical_id] = { 

2540 "name": None, 

2541 "constrained_columns": [], 

2542 "referred_schema": schema, 

2543 "referred_table": rtbl, 

2544 "referred_columns": referred_columns, 

2545 "options": {}, 

2546 } 

2548 

2549 fk["constrained_columns"].append(lcol) 

2550 

2551 if rcol: 

2552 fk["referred_columns"].append(rcol) 

2553 

2554 def fk_sig(constrained_columns, referred_table, referred_columns): 

2555 return ( 

2556 tuple(constrained_columns) 

2557 + (referred_table,) 

2558 + tuple(referred_columns) 

2559 ) 

2560 

2561 # then, parse the actual SQL and attempt to find DDL that matches 

2562 # the names as well. SQLite saves the DDL in whatever format 

2563 # it was typed, so we need to be liberal here. 

2564 

2565 keys_by_signature = { 

2566 fk_sig( 

2567 fk["constrained_columns"], 

2568 fk["referred_table"], 

2569 fk["referred_columns"], 

2570 ): fk 

2571 for fk in fks.values() 

2572 } 

2573 

2574 table_data = self._get_table_sql(connection, table_name, schema=schema) 

2575 

2576 def parse_fks(): 

2577 if table_data is None: 

2578 # system tables, etc. 

2579 return 

2580 

2581 # note that we already have the FKs from PRAGMA above. This whole 

2582 # regexp thing is trying to locate additional detail about the 

2583 # FKs, namely the name of the constraint and other options. 

2584 # so parsing the columns is really about matching it up to what 

2585 # we already have. 

2586 FK_PATTERN = ( 

2587 r"(?:CONSTRAINT (\w+) +)?" 

2588 r"FOREIGN KEY *\( *(.+?) *\) +" 

2589 r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *' # noqa: E501 

2590 r"((?:ON (?:DELETE|UPDATE) " 

2591 r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)" 

2592 r"((?:NOT +)?DEFERRABLE)?" 

2593 r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?" 

2594 ) 

2595 for match in re.finditer(FK_PATTERN, table_data, re.I): 

2596 ( 

2597 constraint_name, 

2598 constrained_columns, 

2599 referred_quoted_name, 

2600 referred_name, 

2601 referred_columns, 

2602 onupdatedelete, 

2603 deferrable, 

2604 initially, 

2605 ) = match.group(1, 2, 3, 4, 5, 6, 7, 8) 

2606 constrained_columns = list( 

2607 self._find_cols_in_sig(constrained_columns) 

2608 ) 

2609 if not referred_columns: 

2610 referred_columns = constrained_columns 

2611 else: 

2612 referred_columns = list( 

2613 self._find_cols_in_sig(referred_columns) 

2614 ) 

2615 referred_name = referred_quoted_name or referred_name 

2616 options = {} 

2617 

2618 for token in re.split(r" *\bON\b *", onupdatedelete.upper()): 

2619 if token.startswith("DELETE"): 

2620 ondelete = token[6:].strip() 

2621 if ondelete and ondelete != "NO ACTION": 

2622 options["ondelete"] = ondelete 

2623 elif token.startswith("UPDATE"): 

2624 onupdate = token[6:].strip() 

2625 if onupdate and onupdate != "NO ACTION": 

2626 options["onupdate"] = onupdate 

2627 

2628 if deferrable: 

2629 options["deferrable"] = "NOT" not in deferrable.upper() 

2630 if initially: 

2631 options["initially"] = initially.upper() 

2632 

2633 yield ( 

2634 constraint_name, 

2635 constrained_columns, 

2636 referred_name, 

2637 referred_columns, 

2638 options, 

2639 ) 

2640 

2641 fkeys = [] 

2642 

2643 for ( 

2644 constraint_name, 

2645 constrained_columns, 

2646 referred_name, 

2647 referred_columns, 

2648 options, 

2649 ) in parse_fks(): 

2650 sig = fk_sig(constrained_columns, referred_name, referred_columns) 

2651 if sig not in keys_by_signature: 

2652 util.warn( 

2653 "WARNING: SQL-parsed foreign key constraint " 

2654 "'%s' could not be located in PRAGMA " 

2655 "foreign_keys for table %s" % (sig, table_name) 

2656 ) 

2657 continue 

2658 key = keys_by_signature.pop(sig) 

2659 key["name"] = constraint_name 

2660 key["options"] = options 

2661 fkeys.append(key) 

2662 # assume the remainders are the unnamed, inline constraints, just 

2663 # use them as is as it's extremely difficult to parse inline 

2664 # constraints 

2665 fkeys.extend(keys_by_signature.values()) 

2666 if fkeys: 

2667 return fkeys 

2668 else: 

2669 return ReflectionDefaults.foreign_keys() 
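The empty-`rcol` branch above corresponds to a foreign key declared without naming the referred column. A minimal stdlib `sqlite3` sketch (names invented) of what `PRAGMA foreign_key_list` returns in that case: the "to" column comes back as NULL, so the referred columns must be resolved from the parent table's primary key.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE parent (id INTEGER PRIMARY KEY);
    -- referred column deliberately omitted; it defaults to the
    -- primary key of parent, and PRAGMA reports it as NULL
    CREATE TABLE child (pid REFERENCES parent);
    """
)
# rows are (id, seq, table, from, to, on_update, on_delete, match)
row = conn.execute("PRAGMA foreign_key_list('child')").fetchone()
rtbl, lcol, rcol = row[2], row[3], row[4]
```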

2670 

2671 def _find_cols_in_sig(self, sig): 

2672 for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I): 

2673 yield match.group(1) or match.group(2) 

2674 

2675 @reflection.cache 

2676 def get_unique_constraints( 

2677 self, connection, table_name, schema=None, **kw 

2678 ): 

2679 auto_index_by_sig = {} 

2680 for idx in self.get_indexes( 

2681 connection, 

2682 table_name, 

2683 schema=schema, 

2684 include_auto_indexes=True, 

2685 **kw, 

2686 ): 

2687 if not idx["name"].startswith("sqlite_autoindex"): 

2688 continue 

2689 sig = tuple(idx["column_names"]) 

2690 auto_index_by_sig[sig] = idx 

2691 

2692 table_data = self._get_table_sql( 

2693 connection, table_name, schema=schema, **kw 

2694 ) 

2695 unique_constraints = [] 

2696 

2697 def parse_uqs(): 

2698 if table_data is None: 

2699 return 

2700 UNIQUE_PATTERN = r'(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)' 

2701 INLINE_UNIQUE_PATTERN = ( 

2702 r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]' 

2703 r"+[a-z0-9_ ]+?[\t ]+UNIQUE" 

2704 ) 

2705 

2706 for match in re.finditer(UNIQUE_PATTERN, table_data, re.I): 

2707 name, cols = match.group(1, 2) 

2708 yield name, list(self._find_cols_in_sig(cols)) 

2709 

2710 # we need to match inlines as well, as we seek to differentiate 

2711 # a UNIQUE constraint from a UNIQUE INDEX, even though these 

2712 # are kind of the same thing :) 

2713 for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I): 

2714 cols = list( 

2715 self._find_cols_in_sig(match.group(1) or match.group(2)) 

2716 ) 

2717 yield None, cols 

2718 

2719 for name, cols in parse_uqs(): 

2720 sig = tuple(cols) 

2721 if sig in auto_index_by_sig: 

2722 auto_index_by_sig.pop(sig) 

2723 parsed_constraint = {"name": name, "column_names": cols} 

2724 unique_constraints.append(parsed_constraint) 

2725 # NOTE: auto_index_by_sig might not be empty here, 

2726 # the PRIMARY KEY may have an entry. 

2727 if unique_constraints: 

2728 return unique_constraints 

2729 else: 

2730 return ReflectionDefaults.unique_constraints() 
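The `sqlite_autoindex` matching above exists because an inline UNIQUE constraint materializes as an automatically created index. A minimal stdlib `sqlite3` sketch (names invented):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (a INTEGER UNIQUE)")
# rows are (seq, name, unique, ...); the UNIQUE constraint shows up
# as an index named sqlite_autoindex_<table>_<n>
indexes = conn.execute("PRAGMA index_list('t')").fetchall()
auto = [r[1] for r in indexes if r[1].startswith("sqlite_autoindex")]
```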

2731 

2732 @reflection.cache 

2733 def get_check_constraints(self, connection, table_name, schema=None, **kw): 

2734 table_data = self._get_table_sql( 

2735 connection, table_name, schema=schema, **kw 

2736 ) 

2737 

2738 # NOTE NOTE NOTE 

2739 # DO NOT CHANGE THIS REGULAR EXPRESSION. There is no known way 

2740 # to parse CHECK constraints that contain newlines themselves using 

2741 # regular expressions, and the approach here relies upon each 

2742 # individual 

2743 # CHECK constraint being on a single line by itself. This 

2744 # necessarily makes assumptions as to how the CREATE TABLE 

2745 # was emitted. A more comprehensive DDL parsing solution would be 

2746 # needed to improve upon the current situation. See #11840 for 

2747 # background 

2748 CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?CHECK *\( *(.+) *\),? *" 

2749 cks = [] 

2750 

2751 for match in re.finditer(CHECK_PATTERN, table_data or "", re.I): 

2752 

2753 name = match.group(1) 

2754 

2755 if name: 

2756 name = re.sub(r'^"|"$', "", name) 

2757 

2758 cks.append({"sqltext": match.group(2), "name": name}) 

2759 cks.sort(key=lambda d: d["name"] or "~") # sort None as last 

2760 if cks: 

2761 return cks 

2762 else: 

2763 return ReflectionDefaults.check_constraints() 
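No PRAGMA exposes CHECK constraints, which is why the method above falls back to regex-matching the stored DDL. A minimal stdlib `sqlite3` sketch (names invented) using a simplified version of the pattern, run against the verbatim `sqlite_master` text:

```python
import re
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (a INTEGER, CONSTRAINT positive CHECK (a > 0))")
# the DDL is stored as typed, so the constraint name can be scraped
sql = conn.execute(
    "SELECT sql FROM sqlite_master WHERE name = 't'"
).fetchone()[0]
match = re.search(r"CONSTRAINT (\w+) +CHECK *\(", sql, re.I)
name = match.group(1)
```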

    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        pragma_indexes = self._get_table_pragma(
            connection, "index_list", table_name, schema=schema
        )
        indexes = []

        # regular expression to extract the filter predicate of a partial
        # index. this could fail to extract the predicate correctly on
        # indexes created like
        # CREATE INDEX i ON t (col || ') where') WHERE col <> ''
        # but as this function does not support expression-based indexes
        # this case does not occur.
        partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)

        if schema:
            schema_expr = "%s." % self.identifier_preparer.quote_identifier(
                schema
            )
        else:
            schema_expr = ""

        include_auto_indexes = kw.pop("include_auto_indexes", False)
        for row in pragma_indexes:
            # ignore implicit primary key index.
            # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
            if not include_auto_indexes and row[1].startswith(
                "sqlite_autoindex"
            ):
                continue
            indexes.append(
                dict(
                    name=row[1],
                    column_names=[],
                    unique=row[2],
                    dialect_options={},
                )
            )

            # check partial indexes
            if len(row) >= 5 and row[4]:
                s = (
                    "SELECT sql FROM %(schema)ssqlite_master "
                    "WHERE name = ? "
                    "AND type = 'index'" % {"schema": schema_expr}
                )
                rs = connection.exec_driver_sql(s, (row[1],))
                index_sql = rs.scalar()
                predicate_match = partial_pred_re.search(index_sql)
                if predicate_match is None:
                    # unless the regex is broken this case shouldn't happen
                    # because we know this is a partial index, so the
                    # definition sql should match the regex
                    util.warn(
                        "Failed to look up filter predicate of "
                        "partial index %s" % row[1]
                    )
                else:
                    predicate = predicate_match.group(1)
                    indexes[-1]["dialect_options"]["sqlite_where"] = text(
                        predicate
                    )

        # loop thru unique indexes to get the column names.
        for idx in list(indexes):
            pragma_index = self._get_table_pragma(
                connection, "index_info", idx["name"], schema=schema
            )

            for row in pragma_index:
                if row[2] is None:
                    util.warn(
                        "Skipped unsupported reflection of "
                        "expression-based index %s" % idx["name"]
                    )
                    indexes.remove(idx)
                    break
                else:
                    idx["column_names"].append(row[2])

        indexes.sort(key=lambda d: d["name"] or "~")  # sort None as last
        if indexes:
            return indexes
        elif not self.has_table(connection, table_name, schema):
            raise exc.NoSuchTableError(
                f"{schema}.{table_name}" if schema else table_name
            )
        else:
            return ReflectionDefaults.indexes()
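The predicate extraction for partial indexes can be demonstrated on its own. The index DDL below is hypothetical; it stands in for the `sql` column fetched from `sqlite_master`:

```python
import re

# Same regex as get_indexes(): capture everything after the closing
# paren of the column list and the WHERE keyword.
partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)

# hypothetical partial-index DDL as SQLite would store it
index_sql = "CREATE INDEX ix_active ON users (email) WHERE active = 1"

m = partial_pred_re.search(index_sql)
predicate = m.group(1) if m is not None else None
```

As the comment in `get_indexes()` notes, a column expression containing the literal text `) where` could fool this regex, but expression-based indexes are skipped before this point, so that case does not arise.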

    def _is_sys_table(self, table_name):
        return table_name in {
            "sqlite_schema",
            "sqlite_master",
            "sqlite_temp_schema",
            "sqlite_temp_master",
        }

    @reflection.cache
    def _get_table_sql(self, connection, table_name, schema=None, **kw):
        if schema:
            schema_expr = "%s." % (
                self.identifier_preparer.quote_identifier(schema)
            )
        else:
            schema_expr = ""
        try:
            s = (
                "SELECT sql FROM "
                " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
                " SELECT * FROM %(schema)ssqlite_temp_master) "
                "WHERE name = ? "
                "AND type in ('table', 'view')" % {"schema": schema_expr}
            )
            rs = connection.exec_driver_sql(s, (table_name,))
        except exc.DBAPIError:
            s = (
                "SELECT sql FROM %(schema)ssqlite_master "
                "WHERE name = ? "
                "AND type in ('table', 'view')" % {"schema": schema_expr}
            )
            rs = connection.exec_driver_sql(s, (table_name,))
        value = rs.scalar()
        if value is None and not self._is_sys_table(table_name):
            raise exc.NoSuchTableError(f"{schema_expr}{table_name}")
        return value
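The core of the lookup above is a query against `sqlite_master` for the original CREATE statement. A minimal sketch using the stdlib `sqlite3` driver directly, mirroring the fallback branch (the table name is hypothetical):

```python
import sqlite3

# in-memory database standing in for the Connection used by
# _get_table_sql()
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER PRIMARY KEY)")

# same shape of query as the fallback branch: fetch the stored DDL
# text for a table or view by name
row = conn.execute(
    "SELECT sql FROM sqlite_master "
    "WHERE name = ? AND type in ('table', 'view')",
    ("t",),
).fetchone()
table_sql = row[0] if row is not None else None
```

SQLite stores the CREATE statement text verbatim in `sqlite_master.sql`, which is what makes the regex-based constraint parsing in the methods above possible at all.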

    def _get_table_pragma(self, connection, pragma, table_name, schema=None):
        quote = self.identifier_preparer.quote_identifier
        if schema is not None:
            statements = [f"PRAGMA {quote(schema)}."]
        else:
            # because PRAGMA looks in all attached databases if no schema
            # given, need to specify "main" schema, however since we want
            # 'temp' tables in the same namespace as 'main', need to run
            # the PRAGMA twice
            statements = ["PRAGMA main.", "PRAGMA temp."]

        qtable = quote(table_name)
        for statement in statements:
            statement = f"{statement}{pragma}({qtable})"
            cursor = connection.exec_driver_sql(statement)
            if not cursor._soft_closed:
                # work around SQLite issue whereby cursor.description
                # is blank when PRAGMA returns no rows:
                # https://www.sqlite.org/cvstrac/tktview?tn=1884
                result = cursor.fetchall()
            else:
                result = []
            if result:
                return result
        else:
            return []
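The main/temp double lookup can be sketched against the stdlib `sqlite3` driver. This is an illustration only, not the dialect's code path; `table_pragma` and the table name below are hypothetical:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# a TEMP table is invisible to PRAGMA main.* but visible to PRAGMA temp.*
conn.execute("CREATE TEMPORARY TABLE scratch (a INTEGER)")


def table_pragma(pragma, table_name):
    # try "main" first, then "temp", mirroring _get_table_pragma();
    # a PRAGMA against a nonexistent table simply returns no rows
    for schema in ("main", "temp"):
        rows = conn.execute(
            f'PRAGMA {schema}.{pragma}("{table_name}")'
        ).fetchall()
        if rows:
            return rows
    return []


info = table_pragma("table_info", "scratch")
column_names = [row[1] for row in info]
```

Running the PRAGMA twice is what keeps `temp` tables reflectable in the same namespace as `main` when no explicit schema is given.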