Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/base.py: 35%

792 statements  

# dialects/sqlite/base.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors


r'''
.. dialect:: sqlite
    :name: SQLite
    :normal_support: 3.12+
    :best_effort: 3.7.16+

.. _sqlite_datetime:

Date and Time Types
-------------------

SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does
not provide out of the box functionality for translating values between Python
`datetime` objects and a SQLite-supported format. SQLAlchemy's own
:class:`~sqlalchemy.types.DateTime` and related types provide date formatting
and parsing functionality when SQLite is used. The implementation classes are
:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`.
These types represent dates and times as ISO formatted strings, which also
nicely support ordering. There's no reliance on typical "libc" internals for
these functions so historical dates are fully supported.
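The ordering property can be checked with the standard library ``sqlite3``
driver alone. This is a minimal sketch rather than SQLAlchemy's own
implementation: the ``events`` table, its column and the sample values are
assumptions made up for illustration, and the strings are produced with
``datetime.isoformat()`` instead of the SQLAlchemy types.

```python
import sqlite3
from datetime import datetime

# Illustrative values only; the table and column names are hypothetical.
moments = [
    datetime(2024, 5, 17, 9, 30),
    datetime(1066, 10, 14, 8, 0),  # far earlier than the Unix epoch
    datetime(1999, 12, 31, 23, 59, 59),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (happened_at DATETIME)")
conn.executemany(
    "INSERT INTO events (happened_at) VALUES (?)",
    [(m.isoformat(sep=" "),) for m in moments],
)

# Plain string comparison in ORDER BY yields chronological order.
stored = [
    row[0]
    for row in conn.execute(
        "SELECT happened_at FROM events ORDER BY happened_at"
    )
]
parsed = [datetime.fromisoformat(s) for s in stored]
conn.close()
```

Because the values are fixed-width ISO strings, sorting them as text gives
the same order as sorting the original ``datetime`` objects, including the
pre-1970 value.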

Ensuring Text affinity
^^^^^^^^^^^^^^^^^^^^^^

The DDL rendered for these types is the standard ``DATE``, ``TIME``
and ``DATETIME`` indicators. However, custom storage formats can also be
applied to these types. When the
storage format is detected as containing no alpha characters, the DDL for
these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``,
so that the column continues to have textual affinity.

.. seealso::

    `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ -
    in the SQLite documentation

.. _sqlite_autoincrement:

SQLite Auto Incrementing Behavior
----------------------------------

Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html

Key concepts:

* SQLite has an implicit "auto increment" feature that takes place for any
  non-composite primary-key column that is specifically created using
  "INTEGER PRIMARY KEY" for the type + primary key.

* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not**
  equivalent to the implicit autoincrement feature; this keyword is not
  recommended for general use. SQLAlchemy does not render this keyword
  unless a special SQLite-specific directive is used (see below). However,
  it still requires that the column's type is named "INTEGER".

Using the AUTOINCREMENT Keyword
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To specifically render the AUTOINCREMENT keyword on the primary key column
when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table
construct::

    Table(
        "sometable",
        metadata,
        Column("id", Integer, primary_key=True),
        sqlite_autoincrement=True,
    )

Allowing autoincrement behavior for SQLAlchemy types other than Integer/INTEGER
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

SQLite's typing model is based on naming conventions. Among other things, this
means that any type name which contains the substring ``"INT"`` will be
determined to be of "integer affinity". A type named ``"BIGINT"``,
``"SPECIAL_INT"`` or even ``"XYZINTQPR"``, will be considered by SQLite to be
of "integer" affinity. However, **the SQLite autoincrement feature, whether
implicitly or explicitly enabled, requires that the name of the column's type
is exactly the string "INTEGER"**. Therefore, if an application uses a type
like :class:`.BigInteger` for a primary key, on SQLite this type will need to
be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE
TABLE`` statement in order for the autoincrement behavior to be available.
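The naming requirement can be observed directly with the standard library
``sqlite3`` driver. The sketch below is illustrative only: the two table
names are hypothetical, and it uses raw SQL rather than SQLAlchemy
constructs. It assumes ordinary rowid tables (not ``WITHOUT ROWID`` or
``STRICT``).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical tables that differ only in the declared primary key type name.
conn.execute("CREATE TABLE with_integer (id INTEGER PRIMARY KEY, v TEXT)")
conn.execute("CREATE TABLE with_bigint (id BIGINT PRIMARY KEY, v TEXT)")

# Neither INSERT supplies a value for "id".
conn.execute("INSERT INTO with_integer (v) VALUES ('a')")
conn.execute("INSERT INTO with_bigint (v) VALUES ('a')")

integer_id = conn.execute("SELECT id FROM with_integer").fetchone()[0]
bigint_id = conn.execute("SELECT id FROM with_bigint").fetchone()[0]
conn.close()
```

Only the column declared as exactly ``INTEGER`` receives a generated value;
the ``BIGINT`` column has integer affinity but is left as NULL.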

One approach to achieve this is to use :class:`.Integer` on SQLite
only using :meth:`.TypeEngine.with_variant`::

    table = Table(
        "my_table",
        metadata,
        Column(
            "id",
            BigInteger().with_variant(Integer, "sqlite"),
            primary_key=True,
        ),
    )

Another is to use a subclass of :class:`.BigInteger` that overrides its DDL
name to be ``INTEGER`` when compiled against SQLite::

    from sqlalchemy import BigInteger
    from sqlalchemy.ext.compiler import compiles


    class SLBigInteger(BigInteger):
        pass


    @compiles(SLBigInteger, "sqlite")
    def bi_c(element, compiler, **kw):
        return "INTEGER"


    @compiles(SLBigInteger)
    def bi_c(element, compiler, **kw):
        return compiler.visit_BIGINT(element, **kw)


    table = Table(
        "my_table", metadata, Column("id", SLBigInteger(), primary_key=True)
    )

.. seealso::

    :meth:`.TypeEngine.with_variant`

    :ref:`sqlalchemy.ext.compiler_toplevel`

    `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_

.. _sqlite_transactions:

Transactions with SQLite and the sqlite3 driver
-----------------------------------------------

As a file-based database, SQLite's approach to transactions differs from
traditional databases in many ways. Additionally, the ``sqlite3`` driver
standard with Python (as well as the async version ``aiosqlite`` which builds
on top of it) has several quirks, workarounds, and API features in the
area of transaction control, all of which generally need to be addressed when
constructing a SQLAlchemy application that uses SQLite.

Legacy Transaction Mode with the sqlite3 driver
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The most important aspect of transaction handling with the sqlite3 driver is
that it defaults (which will continue through Python 3.15 before being
removed in Python 3.16) to legacy transactional behavior which does
not strictly follow :pep:`249`. The way in which the driver diverges from the
PEP is that it does not "begin" a transaction automatically as dictated by
:pep:`249` except in the case of DML statements, e.g. INSERT, UPDATE, and
DELETE. Normally, :pep:`249` dictates that a BEGIN must be emitted upon
the first SQL statement of any kind, so that all subsequent operations will
be established within a transaction until ``connection.commit()`` has been
called. The ``sqlite3`` driver, in an effort to be easier to use in
highly concurrent environments, skips this step for DQL (e.g. SELECT) statements,
and also skips it for DDL (e.g. CREATE TABLE etc.) statements for more legacy
reasons. Statements such as SAVEPOINT are also skipped.

In modern versions of the ``sqlite3`` driver as of Python 3.12, this legacy
mode of operation is referred to as
`"legacy transaction control" <https://docs.python.org/3/library/sqlite3.html#sqlite3-transaction-control-isolation-level>`_, and is in
effect by default due to the ``Connection.autocommit`` parameter being set to
the constant ``sqlite3.LEGACY_TRANSACTION_CONTROL``. Prior to Python 3.12,
the ``Connection.autocommit`` attribute did not exist.

The implications of legacy transaction mode include:

* **Incorrect support for transactional DDL** - statements like CREATE TABLE, ALTER TABLE,
  CREATE INDEX etc. will not automatically BEGIN a transaction if one were not
  started already, leading to the changes by each statement being
  "autocommitted" immediately unless BEGIN were otherwise emitted first. Very
  old (pre Python 3.6) versions of the ``sqlite3`` driver would also force a
  COMMIT for these operations even if a transaction were present, however this
  is no longer the case.
* **SERIALIZABLE behavior not fully functional** - SQLite's transaction isolation
  behavior is normally consistent with SERIALIZABLE isolation, as it is a
  file-based system that locks the database file entirely for write operations,
  preventing COMMIT until all reader transactions (and associated file locks)
  have completed. However, sqlite3's legacy transaction mode fails to emit BEGIN for SELECT
  statements, which causes these SELECT statements to no longer be "repeatable",
  failing one of the consistency guarantees of SERIALIZABLE.
* **Incorrect behavior for SAVEPOINT** - as the SAVEPOINT statement does not
  imply a BEGIN, a new SAVEPOINT emitted before a BEGIN will function on its
  own but fails to participate in the enclosing transaction, meaning a ROLLBACK
  of the transaction will not rollback elements that were part of a released
  savepoint.
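The behavior described in the list above can be observed with the standard
library driver by checking ``Connection.in_transaction`` after each kind of
statement. This is a small sketch, not part of SQLAlchemy; the table ``t`` is
hypothetical, and legacy mode is requested explicitly on Python 3.12+ so the
result does not depend on the interpreter's default.

```python
import sqlite3

kwargs = {"isolation_level": "DEFERRED"}  # implicit BEGIN for DML only
if hasattr(sqlite3, "LEGACY_TRANSACTION_CONTROL"):  # Python 3.12 and above
    kwargs["autocommit"] = sqlite3.LEGACY_TRANSACTION_CONTROL

conn = sqlite3.connect(":memory:", **kwargs)

conn.execute("CREATE TABLE t (x INTEGER)")  # DDL: no implicit BEGIN
after_ddl = conn.in_transaction

conn.execute("SELECT * FROM t").fetchall()  # DQL: no implicit BEGIN
after_select = conn.in_transaction

conn.execute("INSERT INTO t (x) VALUES (1)")  # DML: implicit BEGIN
after_insert = conn.in_transaction

# Only the INSERT is undone; the CREATE TABLE was already autocommitted.
conn.rollback()
row_count = conn.execute("SELECT count(*) FROM t").fetchone()[0]
conn.close()
```

After the rollback the table still exists but is empty, which is the
"autocommitted DDL" effect from the first bullet.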

Legacy transaction mode first existed in order to facilitate working around
SQLite's file locks. Because SQLite relies upon whole-file locks, it is easy to
get "database is locked" errors, particularly when newer features like "write
ahead logging" are disabled. This is a key reason why ``sqlite3``'s legacy
transaction mode is still the default mode of operation; disabling it will
produce behavior that is more susceptible to locked database errors. However
note that **legacy transaction mode will no longer be the default** in a future
Python version (3.16 as of this writing).

.. _sqlite_enabling_transactions:

Enabling Non-Legacy SQLite Transactional Modes with the sqlite3 or aiosqlite driver
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Current SQLAlchemy support allows either for setting the
``Connection.autocommit`` attribute, most directly by using a
:func:`._sa.create_engine` parameter, or if on an older version of Python where
the attribute is not available, using event hooks to control the behavior of
BEGIN.

* **Enabling modern sqlite3 transaction control via the autocommit connect parameter** (Python 3.12 and above)

  To use SQLite in the mode described at `Transaction control via the autocommit attribute <https://docs.python.org/3/library/sqlite3.html#transaction-control-via-the-autocommit-attribute>`_,
  the most straightforward approach is to set the attribute to its recommended value
  of ``False`` at the connect level using :paramref:`_sa.create_engine.connect_args`::

      from sqlalchemy import create_engine

      engine = create_engine(
          "sqlite:///myfile.db", connect_args={"autocommit": False}
      )

  This parameter is also passed through when using the aiosqlite driver::

      from sqlalchemy.ext.asyncio import create_async_engine

      engine = create_async_engine(
          "sqlite+aiosqlite:///myfile.db", connect_args={"autocommit": False}
      )

  The parameter can also be set at the attribute level using the :meth:`.PoolEvents.connect`
  event hook, however this will only work for sqlite3, as aiosqlite does not yet expose this
  attribute on its ``Connection`` object::

      from sqlalchemy import create_engine, event

      engine = create_engine("sqlite:///myfile.db")


      @event.listens_for(engine, "connect")
      def do_connect(dbapi_connection, connection_record):
          # enable autocommit=False mode
          dbapi_connection.autocommit = False

* **Using SQLAlchemy to emit BEGIN in lieu of SQLite's transaction control** (all Python versions, sqlite3 and aiosqlite)

  For older versions of ``sqlite3`` or for cross-compatibility with older and
  newer versions, SQLAlchemy can also take over the job of transaction control.
  This is achieved by using the :meth:`.ConnectionEvents.begin` hook
  to emit the "BEGIN" command directly, while also disabling SQLite's control
  of this command using the :meth:`.PoolEvents.connect` event hook to set the
  ``Connection.isolation_level`` attribute to ``None``::

      from sqlalchemy import create_engine, event

      engine = create_engine("sqlite:///myfile.db")


      @event.listens_for(engine, "connect")
      def do_connect(dbapi_connection, connection_record):
          # disable sqlite3's emitting of the BEGIN statement entirely.
          dbapi_connection.isolation_level = None


      @event.listens_for(engine, "begin")
      def do_begin(conn):
          # emit our own BEGIN. sqlite3 still emits COMMIT/ROLLBACK correctly
          conn.exec_driver_sql("BEGIN")

  When using the asyncio variant ``aiosqlite``, refer to ``engine.sync_engine``
  as in the example below::

      from sqlalchemy import event
      from sqlalchemy.ext.asyncio import create_async_engine

      engine = create_async_engine("sqlite+aiosqlite:///myfile.db")


      @event.listens_for(engine.sync_engine, "connect")
      def do_connect(dbapi_connection, connection_record):
          # disable aiosqlite's emitting of the BEGIN statement entirely.
          dbapi_connection.isolation_level = None


      @event.listens_for(engine.sync_engine, "begin")
      def do_begin(conn):
          # emit our own BEGIN. aiosqlite still emits COMMIT/ROLLBACK correctly
          conn.exec_driver_sql("BEGIN")
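What the second approach amounts to at the driver level can be sketched with
plain ``sqlite3``, without SQLAlchemy. This is an illustration under stated
assumptions: the table ``t`` is hypothetical, the explicit ``BEGIN`` stands in
for what the "begin" event hook would emit, and legacy mode is requested
explicitly on Python 3.12+ so that ``isolation_level`` is honored.

```python
import sqlite3

# isolation_level=None turns off the driver's implicit BEGIN entirely.
kwargs = {"isolation_level": None}
if hasattr(sqlite3, "LEGACY_TRANSACTION_CONTROL"):  # Python 3.12 and above
    kwargs["autocommit"] = sqlite3.LEGACY_TRANSACTION_CONTROL

conn = sqlite3.connect(":memory:", **kwargs)
conn.execute("CREATE TABLE t (x INTEGER)")

conn.execute("BEGIN")  # emitted by hand, as the "begin" hook would do
conn.execute("SELECT * FROM t").fetchall()
select_in_transaction = conn.in_transaction

conn.execute("INSERT INTO t (x) VALUES (1)")
conn.rollback()  # the driver still emits ROLLBACK itself
after_rollback = conn.in_transaction
row_count = conn.execute("SELECT count(*) FROM t").fetchone()[0]
conn.close()
```

With the explicit ``BEGIN`` in place, the SELECT runs inside the transaction,
and the driver's own ``rollback()`` ends it as usual.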

.. _sqlite_isolation_level:

Using SQLAlchemy's Driver Level AUTOCOMMIT Feature with SQLite
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

SQLAlchemy has a comprehensive database isolation feature with optional
autocommit support that is introduced in the section :ref:`dbapi_autocommit`.

For the ``sqlite3`` and ``aiosqlite`` drivers, SQLAlchemy only includes
built-in support for "AUTOCOMMIT". Note that this mode is currently incompatible
with the non-legacy isolation mode hooks documented in the previous
section at :ref:`sqlite_enabling_transactions`.

To use the ``sqlite3`` driver with SQLAlchemy driver-level autocommit,
create an engine setting the :paramref:`_sa.create_engine.isolation_level`
parameter to "AUTOCOMMIT"::

    eng = create_engine("sqlite:///myfile.db", isolation_level="AUTOCOMMIT")

When using the above mode, any event hooks that set the sqlite3 ``Connection.autocommit``
parameter away from its default of ``sqlite3.LEGACY_TRANSACTION_CONTROL``
as well as hooks that emit ``BEGIN`` should be disabled.

Additional Reading for SQLite / sqlite3 transaction control
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Links with important information on SQLite, the sqlite3 driver,
as well as long historical conversations on how things got to their current state:

* `Isolation in SQLite <https://www.sqlite.org/isolation.html>`_ - on the SQLite website
* `Transaction control <https://docs.python.org/3/library/sqlite3.html#transaction-control>`_ - describes the sqlite3 autocommit attribute as well
  as the legacy isolation_level attribute.
* `sqlite3 SELECT does not BEGIN a transaction, but should according to spec <https://github.com/python/cpython/issues/54133>`_ - imported Python standard library issue on GitHub
* `sqlite3 module breaks transactions and potentially corrupts data <https://github.com/python/cpython/issues/54949>`_ - imported Python standard library issue on GitHub

INSERT/UPDATE/DELETE...RETURNING
---------------------------------

The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING``
syntax. ``INSERT..RETURNING`` may be used
automatically in some cases in order to fetch newly generated identifiers in
place of the traditional approach of using ``cursor.lastrowid``, however
``cursor.lastrowid`` is currently still preferred for simple single-statement
cases for its better performance.

To specify an explicit ``RETURNING`` clause, use the
:meth:`._UpdateBase.returning` method on a per-statement basis::

    # INSERT..RETURNING
    result = connection.execute(
        table.insert().values(name="foo").returning(table.c.col1, table.c.col2)
    )
    print(result.all())

    # UPDATE..RETURNING
    result = connection.execute(
        table.update()
        .where(table.c.name == "foo")
        .values(name="bar")
        .returning(table.c.col1, table.c.col2)
    )
    print(result.all())

    # DELETE..RETURNING
    result = connection.execute(
        table.delete()
        .where(table.c.name == "foo")
        .returning(table.c.col1, table.c.col2)
    )
    print(result.all())
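For reference, the underlying SQL can also be tried with the standard library
driver. The sketch below is not SQLAlchemy code: the table ``t`` and its
columns are hypothetical, and the statement is skipped when the linked SQLite
library is older than 3.35.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")

returned = None  # stays None when RETURNING is not available
if sqlite3.sqlite_version_info >= (3, 35, 0):
    cursor = conn.execute(
        "INSERT INTO t (name) VALUES (?) RETURNING id, name", ("foo",)
    )
    # Fetch the RETURNING rows before committing.
    returned = cursor.fetchall()
    cursor.close()
conn.commit()
conn.close()
```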

.. versionadded:: 2.0 Added support for SQLite RETURNING


.. _sqlite_foreign_keys:

Foreign Key Support
-------------------

SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables,
however by default these constraints have no effect on the operation of the
table.

Constraint checking on SQLite has three prerequisites:

* At least version 3.6.19 of SQLite must be in use
* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY
  or SQLITE_OMIT_TRIGGER symbols enabled.
* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all
  connections before use -- including the initial call to
  :meth:`sqlalchemy.schema.MetaData.create_all`.

SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for
new connections through the usage of events::

    from sqlalchemy.engine import Engine
    from sqlalchemy import event


    @event.listens_for(Engine, "connect")
    def set_sqlite_pragma(dbapi_connection, connection_record):
        # the sqlite3 driver will not set PRAGMA foreign_keys
        # if autocommit=False; set to True temporarily
        ac = dbapi_connection.autocommit
        dbapi_connection.autocommit = True

        cursor = dbapi_connection.cursor()
        cursor.execute("PRAGMA foreign_keys=ON")
        cursor.close()

        # restore previous autocommit setting
        dbapi_connection.autocommit = ac
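The effect of the PRAGMA can be seen with the standard library driver alone.
This is a minimal sketch with hypothetical ``parent`` and ``child`` tables; it
assumes a SQLite build that includes foreign key support, and it turns the
setting off explicitly first so the outcome does not depend on compile-time
defaults.

```python
import sqlite3

# Autocommit mode, so each PRAGMA runs outside of any transaction.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE parent (id INTEGER PRIMARY KEY)")
conn.execute(
    "CREATE TABLE child (id INTEGER PRIMARY KEY, "
    "parent_id INTEGER REFERENCES parent (id))"
)

conn.execute("PRAGMA foreign_keys=OFF")  # the usual default, made explicit
# Accepted even though no parent row 999 exists.
conn.execute("INSERT INTO child (id, parent_id) VALUES (1, 999)")

conn.execute("PRAGMA foreign_keys=ON")
try:
    conn.execute("INSERT INTO child (id, parent_id) VALUES (2, 999)")
    enforced = False
except sqlite3.IntegrityError:
    enforced = True

child_count = conn.execute("SELECT count(*) FROM child").fetchone()[0]
conn.close()
```

The first orphan row is stored without complaint; only after the PRAGMA is
switched on does the same kind of INSERT raise ``IntegrityError``.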

.. warning::

    When SQLite foreign keys are enabled, it is **not possible**
    to emit CREATE or DROP statements for tables that contain
    mutually-dependent foreign key constraints;
    to emit the DDL for these tables requires that ALTER TABLE be used to
    create or drop these constraints separately, for which SQLite has
    no support.

.. seealso::

    `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_
    - on the SQLite web site.

    :ref:`event_toplevel` - SQLAlchemy event API.

    :ref:`use_alter` - more information on SQLAlchemy's facilities for handling
    mutually-dependent foreign key constraints.

.. _sqlite_on_conflict_ddl:

ON CONFLICT support for constraints
-----------------------------------

.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for
    SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT" as
    applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`.

SQLite supports a non-standard DDL clause known as ON CONFLICT which can be applied
to primary key, unique, check, and not null constraints. In DDL, it is
rendered either within the "CONSTRAINT" clause or within the column definition
itself depending on the location of the target constraint. To render this
clause within DDL, the extension parameter ``sqlite_on_conflict`` can be
specified with a string conflict resolution algorithm within the
:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`, and
:class:`.CheckConstraint` objects. Within the :class:`_schema.Column` object,
there are individual parameters ``sqlite_on_conflict_not_null``,
``sqlite_on_conflict_primary_key``, and ``sqlite_on_conflict_unique``, which
correspond to the three relevant constraint types that can be
indicated from a :class:`_schema.Column` object.

.. seealso::

    `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite
    documentation

.. versionadded:: 1.3

The ``sqlite_on_conflict`` parameters accept a string argument which is just
the resolution name to be chosen, which on SQLite can be one of ROLLBACK,
ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint
that specifies the IGNORE algorithm::

    some_table = Table(
        "some_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("data", Integer),
        UniqueConstraint("id", "data", sqlite_on_conflict="IGNORE"),
    )

The above renders CREATE TABLE DDL as:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER,
        PRIMARY KEY (id),
        UNIQUE (id, data) ON CONFLICT IGNORE
    )

When using the :paramref:`_schema.Column.unique`
flag to add a UNIQUE constraint
to a single column, the ``sqlite_on_conflict_unique`` parameter can
be added to the :class:`_schema.Column` as well, which will be added to the
UNIQUE constraint in the DDL::

    some_table = Table(
        "some_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column(
            "data", Integer, unique=True, sqlite_on_conflict_unique="IGNORE"
        ),
    )

rendering:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER,
        PRIMARY KEY (id),
        UNIQUE (data) ON CONFLICT IGNORE
    )
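To see what the IGNORE algorithm does at runtime, the rendered DDL can be
executed with the standard library ``sqlite3`` driver. This is an
illustrative sketch only; the inserted values are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER,
        PRIMARY KEY (id),
        UNIQUE (data) ON CONFLICT IGNORE
    )
    """
)
conn.execute("INSERT INTO some_table (id, data) VALUES (1, 10)")
# Same "data" value: the UNIQUE constraint's IGNORE algorithm skips the row
# instead of raising IntegrityError.
conn.execute("INSERT INTO some_table (id, data) VALUES (2, 10)")
rows = conn.execute("SELECT id, data FROM some_table ORDER BY id").fetchall()
conn.close()
```

Only the first row is stored; the second INSERT completes without an error
and without adding a row.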

To apply the FAIL algorithm for a NOT NULL constraint,
``sqlite_on_conflict_not_null`` is used::

    some_table = Table(
        "some_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column(
            "data", Integer, nullable=False, sqlite_on_conflict_not_null="FAIL"
        ),
    )

This renders the ON CONFLICT phrase inline with the column:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER NOT NULL ON CONFLICT FAIL,
        PRIMARY KEY (id)
    )


Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``::

    some_table = Table(
        "some_table",
        metadata,
        Column(
            "id",
            Integer,
            primary_key=True,
            sqlite_on_conflict_primary_key="FAIL",
        ),
    )

SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict
resolution algorithm is applied to the constraint itself:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        PRIMARY KEY (id) ON CONFLICT FAIL
    )

.. _sqlite_on_conflict_insert:

INSERT...ON CONFLICT (Upsert)
-----------------------------

.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for
    SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as
    applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`.

From version 3.24.0 onwards, SQLite supports "upserts" (update or insert)
of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT``
statement. A candidate row will only be inserted if that row does not violate
any unique or primary key constraints. In the case of a unique constraint violation, a
secondary action can occur which can be either "DO UPDATE", indicating that
the data in the target row should be updated, or "DO NOTHING", which indicates
to silently skip this row.

Conflicts are determined using columns that are part of existing unique
constraints and indexes. These constraints are identified by stating the
columns and conditions that comprise the indexes.

SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific
:func:`_sqlite.insert()` function, which provides
the generative methods :meth:`_sqlite.Insert.on_conflict_do_update`
and :meth:`_sqlite.Insert.on_conflict_do_nothing`:

.. sourcecode:: pycon+sql

    >>> from sqlalchemy.dialects.sqlite import insert

    >>> insert_stmt = insert(my_table).values(
    ...     id="some_existing_id", data="inserted value"
    ... )

    >>> do_update_stmt = insert_stmt.on_conflict_do_update(
    ...     index_elements=["id"], set_=dict(data="updated value")
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?{stop}

    >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])

    >>> print(do_nothing_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO NOTHING

.. versionadded:: 1.4

.. seealso::

    `Upsert
    <https://sqlite.org/lang_UPSERT.html>`_
    - in the SQLite documentation.

Specifying the Target
^^^^^^^^^^^^^^^^^^^^^

Both methods supply the "target" of the conflict using column inference:

* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument
  specifies a sequence containing string column names, :class:`_schema.Column`
  objects, and/or SQL expression elements, which would identify a unique index
  or unique constraint.

* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements`
  to infer an index, a partial index can be inferred by also specifying the
  :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter:

  .. sourcecode:: pycon+sql

      >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")

      >>> do_update_stmt = stmt.on_conflict_do_update(
      ...     index_elements=[my_table.c.user_email],
      ...     index_where=my_table.c.user_email.like("%@gmail.com"),
      ...     set_=dict(data=stmt.excluded.data),
      ... )

      >>> print(do_update_stmt)
      {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?)
      ON CONFLICT (user_email)
      WHERE user_email LIKE '%@gmail.com'
      DO UPDATE SET data = excluded.data

The SET Clause
^^^^^^^^^^^^^^^

``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
existing row, using any combination of new values as well as values
from the proposed insertion. These values are specified using the
:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This
parameter accepts a dictionary which consists of direct values
for UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")

    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"], set_=dict(data="updated value")
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?

.. warning::

    The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take
    into account Python-side default UPDATE values or generation functions,
    e.g. those specified using :paramref:`_schema.Column.onupdate`. These
    values will not be exercised for an ON CONFLICT style of UPDATE, unless
    they are manually specified in the
    :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary.

Updating using the Excluded INSERT Values
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

In order to refer to the proposed insertion row, the special alias
:attr:`~.sqlite.Insert.excluded` is available as an attribute on
the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix
on a column, that informs the DO UPDATE to update the row with the value that
would have been inserted had the constraint not failed:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id="some_id", data="inserted value", author="jlh"
    ... )

    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"],
    ...     set_=dict(data="updated value", author=stmt.excluded.author),
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
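The rendered statement can be run as-is against the standard library driver
to see how ``excluded`` behaves. The sketch below is illustrative rather than
SQLAlchemy code: the pre-existing row and its values are made up, and the
upsert is skipped when the linked SQLite library is older than 3.24.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE my_table (id TEXT PRIMARY KEY, data TEXT, author TEXT)"
)
# Hypothetical existing row that the upsert will collide with.
conn.execute("INSERT INTO my_table VALUES ('some_id', 'original', 'abc')")

rows = None  # stays None when upsert syntax is not available
if sqlite3.sqlite_version_info >= (3, 24, 0):
    conn.execute(
        "INSERT INTO my_table (id, data, author) VALUES (?, ?, ?) "
        "ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author",
        ("some_id", "inserted value", "jlh", "updated value"),
    )
    rows = conn.execute("SELECT id, data, author FROM my_table").fetchall()
conn.close()
```

The ``data`` column takes the literal bound in the SET clause, while
``author`` takes the value from the row that failed to insert.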

Additional WHERE Criteria
^^^^^^^^^^^^^^^^^^^^^^^^^

The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts
a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where`
parameter, which will limit those rows which receive an UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id="some_id", data="inserted value", author="jlh"
    ... )

    >>> on_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"],
    ...     set_=dict(data="updated value", author=stmt.excluded.author),
    ...     where=(my_table.c.status == 2),
    ... )
    >>> print(on_update_stmt)
    {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
    WHERE my_table.status = ?


Skipping Rows with DO NOTHING
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

``ON CONFLICT`` may be used to skip inserting a row entirely
if any conflict with a unique constraint occurs; below this is illustrated
using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
    >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
    >>> print(stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING


If ``DO NOTHING`` is used without specifying any columns or constraint,
it has the effect of skipping the INSERT for any unique violation which
occurs:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
    >>> stmt = stmt.on_conflict_do_nothing()
    >>> print(stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING

749 

750.. _sqlite_type_reflection: 

751 

752Type Reflection 

753--------------- 

754 

755SQLite types are unlike those of most other database backends, in that 

756the string name of the type usually does not correspond to a "type" in a 

757one-to-one fashion. Instead, SQLite links per-column typing behavior 

758to one of five so-called "type affinities" based on a string matching 

759pattern for the type. 

760 

761SQLAlchemy's reflection process, when inspecting types, uses a simple 

762lookup table to link the keywords returned to provided SQLAlchemy types. 

763This lookup table is present within the SQLite dialect as it is for all 

764other dialects. However, the SQLite dialect has a different "fallback" 

765routine for when a particular type name is not located in the lookup map; 

766it instead implements the SQLite "type affinity" scheme located at 

767https://www.sqlite.org/datatype3.html section 2.1. 

768 

769The provided typemap will make direct associations from an exact string 

770name match for the following types: 

771 

772:class:`_types.BIGINT`, :class:`_types.BLOB`, 

773:class:`_types.BOOLEAN`, 

774:class:`_types.CHAR`, :class:`_types.DATE`, 

775:class:`_types.DATETIME`, :class:`_types.DOUBLE`, 

776:class:`_types.DECIMAL`, :class:`_types.FLOAT`, 

777:class:`_types.INTEGER`, 

778:class:`_types.NUMERIC`, :class:`_types.REAL`, 

779:class:`_types.SMALLINT`, :class:`_types.TEXT`, 

780:class:`_types.TIME`, :class:`_types.TIMESTAMP`, 

781:class:`_types.VARCHAR`, :class:`_types.NVARCHAR`, 

782:class:`_types.NCHAR` 

783 

784When a type name does not match one of the above types, the "type affinity" 

785lookup is used instead: 

786 

787* :class:`_types.INTEGER` is returned if the type name includes the 

788 string ``INT`` 

789* :class:`_types.TEXT` is returned if the type name includes the 

790 string ``CHAR``, ``CLOB`` or ``TEXT`` 

791* :class:`_types.NullType` is returned if the type name includes the 

792 string ``BLOB`` 

793* :class:`_types.REAL` is returned if the type name includes the string 

794 ``REAL``, ``FLOA`` or ``DOUB``. 

795* Otherwise, the :class:`_types.NUMERIC` type is used. 
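
The fallback rules above can be sketched as a small standalone function. This
is not the dialect's actual code: the function name is hypothetical, it returns
plain strings instead of SQLAlchemy type classes, and it assumes the rules are
tried in the order listed, which matches SQLite's documented affinity rules:

```python
def affinity_fallback(type_name: str) -> str:
    """Return the name of the type the fallback rules would select.

    Hypothetical helper for illustration; rules are tried in listed order.
    """
    name = type_name.upper()
    if "INT" in name:
        return "INTEGER"
    if "CHAR" in name or "CLOB" in name or "TEXT" in name:
        return "TEXT"
    if "BLOB" in name:
        return "NullType"
    if "REAL" in name or "FLOA" in name or "DOUB" in name:
        return "REAL"
    return "NUMERIC"


results = {
    name: affinity_fallback(name)
    for name in (
        "MEDIUMINT",
        "NATIVE CHARACTER(70)",
        "MYBLOB",
        "DOUBLE PRECISION",
        "FLOATING POINT",
        "GEOMETRY",
    )
}
```

Because the ``INT`` rule is checked first, a name such as ``FLOATING POINT``
resolves to ``INTEGER`` (the substring ``INT`` appears in ``POINT``), which is
the same surprise noted in the SQLite documentation.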

796 

797.. _sqlite_partial_index: 

798 

799Partial Indexes 

800--------------- 

801 

802A partial index, e.g. one which uses a WHERE clause, can be specified 

803with the DDL system using the argument ``sqlite_where``:: 

804 

805 tbl = Table("testtbl", m, Column("data", Integer)) 

806 idx = Index( 

807 "test_idx1", 

808 tbl.c.data, 

809 sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10), 

810 ) 

811 

812The index will be rendered at create time as: 

813 

814.. sourcecode:: sql 

815 

816 CREATE INDEX test_idx1 ON testtbl (data) 

817 WHERE data > 5 AND data < 10 
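
To see that SQLite itself accepts and stores such a definition, the following
sketch issues the rendered DDL directly through the standard-library
``sqlite3`` module and reads the index definition back from ``sqlite_master``.
It bypasses SQLAlchemy entirely and assumes a SQLite version with partial
index support (3.8.0 or later):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE testtbl (data INTEGER)")
conn.execute(
    "CREATE INDEX test_idx1 ON testtbl (data) WHERE data > 5 AND data < 10"
)

# sqlite_master keeps the original CREATE INDEX text, WHERE clause included.
index_sql = conn.execute(
    "SELECT sql FROM sqlite_master WHERE type = 'index' AND name = 'test_idx1'"
).fetchone()[0]
```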

818 

819.. _sqlite_dotted_column_names: 

820 

821Dotted Column Names 

822------------------- 

823 

824Using table or column names that explicitly have periods in them is 

825**not recommended**. While this is a bad idea for relational 

826databases in general, as the dot is a syntactically significant character, 

827the SQLite driver up until version **3.10.0** of SQLite has a bug which 

828requires that SQLAlchemy filter out these dots in result sets. 

829 

830The bug, entirely outside of SQLAlchemy, can be illustrated as follows:: 

831 

832 import sqlite3 

833 

834 assert sqlite3.sqlite_version_info < ( 

835 3, 

836 10, 

837 0, 

838 ), "bug is fixed in this version" 

839 

840 conn = sqlite3.connect(":memory:") 

841 cursor = conn.cursor() 

842 

843 cursor.execute("create table x (a integer, b integer)") 

844 cursor.execute("insert into x (a, b) values (1, 1)") 

845 cursor.execute("insert into x (a, b) values (2, 2)") 

846 

847 cursor.execute("select x.a, x.b from x") 

848 assert [c[0] for c in cursor.description] == ["a", "b"] 

849 

850 cursor.execute( 

851 """ 

852 select x.a, x.b from x where a=1 

853 union 

854 select x.a, x.b from x where a=2 

855 """ 

856 ) 

857 assert [c[0] for c in cursor.description] == ["a", "b"], [ 

858 c[0] for c in cursor.description 

859 ] 

860 

861The second assertion fails: 

862 

863.. sourcecode:: text 

864 

865 Traceback (most recent call last): 

866 File "test.py", line 19, in <module> 

867 [c[0] for c in cursor.description] 

868 AssertionError: ['x.a', 'x.b'] 

869 

870Above, the driver incorrectly reports the column names with the table 

871name included, which is inconsistent with the names it reports when 

872the UNION is not present. 

873 

874SQLAlchemy relies upon column names being predictable in how they match 

875to the original statement, so the SQLAlchemy dialect has no choice but 

876to filter these out:: 

877 

878 

879 from sqlalchemy import create_engine 

880 

881 eng = create_engine("sqlite://") 

882 conn = eng.connect() 

883 

884 conn.exec_driver_sql("create table x (a integer, b integer)") 

885 conn.exec_driver_sql("insert into x (a, b) values (1, 1)") 

886 conn.exec_driver_sql("insert into x (a, b) values (2, 2)") 

887 

888 result = conn.exec_driver_sql("select x.a, x.b from x") 

889 assert result.keys() == ["a", "b"] 

890 

891 result = conn.exec_driver_sql( 

892 """ 

893 select x.a, x.b from x where a=1 

894 union 

895 select x.a, x.b from x where a=2 

896 """ 

897 ) 

898 assert result.keys() == ["a", "b"] 

899 

900Note that above, even though SQLAlchemy filters out the dots, *both 

901names are still addressable*:: 

902 

903 >>> row = result.first() 

904 >>> row["a"] 

905 1 

906 >>> row["x.a"] 

907 1 

908 >>> row["b"] 

909 1 

910 >>> row["x.b"] 

911 1 

912 

913Therefore, the workaround applied by SQLAlchemy only impacts 

914:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In 

915the very specific case where an application is forced to use column names that 

916contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and 

917:meth:`.Row.keys()` is required to return these dotted names unmodified, 

918the ``sqlite_raw_colnames`` execution option may be provided, either on a 

919per-:class:`_engine.Connection` basis:: 

920 

921 result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql( 

922 """ 

923 select x.a, x.b from x where a=1 

924 union 

925 select x.a, x.b from x where a=2 

926 """ 

927 ) 

928 assert result.keys() == ["x.a", "x.b"] 

929 

930or on a per-:class:`_engine.Engine` basis:: 

931 

932 engine = create_engine( 

933 "sqlite://", execution_options={"sqlite_raw_colnames": True} 

934 ) 

935 

936When using the per-:class:`_engine.Engine` execution option, note that 

937**Core and ORM queries that use UNION may not function properly**. 

938 

939SQLite-specific table options 

940----------------------------- 

941 

942Two options for CREATE TABLE are supported directly by the SQLite 

943dialect in conjunction with the :class:`_schema.Table` construct: 

944 

945* ``WITHOUT ROWID``:: 

946 

947 Table("some_table", metadata, ..., sqlite_with_rowid=False) 

948 

949* 

950 ``STRICT``:: 

951 

952 Table("some_table", metadata, ..., sqlite_strict=True) 

953 

954 .. versionadded:: 2.0.37 

955 

956.. seealso:: 

957 

958 `SQLite CREATE TABLE options 

959 <https://www.sqlite.org/lang_createtable.html>`_ 
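
As a minimal sketch of what ``WITHOUT ROWID`` changes at the database level,
the example below creates such a table through the standard-library
``sqlite3`` module and checks whether the implicit ``rowid`` column is still
selectable. The table and column names are hypothetical, and the DDL is
written by hand rather than generated by SQLAlchemy:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# A WITHOUT ROWID table must declare an explicit PRIMARY KEY.
conn.execute(
    "CREATE TABLE some_table (id TEXT PRIMARY KEY, data TEXT) WITHOUT ROWID"
)
conn.execute("INSERT INTO some_table (id, data) VALUES ('a', 'x')")

try:
    conn.execute("SELECT rowid FROM some_table")
    has_rowid = True
except sqlite3.OperationalError:
    # SQLite reports "no such column: rowid" for WITHOUT ROWID tables.
    has_rowid = False
```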

960 

961.. _sqlite_include_internal: 

962 

963Reflecting internal schema tables 

964---------------------------------- 

965 

966Reflection methods that return lists of tables will omit so-called 

967"SQLite internal schema object" names, which SQLite defines 

968as any object name that is prefixed with ``sqlite_``. An example of 

969such an object is the ``sqlite_sequence`` table that's generated when 

970the ``AUTOINCREMENT`` column parameter is used. In order to return 

971these objects, the parameter ``sqlite_include_internal=True`` may be 

972passed to methods such as :meth:`_schema.MetaData.reflect` or 

973:meth:`.Inspector.get_table_names`. 

974 

975.. versionadded:: 2.0 Added the ``sqlite_include_internal=True`` parameter. 

976 Previously, these tables were not ignored by SQLAlchemy reflection 

977 methods. 

978 

979.. note:: 

980 

981 The ``sqlite_include_internal`` parameter does not refer to the 

982 "system" tables that are present in schemas such as ``sqlite_master``. 

983 

984.. seealso:: 

985 

986 `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite 

987 documentation. 
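
The naming rule can be observed directly with the standard-library ``sqlite3``
module. The sketch below is independent of SQLAlchemy's reflection code: it
creates a hypothetical ``AUTOINCREMENT`` table, lists the table names recorded
in ``sqlite_master``, and then applies the ``sqlite_`` prefix filter by hand:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Using AUTOINCREMENT causes SQLite to create the sqlite_sequence table.
conn.execute(
    "CREATE TABLE some_table (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT)"
)

all_names = [
    row[0]
    for row in conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'"
    )
]
# Names with the sqlite_ prefix are internal schema objects.
user_names = [name for name in all_names if not name.startswith("sqlite_")]
```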

988 

989''' # noqa 

990from __future__ import annotations 

991 

992import datetime 

993import numbers 

994import re 

995from typing import Any 

996from typing import Callable 

997from typing import Optional 

998from typing import TYPE_CHECKING 

999 

1000from .json import JSON 

1001from .json import JSONIndexType 

1002from .json import JSONPathType 

1003from ... import exc 

1004from ... import schema as sa_schema 

1005from ... import sql 

1006from ... import text 

1007from ... import types as sqltypes 

1008from ... import util 

1009from ...engine import default 

1010from ...engine import processors 

1011from ...engine import reflection 

1012from ...engine.reflection import ReflectionDefaults 

1013from ...sql import coercions 

1014from ...sql import compiler 

1015from ...sql import elements 

1016from ...sql import roles 

1017from ...sql import schema 

1018from ...types import BLOB # noqa 

1019from ...types import BOOLEAN # noqa 

1020from ...types import CHAR # noqa 

1021from ...types import DECIMAL # noqa 

1022from ...types import FLOAT # noqa 

1023from ...types import INTEGER # noqa 

1024from ...types import NUMERIC # noqa 

1025from ...types import REAL # noqa 

1026from ...types import SMALLINT # noqa 

1027from ...types import TEXT # noqa 

1028from ...types import TIMESTAMP # noqa 

1029from ...types import VARCHAR # noqa 

1030 

1031if TYPE_CHECKING: 

1032 from ...engine.interfaces import DBAPIConnection 

1033 from ...engine.interfaces import Dialect 

1034 from ...engine.interfaces import IsolationLevel 

1035 from ...sql.type_api import _BindProcessorType 

1036 from ...sql.type_api import _ResultProcessorType 

1037 

1038 

1039class _SQliteJson(JSON): 

1040 def result_processor(self, dialect, coltype): 

1041 default_processor = super().result_processor(dialect, coltype) 

1042 

1043 def process(value): 

1044 try: 

1045 return default_processor(value) 

1046 except TypeError: 

1047 if isinstance(value, numbers.Number): 

1048 return value 

1049 else: 

1050 raise 

1051 

1052 return process 

1053 

1054 

1055class _DateTimeMixin: 

1056 _reg = None 

1057 _storage_format = None 

1058 

1059 def __init__(self, storage_format=None, regexp=None, **kw): 

1060 super().__init__(**kw) 

1061 if regexp is not None: 

1062 self._reg = re.compile(regexp) 

1063 if storage_format is not None: 

1064 self._storage_format = storage_format 

1065 

1066 @property 

1067 def format_is_text_affinity(self): 

1068 """return True if the storage format will automatically imply 

1069 a TEXT affinity. 

1070 

1071 If the storage format consists only of digits, 

1072 it will imply a NUMERIC storage format on SQLite; in this case, 

1073 the type will generate its DDL as DATE_CHAR, DATETIME_CHAR, 

1074 TIME_CHAR. 

1075 

1076 """ 

1077 spec = self._storage_format % { 

1078 "year": 0, 

1079 "month": 0, 

1080 "day": 0, 

1081 "hour": 0, 

1082 "minute": 0, 

1083 "second": 0, 

1084 "microsecond": 0, 

1085 } 

1086 return bool(re.search(r"[^0-9]", spec)) 

1087 

1088 def adapt(self, cls, **kw): 

1089 if issubclass(cls, _DateTimeMixin): 

1090 if self._storage_format: 

1091 kw["storage_format"] = self._storage_format 

1092 if self._reg: 

1093 kw["regexp"] = self._reg 

1094 return super().adapt(cls, **kw) 

1095 

1096 def literal_processor(self, dialect): 

1097 bp = self.bind_processor(dialect) 

1098 

1099 def process(value): 

1100 return "'%s'" % bp(value) 

1101 

1102 return process 

1103 

1104 

1105class DATETIME(_DateTimeMixin, sqltypes.DateTime): 

1106 r"""Represent a Python datetime object in SQLite using a string. 

1107 

1108 The default string storage format is:: 

1109 

1110 "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1111 

1112 e.g.: 

1113 

1114 .. sourcecode:: text 

1115 

1116 2021-03-15 12:05:57.105542 

1117 

1118 The incoming storage format is by default parsed using the 

1119 Python ``datetime.fromisoformat()`` function. 

1120 

1121 .. versionchanged:: 2.0 ``datetime.fromisoformat()`` is used for default 

1122 datetime string parsing. 

1123 

1124 The storage format can be customized to some degree using the 

1125 ``storage_format`` and ``regexp`` parameters, such as:: 

1126 

1127 import re 

1128 from sqlalchemy.dialects.sqlite import DATETIME 

1129 

1130 dt = DATETIME( 

1131 storage_format=( 

1132 "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d" 

1133 ), 

1134 regexp=r"(\d+)/(\d+)/(\d+) (\d+):(\d+):(\d+)", 

1135 ) 

1136 

1137 :param truncate_microseconds: when ``True`` microseconds will be truncated 

1138 from the datetime. Can't be specified together with ``storage_format`` 

1139 or ``regexp``. 

1140 

1141 :param storage_format: format string which will be applied to the dict 

1142 with keys year, month, day, hour, minute, second, and microsecond. 

1143 

1144 :param regexp: regular expression which will be applied to incoming result 

1145 rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming 

1146 strings. If the regexp contains named groups, the resulting match dict is 

1147 applied to the Python datetime() constructor as keyword arguments. 

1148 Otherwise, if positional groups are used, the datetime() constructor 

1149 is called with positional arguments via 

1150 ``*map(int, match_obj.groups(0))``. 
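
The interplay of ``storage_format`` and a positional-group ``regexp`` can be
sketched with the standard library alone. This is an illustration of the
described mechanism, not the type's own implementation; the format string,
pattern and sample value are chosen for the example:

```python
import datetime
import re

storage_format = (
    "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d"
)
regexp = re.compile(r"(\d+)/(\d+)/(\d+) (\d+):(\d+):(\d+)")

value = datetime.datetime(2021, 3, 15, 12, 5, 57)

# Bind side: apply the format string to a dict of datetime fields.
stored = storage_format % {
    "year": value.year,
    "month": value.month,
    "day": value.day,
    "hour": value.hour,
    "minute": value.minute,
    "second": value.second,
    "microsecond": value.microsecond,  # unused by this format string
}

# Result side: positional groups are passed to the datetime() constructor.
match = regexp.match(stored)
parsed = datetime.datetime(*map(int, match.groups(0)))
```

The round trip only works when the pattern's separators agree with those in
the format string.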

1151 

1152 """ # noqa 

1153 

1154 _storage_format = ( 

1155 "%(year)04d-%(month)02d-%(day)02d " 

1156 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1157 ) 

1158 

1159 def __init__(self, *args, **kwargs): 

1160 truncate_microseconds = kwargs.pop("truncate_microseconds", False) 

1161 super().__init__(*args, **kwargs) 

1162 if truncate_microseconds: 

1163 assert "storage_format" not in kwargs, ( 

1164 "You can specify only " 

1165 "one of truncate_microseconds or storage_format." 

1166 ) 

1167 assert "regexp" not in kwargs, ( 

1168 "You can specify only one of " 

1169 "truncate_microseconds or regexp." 

1170 ) 

1171 self._storage_format = ( 

1172 "%(year)04d-%(month)02d-%(day)02d " 

1173 "%(hour)02d:%(minute)02d:%(second)02d" 

1174 ) 

1175 

1176 def bind_processor( 

1177 self, dialect: Dialect 

1178 ) -> Optional[_BindProcessorType[Any]]: 

1179 datetime_datetime = datetime.datetime 

1180 datetime_date = datetime.date 

1181 format_ = self._storage_format 

1182 

1183 def process(value): 

1184 if value is None: 

1185 return None 

1186 elif isinstance(value, datetime_datetime): 

1187 return format_ % { 

1188 "year": value.year, 

1189 "month": value.month, 

1190 "day": value.day, 

1191 "hour": value.hour, 

1192 "minute": value.minute, 

1193 "second": value.second, 

1194 "microsecond": value.microsecond, 

1195 } 

1196 elif isinstance(value, datetime_date): 

1197 return format_ % { 

1198 "year": value.year, 

1199 "month": value.month, 

1200 "day": value.day, 

1201 "hour": 0, 

1202 "minute": 0, 

1203 "second": 0, 

1204 "microsecond": 0, 

1205 } 

1206 else: 

1207 raise TypeError( 

1208 "SQLite DateTime type only accepts Python " 

1209 "datetime and date objects as input." 

1210 ) 

1211 

1212 return process 

1213 

1214 def result_processor( 

1215 self, dialect: Dialect, coltype: object 

1216 ) -> Optional[_ResultProcessorType[Any]]: 

1217 if self._reg: 

1218 return processors.str_to_datetime_processor_factory( 

1219 self._reg, datetime.datetime 

1220 ) 

1221 else: 

1222 return processors.str_to_datetime 

1223 

1224 

1225class DATE(_DateTimeMixin, sqltypes.Date): 

1226 r"""Represent a Python date object in SQLite using a string. 

1227 

1228 The default string storage format is:: 

1229 

1230 "%(year)04d-%(month)02d-%(day)02d" 

1231 

1232 e.g.: 

1233 

1234 .. sourcecode:: text 

1235 

1236 2011-03-15 

1237 

1238 The incoming storage format is by default parsed using the 

1239 Python ``date.fromisoformat()`` function. 

1240 

1241 .. versionchanged:: 2.0 ``date.fromisoformat()`` is used for default 

1242 date string parsing. 

1243 

1244 

1245 The storage format can be customized to some degree using the 

1246 ``storage_format`` and ``regexp`` parameters, such as:: 

1247 

1248 import re 

1249 from sqlalchemy.dialects.sqlite import DATE 

1250 

1251 d = DATE( 

1252 storage_format="%(month)02d/%(day)02d/%(year)04d", 

1253 regexp=re.compile(r"(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)"), 

1254 ) 

1255 

1256 :param storage_format: format string which will be applied to the 

1257 dict with keys year, month, and day. 

1258 

1259 :param regexp: regular expression which will be applied to 

1260 incoming result rows, replacing the use of ``date.fromisoformat()`` to 

1261 parse incoming strings. If the regexp contains named groups, the resulting 

1262 match dict is applied to the Python date() constructor as keyword 

1263 arguments. Otherwise, if positional groups are used, the date() 

1264 constructor is called with positional arguments via 

1265 ``*map(int, match_obj.groups(0))``. 

1266 

1267 """ 

1268 

1269 _storage_format = "%(year)04d-%(month)02d-%(day)02d" 

1270 

1271 def bind_processor( 

1272 self, dialect: Dialect 

1273 ) -> Optional[_BindProcessorType[Any]]: 

1274 datetime_date = datetime.date 

1275 format_ = self._storage_format 

1276 

1277 def process(value): 

1278 if value is None: 

1279 return None 

1280 elif isinstance(value, datetime_date): 

1281 return format_ % { 

1282 "year": value.year, 

1283 "month": value.month, 

1284 "day": value.day, 

1285 } 

1286 else: 

1287 raise TypeError( 

1288 "SQLite Date type only accepts Python " 

1289 "date objects as input." 

1290 ) 

1291 

1292 return process 

1293 

1294 def result_processor( 

1295 self, dialect: Dialect, coltype: object 

1296 ) -> Optional[_ResultProcessorType[Any]]: 

1297 if self._reg: 

1298 return processors.str_to_datetime_processor_factory( 

1299 self._reg, datetime.date 

1300 ) 

1301 else: 

1302 return processors.str_to_date 

1303 

1304 

1305class TIME(_DateTimeMixin, sqltypes.Time): 

1306 r"""Represent a Python time object in SQLite using a string. 

1307 

1308 The default string storage format is:: 

1309 

1310 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1311 

1312 e.g.: 

1313 

1314 .. sourcecode:: text 

1315 

1316 12:05:57.105580 

1317 

1318 The incoming storage format is by default parsed using the 

1319 Python ``time.fromisoformat()`` function. 

1320 

1321 .. versionchanged:: 2.0 ``time.fromisoformat()`` is used for default 

1322 time string parsing. 

1323 

1324 The storage format can be customized to some degree using the 

1325 ``storage_format`` and ``regexp`` parameters, such as:: 

1326 

1327 import re 

1328 from sqlalchemy.dialects.sqlite import TIME 

1329 

1330 t = TIME( 

1331 storage_format="%(hour)02d-%(minute)02d-%(second)02d-%(microsecond)06d", 

1332 regexp=re.compile(r"(\d+)-(\d+)-(\d+)(?:-(\d+))?"), 

1333 ) 

1334 

1335 :param truncate_microseconds: when ``True`` microseconds will be truncated 

1336 from the time. Can't be specified together with ``storage_format`` 

1337 or ``regexp``. 

1338 

1339 :param storage_format: format string which will be applied to the dict 

1340 with keys hour, minute, second, and microsecond. 

1341 

1342 :param regexp: regular expression which will be applied to incoming result 

1343 rows, replacing the use of ``time.fromisoformat()`` to parse incoming 

1344 strings. If the regexp contains named groups, the resulting match dict is 

1345 applied to the Python time() constructor as keyword arguments. Otherwise, 

1346 if positional groups are used, the time() constructor is called with 

1347 positional arguments via ``*map(int, match_obj.groups(0))``. 
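
The role of ``match_obj.groups(0)`` can be seen in a short standalone sketch.
The ``0`` is the default substituted for any group that did not take part in
the match, so a pattern whose last group is optional still yields a complete
argument list. The pattern and the ``parse_time`` helper are hypothetical and
only mirror the positional-group behavior described above:

```python
import datetime
import re

# The trailing microseconds group is optional.
regexp = re.compile(r"(\d+)-(\d+)-(\d+)(?:-(\d+))?")


def parse_time(text: str) -> datetime.time:
    """Hypothetical helper: build a time from positional regexp groups."""
    match = regexp.match(text)
    # groups(0) replaces an unmatched optional group with 0.
    return datetime.time(*map(int, match.groups(0)))


with_microseconds = parse_time("12-05-57-105542")
without_microseconds = parse_time("12-05-57")
```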

1348 

1349 """ 

1350 

1351 _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1352 

1353 def __init__(self, *args, **kwargs): 

1354 truncate_microseconds = kwargs.pop("truncate_microseconds", False) 

1355 super().__init__(*args, **kwargs) 

1356 if truncate_microseconds: 

1357 assert "storage_format" not in kwargs, ( 

1358 "You can specify only " 

1359 "one of truncate_microseconds or storage_format." 

1360 ) 

1361 assert "regexp" not in kwargs, ( 

1362 "You can specify only one of " 

1363 "truncate_microseconds or regexp." 

1364 ) 

1365 self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d" 

1366 

1367 def bind_processor(self, dialect): 

1368 datetime_time = datetime.time 

1369 format_ = self._storage_format 

1370 

1371 def process(value): 

1372 if value is None: 

1373 return None 

1374 elif isinstance(value, datetime_time): 

1375 return format_ % { 

1376 "hour": value.hour, 

1377 "minute": value.minute, 

1378 "second": value.second, 

1379 "microsecond": value.microsecond, 

1380 } 

1381 else: 

1382 raise TypeError( 

1383 "SQLite Time type only accepts Python " 

1384 "time objects as input." 

1385 ) 

1386 

1387 return process 

1388 

1389 def result_processor(self, dialect, coltype): 

1390 if self._reg: 

1391 return processors.str_to_datetime_processor_factory( 

1392 self._reg, datetime.time 

1393 ) 

1394 else: 

1395 return processors.str_to_time 

1396 

1397 

1398colspecs = { 

1399 sqltypes.Date: DATE, 

1400 sqltypes.DateTime: DATETIME, 

1401 sqltypes.JSON: _SQliteJson, 

1402 sqltypes.JSON.JSONIndexType: JSONIndexType, 

1403 sqltypes.JSON.JSONPathType: JSONPathType, 

1404 sqltypes.Time: TIME, 

1405} 

1406 

1407ischema_names = { 

1408 "BIGINT": sqltypes.BIGINT, 

1409 "BLOB": sqltypes.BLOB, 

1410 "BOOL": sqltypes.BOOLEAN, 

1411 "BOOLEAN": sqltypes.BOOLEAN, 

1412 "CHAR": sqltypes.CHAR, 

1413 "DATE": sqltypes.DATE, 

1414 "DATE_CHAR": sqltypes.DATE, 

1415 "DATETIME": sqltypes.DATETIME, 

1416 "DATETIME_CHAR": sqltypes.DATETIME, 

1417 "DOUBLE": sqltypes.DOUBLE, 

1418 "DECIMAL": sqltypes.DECIMAL, 

1419 "FLOAT": sqltypes.FLOAT, 

1420 "INT": sqltypes.INTEGER, 

1421 "INTEGER": sqltypes.INTEGER, 

1422 "JSON": JSON, 

1423 "NUMERIC": sqltypes.NUMERIC, 

1424 "REAL": sqltypes.REAL, 

1425 "SMALLINT": sqltypes.SMALLINT, 

1426 "TEXT": sqltypes.TEXT, 

1427 "TIME": sqltypes.TIME, 

1428 "TIME_CHAR": sqltypes.TIME, 

1429 "TIMESTAMP": sqltypes.TIMESTAMP, 

1430 "VARCHAR": sqltypes.VARCHAR, 

1431 "NVARCHAR": sqltypes.NVARCHAR, 

1432 "NCHAR": sqltypes.NCHAR, 

1433} 

1434 

1435 

1436class SQLiteCompiler(compiler.SQLCompiler): 

1437 extract_map = util.update_copy( 

1438 compiler.SQLCompiler.extract_map, 

1439 { 

1440 "month": "%m", 

1441 "day": "%d", 

1442 "year": "%Y", 

1443 "second": "%S", 

1444 "hour": "%H", 

1445 "doy": "%j", 

1446 "minute": "%M", 

1447 "epoch": "%s", 

1448 "dow": "%w", 

1449 "week": "%W", 

1450 }, 

1451 ) 

1452 

1453 def visit_truediv_binary(self, binary, operator, **kw): 

1454 return ( 

1455 self.process(binary.left, **kw) 

1456 + " / " 

1457 + "(%s + 0.0)" % self.process(binary.right, **kw) 

1458 ) 

1459 

1460 def visit_now_func(self, fn, **kw): 

1461 return "CURRENT_TIMESTAMP" 

1462 

1463 def visit_localtimestamp_func(self, func, **kw): 

1464 return "DATETIME(CURRENT_TIMESTAMP, 'localtime')" 

1465 

1466 def visit_true(self, expr, **kw): 

1467 return "1" 

1468 

1469 def visit_false(self, expr, **kw): 

1470 return "0" 

1471 

1472 def visit_char_length_func(self, fn, **kw): 

1473 return "length%s" % self.function_argspec(fn) 

1474 

1475 def visit_aggregate_strings_func(self, fn, **kw): 

1476 return "group_concat%s" % self.function_argspec(fn) 

1477 

1478 def visit_cast(self, cast, **kwargs): 

1479 if self.dialect.supports_cast: 

1480 return super().visit_cast(cast, **kwargs) 

1481 else: 

1482 return self.process(cast.clause, **kwargs) 

1483 

1484 def visit_extract(self, extract, **kw): 

1485 try: 

1486 return "CAST(STRFTIME('%s', %s) AS INTEGER)" % ( 

1487 self.extract_map[extract.field], 

1488 self.process(extract.expr, **kw), 

1489 ) 

1490 except KeyError as err: 

1491 raise exc.CompileError( 

1492 "%s is not a valid extract argument." % extract.field 

1493 ) from err 

1494 

1495 def returning_clause( 

1496 self, 

1497 stmt, 

1498 returning_cols, 

1499 *, 

1500 populate_result_map, 

1501 **kw, 

1502 ): 

1503 kw["include_table"] = False 

1504 return super().returning_clause( 

1505 stmt, returning_cols, populate_result_map=populate_result_map, **kw 

1506 ) 

1507 

1508 def limit_clause(self, select, **kw): 

1509 text = "" 

1510 if select._limit_clause is not None: 

1511 text += "\n LIMIT " + self.process(select._limit_clause, **kw) 

1512 if select._offset_clause is not None: 

1513 if select._limit_clause is None: 

1514 text += "\n LIMIT " + self.process(sql.literal(-1)) 

1515 text += " OFFSET " + self.process(select._offset_clause, **kw) 

1516 else: 

1517 text += " OFFSET " + self.process(sql.literal(0), **kw) 

1518 return text 

1519 

1520 def for_update_clause(self, select, **kw): 

1521 # sqlite has no "FOR UPDATE" AFAICT 

1522 return "" 

1523 

1524 def update_from_clause( 

1525 self, update_stmt, from_table, extra_froms, from_hints, **kw 

1526 ): 

1527 kw["asfrom"] = True 

1528 return "FROM " + ", ".join( 

1529 t._compiler_dispatch(self, fromhints=from_hints, **kw) 

1530 for t in extra_froms 

1531 ) 

1532 

1533 def visit_is_distinct_from_binary(self, binary, operator, **kw): 

1534 return "%s IS NOT %s" % ( 

1535 self.process(binary.left), 

1536 self.process(binary.right), 

1537 ) 

1538 

1539 def visit_is_not_distinct_from_binary(self, binary, operator, **kw): 

1540 return "%s IS %s" % ( 

1541 self.process(binary.left), 

1542 self.process(binary.right), 

1543 ) 

1544 

1545 def visit_json_getitem_op_binary(self, binary, operator, **kw): 

1546 if binary.type._type_affinity is sqltypes.JSON: 

1547 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))" 

1548 else: 

1549 expr = "JSON_EXTRACT(%s, %s)" 

1550 

1551 return expr % ( 

1552 self.process(binary.left, **kw), 

1553 self.process(binary.right, **kw), 

1554 ) 

1555 

1556 def visit_json_path_getitem_op_binary(self, binary, operator, **kw): 

1557 if binary.type._type_affinity is sqltypes.JSON: 

1558 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))" 

1559 else: 

1560 expr = "JSON_EXTRACT(%s, %s)" 

1561 

1562 return expr % ( 

1563 self.process(binary.left, **kw), 

1564 self.process(binary.right, **kw), 

1565 ) 

1566 

1567 def visit_empty_set_op_expr(self, type_, expand_op, **kw): 

1568 # slightly old SQLite versions don't seem to be able to handle 

1569 # the empty set impl 

1570 return self.visit_empty_set_expr(type_) 

1571 

1572 def visit_empty_set_expr(self, element_types, **kw): 

1573 return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % ( 

1574 ", ".join("1" for type_ in element_types or [INTEGER()]), 

1575 ", ".join("1" for type_ in element_types or [INTEGER()]), 

1576 ) 

1577 

1578 def visit_regexp_match_op_binary(self, binary, operator, **kw): 

1579 return self._generate_generic_binary(binary, " REGEXP ", **kw) 

1580 

1581 def visit_not_regexp_match_op_binary(self, binary, operator, **kw): 

1582 return self._generate_generic_binary(binary, " NOT REGEXP ", **kw) 

1583 

1584 def _on_conflict_target(self, clause, **kw): 

1585 if clause.inferred_target_elements is not None: 

1586 target_text = "(%s)" % ", ".join( 

1587 ( 

1588 self.preparer.quote(c) 

1589 if isinstance(c, str) 

1590 else self.process(c, include_table=False, use_schema=False) 

1591 ) 

1592 for c in clause.inferred_target_elements 

1593 ) 

1594 if clause.inferred_target_whereclause is not None: 

1595 target_text += " WHERE %s" % self.process( 

1596 clause.inferred_target_whereclause, 

1597 include_table=False, 

1598 use_schema=False, 

1599 literal_execute=True, 

1600 ) 

1601 

1602 else: 

1603 target_text = "" 

1604 

1605 return target_text 

1606 

1607 def visit_on_conflict_do_nothing(self, on_conflict, **kw): 

1608 target_text = self._on_conflict_target(on_conflict, **kw) 

1609 

1610 if target_text: 

1611 return "ON CONFLICT %s DO NOTHING" % target_text 

1612 else: 

1613 return "ON CONFLICT DO NOTHING" 

1614 

1615 def visit_on_conflict_do_update(self, on_conflict, **kw): 

1616 clause = on_conflict 

1617 

1618 target_text = self._on_conflict_target(on_conflict, **kw) 

1619 

1620 action_set_ops = [] 

1621 

1622 set_parameters = dict(clause.update_values_to_set) 

1623 # create a list of column assignment clauses as tuples 

1624 

1625 insert_statement = self.stack[-1]["selectable"] 

1626 cols = insert_statement.table.c 

1627 for c in cols: 

1628 col_key = c.key 

1629 

1630 if col_key in set_parameters: 

1631 value = set_parameters.pop(col_key) 

1632 elif c in set_parameters: 

1633 value = set_parameters.pop(c) 

1634 else: 

1635 continue 

1636 

1637 if coercions._is_literal(value): 

1638 value = elements.BindParameter(None, value, type_=c.type) 

1639 

1640 else: 

1641 if ( 

1642 isinstance(value, elements.BindParameter) 

1643 and value.type._isnull 

1644 ): 

1645 value = value._clone() 

1646 value.type = c.type 

1647 value_text = self.process(value.self_group(), use_schema=False) 

1648 

1649 key_text = self.preparer.quote(c.name) 

1650 action_set_ops.append("%s = %s" % (key_text, value_text)) 

1651 

1652 # check for names that don't match columns 

1653 if set_parameters: 

1654 util.warn( 

1655 "Additional column names not matching " 

1656 "any column keys in table '%s': %s" 

1657 % ( 

1658 self.current_executable.table.name, 

1659 (", ".join("'%s'" % c for c in set_parameters)), 

1660 ) 

1661 ) 

1662 for k, v in set_parameters.items(): 

1663 key_text = ( 

1664 self.preparer.quote(k) 

1665 if isinstance(k, str) 

1666 else self.process(k, use_schema=False) 

1667 ) 

1668 value_text = self.process( 

1669 coercions.expect(roles.ExpressionElementRole, v), 

1670 use_schema=False, 

1671 ) 

1672 action_set_ops.append("%s = %s" % (key_text, value_text)) 

1673 

1674 action_text = ", ".join(action_set_ops) 

1675 if clause.update_whereclause is not None: 

1676 action_text += " WHERE %s" % self.process( 

1677 clause.update_whereclause, include_table=True, use_schema=False 

1678 ) 

1679 

1680 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) 

1681 

1682 def visit_bitwise_xor_op_binary(self, binary, operator, **kw): 

1683 # sqlite has no xor. Use "a XOR b" = "(a | b) - (a & b)". 

1684 kw["eager_grouping"] = True 

1685 or_ = self._generate_generic_binary(binary, " | ", **kw) 

1686 and_ = self._generate_generic_binary(binary, " & ", **kw) 

1687 return f"({or_} - {and_})" 

1688 

1689 

1690class SQLiteDDLCompiler(compiler.DDLCompiler): 

1691 def get_column_specification(self, column, **kwargs): 

1692 coltype = self.dialect.type_compiler_instance.process( 

1693 column.type, type_expression=column 

1694 ) 

1695 colspec = self.preparer.format_column(column) + " " + coltype 

1696 default = self.get_column_default_string(column) 

1697 if default is not None: 

1698 

1699 if not re.match(r"""^\s*[\'\"\(]""", default) and re.match( 

1700 r".*\W.*", default 

1701 ): 

1702 colspec += f" DEFAULT ({default})" 

1703 else: 

1704 colspec += f" DEFAULT {default}" 

1705 

1706 if not column.nullable: 

1707 colspec += " NOT NULL" 

1708 

1709 on_conflict_clause = column.dialect_options["sqlite"][ 

1710 "on_conflict_not_null" 

1711 ] 

1712 if on_conflict_clause is not None: 

1713 colspec += " ON CONFLICT " + on_conflict_clause 

1714 

1715 if column.primary_key: 

1716 if ( 

1717 column.autoincrement is True 

1718 and len(column.table.primary_key.columns) != 1 

1719 ): 

1720 raise exc.CompileError( 

1721 "SQLite does not support autoincrement for " 

1722 "composite primary keys" 

1723 ) 

1724 

1725 if ( 

1726 column.table.dialect_options["sqlite"]["autoincrement"] 

1727 and len(column.table.primary_key.columns) == 1 

1728 and issubclass(column.type._type_affinity, sqltypes.Integer) 

1729 and not column.foreign_keys 

1730 ): 

1731 colspec += " PRIMARY KEY" 

1732 

1733 on_conflict_clause = column.dialect_options["sqlite"][ 

1734 "on_conflict_primary_key" 

1735 ] 

1736 if on_conflict_clause is not None: 

1737 colspec += " ON CONFLICT " + on_conflict_clause 

1738 

1739 colspec += " AUTOINCREMENT" 

1740 

1741 if column.computed is not None: 

1742 colspec += " " + self.process(column.computed) 

1743 

1744 return colspec 
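The DEFAULT handling in `get_column_specification` can be hard to read from the two regular expressions alone. As a rough standalone sketch (the function name `render_default` and the sample values are assumptions made for illustration), the rule is: wrap the default in parentheses only when it does not already start with a quote or parenthesis and it contains at least one non-word character.

```python
import re


def render_default(default: str) -> str:
    """Mirror the DEFAULT-rendering rule using the same two regexes."""
    starts_quoted = re.match(r"""^\s*[\'\"\(]""", default)
    has_non_word = re.match(r".*\W.*", default)
    if not starts_quoted and has_non_word:
        # An expression such as "1 + 2" needs parentheses in SQLite DDL.
        return f"DEFAULT ({default})"
    # Plain tokens, quoted strings, and pre-parenthesized text pass through.
    return f"DEFAULT {default}"


samples = ["42", "'abc'", "1 + 2", "(1 + 2)", "CURRENT_TIMESTAMP"]
rendered = {value: render_default(value) for value in samples}
```

Note that `CURRENT_TIMESTAMP` is left bare because the underscore counts as a word character.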

1745 

1746 def visit_primary_key_constraint(self, constraint, **kw): 

1747 # for columns with sqlite_autoincrement=True, 

1748 # the PRIMARY KEY constraint can only be inline 

1749 # with the column itself. 

1750 if len(constraint.columns) == 1: 

1751 c = list(constraint)[0] 

1752 if ( 

1753 c.primary_key 

1754 and c.table.dialect_options["sqlite"]["autoincrement"] 

1755 and issubclass(c.type._type_affinity, sqltypes.Integer) 

1756 and not c.foreign_keys 

1757 ): 

1758 return None 

1759 

1760 text = super().visit_primary_key_constraint(constraint) 

1761 

1762 on_conflict_clause = constraint.dialect_options["sqlite"][ 

1763 "on_conflict" 

1764 ] 

1765 if on_conflict_clause is None and len(constraint.columns) == 1: 

1766 on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][ 

1767 "on_conflict_primary_key" 

1768 ] 

1769 

1770 if on_conflict_clause is not None: 

1771 text += " ON CONFLICT " + on_conflict_clause 

1772 

1773 return text 

1774 

1775 def visit_unique_constraint(self, constraint, **kw): 

1776 text = super().visit_unique_constraint(constraint) 

1777 

1778 on_conflict_clause = constraint.dialect_options["sqlite"][ 

1779 "on_conflict" 

1780 ] 

1781 if on_conflict_clause is None and len(constraint.columns) == 1: 

1782 col1 = list(constraint)[0] 

1783 if isinstance(col1, schema.SchemaItem): 

1784 on_conflict_clause = list(constraint)[0].dialect_options[ 

1785 "sqlite" 

1786 ]["on_conflict_unique"] 

1787 

1788 if on_conflict_clause is not None: 

1789 text += " ON CONFLICT " + on_conflict_clause 

1790 

1791 return text 

1792 

1793 def visit_check_constraint(self, constraint, **kw): 

1794 text = super().visit_check_constraint(constraint) 

1795 

1796 on_conflict_clause = constraint.dialect_options["sqlite"][ 

1797 "on_conflict" 

1798 ] 

1799 

1800 if on_conflict_clause is not None: 

1801 text += " ON CONFLICT " + on_conflict_clause 

1802 

1803 return text 

1804 

1805 def visit_column_check_constraint(self, constraint, **kw): 

1806 text = super().visit_column_check_constraint(constraint) 

1807 

1808 if constraint.dialect_options["sqlite"]["on_conflict"] is not None: 

1809 raise exc.CompileError( 

1810 "SQLite does not support on conflict clause for " 

1811 "column check constraint" 

1812 ) 

1813 

1814 return text 

1815 

1816 def visit_foreign_key_constraint(self, constraint, **kw): 

1817 local_table = constraint.elements[0].parent.table 

1818 remote_table = constraint.elements[0].column.table 

1819 

1820 if local_table.schema != remote_table.schema: 

1821 return None 

1822 else: 

1823 return super().visit_foreign_key_constraint(constraint) 

1824 

1825 def define_constraint_remote_table(self, constraint, table, preparer): 

1826 """Format the remote table clause of a CREATE CONSTRAINT clause.""" 

1827 

1828 return preparer.format_table(table, use_schema=False) 

1829 

1830 def visit_create_index( 

1831 self, create, include_schema=False, include_table_schema=True, **kw 

1832 ): 

1833 index = create.element 

1834 self._verify_index_table(index) 

1835 preparer = self.preparer 

1836 text = "CREATE " 

1837 if index.unique: 

1838 text += "UNIQUE " 

1839 

1840 text += "INDEX " 

1841 

1842 if create.if_not_exists: 

1843 text += "IF NOT EXISTS " 

1844 

1845 text += "%s ON %s (%s)" % ( 

1846 self._prepared_index_name(index, include_schema=True), 

1847 preparer.format_table(index.table, use_schema=False), 

1848 ", ".join( 

1849 self.sql_compiler.process( 

1850 expr, include_table=False, literal_binds=True 

1851 ) 

1852 for expr in index.expressions 

1853 ), 

1854 ) 

1855 

1856 whereclause = index.dialect_options["sqlite"]["where"] 

1857 if whereclause is not None: 

1858 where_compiled = self.sql_compiler.process( 

1859 whereclause, include_table=False, literal_binds=True 

1860 ) 

1861 text += " WHERE " + where_compiled 

1862 

1863 return text 

1864 

1865 def post_create_table(self, table): 

1866 table_options = [] 

1867 

1868 if not table.dialect_options["sqlite"]["with_rowid"]: 

1869 table_options.append("WITHOUT ROWID") 

1870 

1871 if table.dialect_options["sqlite"]["strict"]: 

1872 table_options.append("STRICT") 

1873 

1874 if table_options: 

1875 return "\n " + ",\n ".join(table_options) 

1876 else: 

1877 return "" 
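To make the output of `post_create_table` concrete, here is a small sketch of the same joining logic outside the compiler. The function name `render_table_options` and its keyword arguments are hypothetical stand-ins for the `with_rowid` and `strict` dialect options read above.

```python
def render_table_options(with_rowid: bool = True, strict: bool = False) -> str:
    """Build the text appended after the closing parenthesis of CREATE TABLE."""
    options = []
    if not with_rowid:
        options.append("WITHOUT ROWID")
    if strict:
        options.append("STRICT")
    if options:
        # Options are comma separated, each on its own line.
        return "\n " + ",\n ".join(options)
    return ""


no_options = render_table_options()
both_options = render_table_options(with_rowid=False, strict=True)
```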

1878 

1879 

1880class SQLiteTypeCompiler(compiler.GenericTypeCompiler): 

1881 def visit_large_binary(self, type_, **kw): 

1882 return self.visit_BLOB(type_) 

1883 

1884 def visit_DATETIME(self, type_, **kw): 

1885 if ( 

1886 not isinstance(type_, _DateTimeMixin) 

1887 or type_.format_is_text_affinity 

1888 ): 

1889 return super().visit_DATETIME(type_) 

1890 else: 

1891 return "DATETIME_CHAR" 

1892 

1893 def visit_DATE(self, type_, **kw): 

1894 if ( 

1895 not isinstance(type_, _DateTimeMixin) 

1896 or type_.format_is_text_affinity 

1897 ): 

1898 return super().visit_DATE(type_) 

1899 else: 

1900 return "DATE_CHAR" 

1901 

1902 def visit_TIME(self, type_, **kw): 

1903 if ( 

1904 not isinstance(type_, _DateTimeMixin) 

1905 or type_.format_is_text_affinity 

1906 ): 

1907 return super().visit_TIME(type_) 

1908 else: 

1909 return "TIME_CHAR" 

1910 

1911 def visit_JSON(self, type_, **kw): 

1912 # note this name provides NUMERIC affinity, not TEXT. 

1913 # should not be an issue unless the JSON value consists of a single 

1914 # numeric value. JSONTEXT can be used if this case is required. 

1915 return "JSON" 

1916 

1917 

1918class SQLiteIdentifierPreparer(compiler.IdentifierPreparer): 

1919 reserved_words = { 

1920 "add", 

1921 "after", 

1922 "all", 

1923 "alter", 

1924 "analyze", 

1925 "and", 

1926 "as", 

1927 "asc", 

1928 "attach", 

1929 "autoincrement", 

1930 "before", 

1931 "begin", 

1932 "between", 

1933 "by", 

1934 "cascade", 

1935 "case", 

1936 "cast", 

1937 "check", 

1938 "collate", 

1939 "column", 

1940 "commit", 

1941 "conflict", 

1942 "constraint", 

1943 "create", 

1944 "cross", 

1945 "current_date", 

1946 "current_time", 

1947 "current_timestamp", 

1948 "database", 

1949 "default", 

1950 "deferrable", 

1951 "deferred", 

1952 "delete", 

1953 "desc", 

1954 "detach", 

1955 "distinct", 

1956 "drop", 

1957 "each", 

1958 "else", 

1959 "end", 

1960 "escape", 

1961 "except", 

1962 "exclusive", 

1963 "exists", 

1964 "explain", 

1965 "false", 

1966 "fail", 

1967 "for", 

1968 "foreign", 

1969 "from", 

1970 "full", 

1971 "glob", 

1972 "group", 

1973 "having", 

1974 "if", 

1975 "ignore", 

1976 "immediate", 

1977 "in", 

1978 "index", 

1979 "indexed", 

1980 "initially", 

1981 "inner", 

1982 "insert", 

1983 "instead", 

1984 "intersect", 

1985 "into", 

1986 "is", 

1987 "isnull", 

1988 "join", 

1989 "key", 

1990 "left", 

1991 "like", 

1992 "limit", 

1993 "match", 

1994 "natural", 

1995 "not", 

1996 "notnull", 

1997 "null", 

1998 "of", 

1999 "offset", 

2000 "on", 

2001 "or", 

2002 "order", 

2003 "outer", 

2004 "plan", 

2005 "pragma", 

2006 "primary", 

2007 "query", 

2008 "raise", 

2009 "references", 

2010 "reindex", 

2011 "rename", 

2012 "replace", 

2013 "restrict", 

2014 "right", 

2015 "rollback", 

2016 "row", 

2017 "select", 

2018 "set", 

2019 "table", 

2020 "temp", 

2021 "temporary", 

2022 "then", 

2023 "to", 

2024 "transaction", 

2025 "trigger", 

2026 "true", 

2027 "union", 

2028 "unique", 

2029 "update", 

2030 "using", 

2031 "vacuum", 

2032 "values", 

2033 "view", 

2034 "virtual", 

2035 "when", 

2036 "where", 

2037 } 

2038 

2039 

2040class SQLiteExecutionContext(default.DefaultExecutionContext): 

2041 @util.memoized_property 

2042 def _preserve_raw_colnames(self): 

2043 return ( 

2044 not self.dialect._broken_dotted_colnames 

2045 or self.execution_options.get("sqlite_raw_colnames", False) 

2046 ) 

2047 

2048 def _translate_colname(self, colname): 

2049 # TODO: detect SQLite version 3.10.0 or greater; 

2050 # see [ticket:3633] 

2051 

2052 # adjust for dotted column names. SQLite 

2053 # in the case of UNION may store col names as 

2054 # "tablename.colname", or if using an attached database, 

2055 # "database.tablename.colname", in cursor.description 

2056 if not self._preserve_raw_colnames and "." in colname: 

2057 return colname.split(".")[-1], colname 

2058 else: 

2059 return colname, None 
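The dotted-name adjustment in `_translate_colname` can be sketched as a plain function. This is an illustration under assumptions: `translate_colname` and its `preserve_raw` flag are hypothetical names standing in for the method and the memoized `_preserve_raw_colnames` property.

```python
from typing import Optional, Tuple


def translate_colname(
    colname: str, preserve_raw: bool = False
) -> Tuple[str, Optional[str]]:
    """Return (name to use, original name or None), as the method above does."""
    if not preserve_raw and "." in colname:
        # Keep only the last dotted segment; remember the untranslated name.
        return colname.split(".")[-1], colname
    return colname, None


plain = translate_colname("id")
dotted = translate_colname("main.users.id")
raw = translate_colname("users.id", preserve_raw=True)
```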

2060 

2061 

2062class SQLiteDialect(default.DefaultDialect): 

2063 name = "sqlite" 

2064 supports_alter = False 

2065 

2066 # SQLite supports "DEFAULT VALUES" but *does not* support

2067 # "VALUES (DEFAULT)" 

2068 supports_default_values = True 

2069 supports_default_metavalue = False 

2070 

2071 # sqlite issue: 

2072 # https://github.com/python/cpython/issues/93421 

2073 # note this parameter is no longer used by the ORM or default dialect 

2074 # see #9414 

2075 supports_sane_rowcount_returning = False 

2076 

2077 supports_empty_insert = False 

2078 supports_cast = True 

2079 supports_multivalues_insert = True 

2080 use_insertmanyvalues = True 

2081 tuple_in_values = True 

2082 supports_statement_cache = True 

2083 insert_null_pk_still_autoincrements = True 

2084 insert_returning = True 

2085 update_returning = True 

2086 update_returning_multifrom = True 

2087 delete_returning = True 

2088 update_returning_multifrom = True 

2089 

2090 supports_default_metavalue = True 

2091 """dialect supports INSERT... VALUES (DEFAULT) syntax""" 

2092 

2093 default_metavalue_token = "NULL" 

2094 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the 

2095 parenthesis.""" 

2096 

2097 default_paramstyle = "qmark" 

2098 execution_ctx_cls = SQLiteExecutionContext 

2099 statement_compiler = SQLiteCompiler 

2100 ddl_compiler = SQLiteDDLCompiler 

2101 type_compiler_cls = SQLiteTypeCompiler 

2102 preparer = SQLiteIdentifierPreparer 

2103 ischema_names = ischema_names 

2104 colspecs = colspecs 

2105 

2106 construct_arguments = [ 

2107 ( 

2108 sa_schema.Table, 

2109 { 

2110 "autoincrement": False, 

2111 "with_rowid": True, 

2112 "strict": False, 

2113 }, 

2114 ), 

2115 (sa_schema.Index, {"where": None}), 

2116 ( 

2117 sa_schema.Column, 

2118 { 

2119 "on_conflict_primary_key": None, 

2120 "on_conflict_not_null": None, 

2121 "on_conflict_unique": None, 

2122 }, 

2123 ), 

2124 (sa_schema.Constraint, {"on_conflict": None}), 

2125 ] 

2126 

2127 _broken_fk_pragma_quotes = False 

2128 _broken_dotted_colnames = False 

2129 

2130 @util.deprecated_params( 

2131 _json_serializer=( 

2132 "1.3.7", 

2133 "The _json_serializer argument to the SQLite dialect has " 

2134 "been renamed to the correct name of json_serializer. The old " 

2135 "argument name will be removed in a future release.", 

2136 ), 

2137 _json_deserializer=( 

2138 "1.3.7", 

2139 "The _json_deserializer argument to the SQLite dialect has " 

2140 "been renamed to the correct name of json_deserializer. The old " 

2141 "argument name will be removed in a future release.", 

2142 ), 

2143 ) 

2144 def __init__( 

2145 self, 

2146 native_datetime: bool = False, 

2147 json_serializer: Optional[Callable[..., Any]] = None, 

2148 json_deserializer: Optional[Callable[..., Any]] = None, 

2149 _json_serializer: Optional[Callable[..., Any]] = None, 

2150 _json_deserializer: Optional[Callable[..., Any]] = None, 

2151 **kwargs: Any, 

2152 ) -> None: 

2153 default.DefaultDialect.__init__(self, **kwargs) 

2154 

2155 if _json_serializer: 

2156 json_serializer = _json_serializer 

2157 if _json_deserializer: 

2158 json_deserializer = _json_deserializer 

2159 self._json_serializer = json_serializer 

2160 self._json_deserializer = json_deserializer 

2161 

2162 # this flag used by pysqlite dialect, and perhaps others in the 

2163 # future, to indicate the driver is handling date/timestamp 

2164 # conversions (and perhaps datetime/time as well on some hypothetical 

2165 # driver ?) 

2166 self.native_datetime = native_datetime 

2167 

2168 if self.dbapi is not None: 

2169 if self.dbapi.sqlite_version_info < (3, 7, 16): 

2170 util.warn( 

2171 "SQLite version %s is older than 3.7.16, and will not " 

2172 "support right nested joins, as are sometimes used in " 

2173 "more complex ORM scenarios. SQLAlchemy 1.4 and above " 

2174 "no longer tries to rewrite these joins." 

2175 % (self.dbapi.sqlite_version_info,) 

2176 ) 

2177 

2178 # NOTE: python 3.7 on fedora for me has SQLite 3.34.1. These 

2179 # version checks are getting very stale. 

2180 self._broken_dotted_colnames = self.dbapi.sqlite_version_info < ( 

2181 3, 

2182 10, 

2183 0, 

2184 ) 

2185 self.supports_default_values = self.dbapi.sqlite_version_info >= ( 

2186 3, 

2187 3, 

2188 8, 

2189 ) 

2190 self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3) 

2191 self.supports_multivalues_insert = ( 

2192 # https://www.sqlite.org/releaselog/3_7_11.html 

2193 self.dbapi.sqlite_version_info 

2194 >= (3, 7, 11) 

2195 ) 

2196 # see https://www.sqlalchemy.org/trac/ticket/2568 

2197 # as well as https://www.sqlite.org/src/info/600482d161 

2198 self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < ( 

2199 3, 

2200 6, 

2201 14, 

2202 ) 

2203 

2204 if self.dbapi.sqlite_version_info < (3, 35) or util.pypy: 

2205 self.update_returning = self.delete_returning = ( 

2206 self.insert_returning 

2207 ) = False 

2208 

2209 if self.dbapi.sqlite_version_info < (3, 32, 0): 

2210 # https://www.sqlite.org/limits.html 

2211 self.insertmanyvalues_max_parameters = 999 

2212 

2213 _isolation_lookup = util.immutabledict( 

2214 {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0} 

2215 ) 

2216 

2217 def get_isolation_level_values(self, dbapi_connection): 

2218 return list(self._isolation_lookup) 

2219 

2220 def set_isolation_level( 

2221 self, dbapi_connection: DBAPIConnection, level: IsolationLevel 

2222 ) -> None: 

2223 isolation_level = self._isolation_lookup[level] 

2224 

2225 cursor = dbapi_connection.cursor() 

2226 cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}") 

2227 cursor.close() 

2228 

2229 def get_isolation_level(self, dbapi_connection): 

2230 cursor = dbapi_connection.cursor() 

2231 cursor.execute("PRAGMA read_uncommitted") 

2232 res = cursor.fetchone() 

2233 if res: 

2234 value = res[0] 

2235 else: 

2236 # https://www.sqlite.org/changes.html#version_3_3_3 

2237 # "Optional READ UNCOMMITTED isolation (instead of the 

2238 # default isolation level of SERIALIZABLE) and 

2239 # table level locking when database connections 

2240 # share a common cache."

2241 # pre-SQLite 3.3.0 default to 0 

2242 value = 0 

2243 cursor.close() 

2244 if value == 0: 

2245 return "SERIALIZABLE" 

2246 elif value == 1: 

2247 return "READ UNCOMMITTED" 

2248 else: 

2249 assert False, "Unknown isolation level %s" % value 
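The isolation-level methods above map two level names onto `PRAGMA read_uncommitted`. A minimal sketch of that round trip using only the stdlib `sqlite3` module follows; the helper names `set_level` and `get_level` are assumptions for illustration and are not the dialect's API.

```python
import sqlite3

# Same mapping as the dialect's _isolation_lookup.
ISOLATION_LOOKUP = {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
NAMES_BY_VALUE = {value: name for name, value in ISOLATION_LOOKUP.items()}


def set_level(conn: sqlite3.Connection, level: str) -> None:
    """Set the level by writing 0 or 1 to the read_uncommitted pragma."""
    conn.execute(f"PRAGMA read_uncommitted = {ISOLATION_LOOKUP[level]}")


def get_level(conn: sqlite3.Connection) -> str:
    """Read the pragma back; treat a missing row as 0 (SERIALIZABLE)."""
    row = conn.execute("PRAGMA read_uncommitted").fetchone()
    value = row[0] if row else 0
    return NAMES_BY_VALUE[value]


conn = sqlite3.connect(":memory:")
initial = get_level(conn)
set_level(conn, "READ UNCOMMITTED")
changed = get_level(conn)
conn.close()
```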

2250 

2251 @reflection.cache 

2252 def get_schema_names(self, connection, **kw): 

2253 s = "PRAGMA database_list" 

2254 dl = connection.exec_driver_sql(s) 

2255 

2256 return [db[1] for db in dl if db[1] != "temp"] 

2257 

2258 def _format_schema(self, schema, table_name): 

2259 if schema is not None: 

2260 qschema = self.identifier_preparer.quote_identifier(schema) 

2261 name = f"{qschema}.{table_name}" 

2262 else: 

2263 name = table_name 

2264 return name 

2265 

2266 def _sqlite_main_query( 

2267 self, 

2268 table: str, 

2269 type_: str, 

2270 schema: Optional[str], 

2271 sqlite_include_internal: bool, 

2272 ): 

2273 main = self._format_schema(schema, table) 

2274 if not sqlite_include_internal: 

2275 filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'" 

2276 else: 

2277 filter_table = "" 

2278 query = ( 

2279 f"SELECT name FROM {main} " 

2280 f"WHERE type='{type_}'{filter_table} " 

2281 "ORDER BY name" 

2282 ) 

2283 return query 
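The effect of the `NOT LIKE 'sqlite~_%' ESCAPE '~'` filter is easiest to see against a live database. The sketch below rebuilds a simplified version of the query (no schema qualification) and runs it with the stdlib `sqlite3` module. The function name `main_query` and the `users` table are illustrative assumptions. A table with an `AUTOINCREMENT` column is used because SQLite then creates the internal `sqlite_sequence` table.

```python
import sqlite3


def main_query(table: str, type_: str, include_internal: bool = False) -> str:
    """Simplified form of the query built by _sqlite_main_query."""
    if include_internal:
        filter_table = ""
    else:
        # "~" escapes the underscore so it is not treated as a LIKE wildcard.
        filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'"
    return (
        f"SELECT name FROM {table} "
        f"WHERE type='{type_}'{filter_table} "
        "ORDER BY name"
    )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)"
)
user_tables = [
    row[0] for row in conn.execute(main_query("sqlite_master", "table"))
]
all_tables = [
    row[0]
    for row in conn.execute(main_query("sqlite_master", "table", True))
]
conn.close()
```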

2284 

2285 @reflection.cache 

2286 def get_table_names( 

2287 self, connection, schema=None, sqlite_include_internal=False, **kw 

2288 ): 

2289 query = self._sqlite_main_query( 

2290 "sqlite_master", "table", schema, sqlite_include_internal 

2291 ) 

2292 names = connection.exec_driver_sql(query).scalars().all() 

2293 return names 

2294 

2295 @reflection.cache 

2296 def get_temp_table_names( 

2297 self, connection, sqlite_include_internal=False, **kw 

2298 ): 

2299 query = self._sqlite_main_query( 

2300 "sqlite_temp_master", "table", None, sqlite_include_internal 

2301 ) 

2302 names = connection.exec_driver_sql(query).scalars().all() 

2303 return names 

2304 

2305 @reflection.cache 

2306 def get_temp_view_names( 

2307 self, connection, sqlite_include_internal=False, **kw 

2308 ): 

2309 query = self._sqlite_main_query( 

2310 "sqlite_temp_master", "view", None, sqlite_include_internal 

2311 ) 

2312 names = connection.exec_driver_sql(query).scalars().all() 

2313 return names 

2314 

2315 @reflection.cache 

2316 def has_table(self, connection, table_name, schema=None, **kw): 

2317 self._ensure_has_table_connection(connection) 

2318 

2319 if schema is not None and schema not in self.get_schema_names( 

2320 connection, **kw 

2321 ): 

2322 return False 

2323 

2324 info = self._get_table_pragma( 

2325 connection, "table_info", table_name, schema=schema 

2326 ) 

2327 return bool(info) 

2328 

2329 def _get_default_schema_name(self, connection): 

2330 return "main" 

2331 

2332 @reflection.cache 

2333 def get_view_names( 

2334 self, connection, schema=None, sqlite_include_internal=False, **kw 

2335 ): 

2336 query = self._sqlite_main_query( 

2337 "sqlite_master", "view", schema, sqlite_include_internal 

2338 ) 

2339 names = connection.exec_driver_sql(query).scalars().all() 

2340 return names 

2341 

2342 @reflection.cache 

2343 def get_view_definition(self, connection, view_name, schema=None, **kw): 

2344 if schema is not None: 

2345 qschema = self.identifier_preparer.quote_identifier(schema) 

2346 master = f"{qschema}.sqlite_master" 

2347 s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % ( 

2348 master, 

2349 ) 

2350 rs = connection.exec_driver_sql(s, (view_name,)) 

2351 else: 

2352 try: 

2353 s = ( 

2354 "SELECT sql FROM " 

2355 " (SELECT * FROM sqlite_master UNION ALL " 

2356 " SELECT * FROM sqlite_temp_master) " 

2357 "WHERE name = ? " 

2358 "AND type='view'" 

2359 ) 

2360 rs = connection.exec_driver_sql(s, (view_name,)) 

2361 except exc.DBAPIError: 

2362 s = ( 

2363 "SELECT sql FROM sqlite_master WHERE name = ? " 

2364 "AND type='view'" 

2365 ) 

2366 rs = connection.exec_driver_sql(s, (view_name,)) 

2367 

2368 result = rs.fetchall() 

2369 if result: 

2370 return result[0].sql 

2371 else: 

2372 raise exc.NoSuchTableError( 

2373 f"{schema}.{view_name}" if schema else view_name 

2374 ) 

2375 

2376 @reflection.cache 

2377 def get_columns(self, connection, table_name, schema=None, **kw): 

2378 pragma = "table_info" 

2379 # computed columns are treated as hidden, they require table_xinfo

2380 if self.server_version_info >= (3, 31): 

2381 pragma = "table_xinfo" 

2382 info = self._get_table_pragma( 

2383 connection, pragma, table_name, schema=schema 

2384 ) 

2385 columns = [] 

2386 tablesql = None 

2387 for row in info: 

2388 name = row[1] 

2389 type_ = row[2].upper() 

2390 nullable = not row[3] 

2391 default = row[4] 

2392 primary_key = row[5] 

2393 hidden = row[6] if pragma == "table_xinfo" else 0 

2394 

2395 # hidden has value 0 for normal columns, 1 for hidden columns, 

2396 # 2 for computed virtual columns and 3 for computed stored columns 

2397 # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b 

2398 if hidden == 1: 

2399 continue 

2400 

2401 generated = bool(hidden) 

2402 persisted = hidden == 3 

2403 

2404 if tablesql is None and generated: 

2405 tablesql = self._get_table_sql( 

2406 connection, table_name, schema, **kw 

2407 ) 

2408 # remove create table 

2409 match = re.match( 

2410 ( 

2411 r"create table .*?\((.*)\)" 

2412 r"(?:\s*,?\s*(?:WITHOUT\s+ROWID|STRICT))*$" 

2413 ), 

2414 tablesql.strip(), 

2415 re.DOTALL | re.IGNORECASE, 

2416 ) 

2417 assert match, f"create table not found in {tablesql}" 

2418 tablesql = match.group(1).strip() 

2419 

2420 columns.append( 

2421 self._get_column_info( 

2422 name, 

2423 type_, 

2424 nullable, 

2425 default, 

2426 primary_key, 

2427 generated, 

2428 persisted, 

2429 tablesql, 

2430 ) 

2431 ) 

2432 if columns: 

2433 return columns 

2434 elif not self.has_table(connection, table_name, schema): 

2435 raise exc.NoSuchTableError( 

2436 f"{schema}.{table_name}" if schema else table_name 

2437 ) 

2438 else: 

2439 return ReflectionDefaults.columns() 

2440 

2441 def _get_column_info( 

2442 self, 

2443 name, 

2444 type_, 

2445 nullable, 

2446 default, 

2447 primary_key, 

2448 generated, 

2449 persisted, 

2450 tablesql, 

2451 ): 

2452 if generated: 

2453 # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)" 

2454 # somehow is "INTEGER GENERATED ALWAYS" 

2455 type_ = re.sub("generated", "", type_, flags=re.IGNORECASE) 

2456 type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip() 

2457 

2458 coltype = self._resolve_type_affinity(type_) 

2459 

2460 if default is not None: 

2461 default = str(default) 

2462 

2463 colspec = { 

2464 "name": name, 

2465 "type": coltype, 

2466 "nullable": nullable, 

2467 "default": default, 

2468 "primary_key": primary_key, 

2469 } 

2470 if generated: 

2471 sqltext = "" 

2472 if tablesql: 

2473 pattern = ( 

2474 r"[^,]*\s+GENERATED\s+ALWAYS\s+AS" 

2475 r"\s+\((.*)\)\s*(?:virtual|stored)?" 

2476 ) 

2477 match = re.search( 

2478 re.escape(name) + pattern, tablesql, re.IGNORECASE 

2479 ) 

2480 if match: 

2481 sqltext = match.group(1) 

2482 colspec["computed"] = {"sqltext": sqltext, "persisted": persisted} 

2483 return colspec 
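For generated columns, `_get_column_info` cleans the reported type and then searches the table body for the generating expression. The sketch below reproduces both steps with the same regular expressions on a hand-written table body. The names `clean_generated_type` and `computed_sqltext`, and the sample columns, are hypothetical.

```python
import re

GENERATED_PATTERN = (
    r"[^,]*\s+GENERATED\s+ALWAYS\s+AS"
    r"\s+\((.*)\)\s*(?:virtual|stored)?"
)


def clean_generated_type(type_: str) -> str:
    """Strip the GENERATED ALWAYS keywords that SQLite reports in the type."""
    type_ = re.sub("generated", "", type_, flags=re.IGNORECASE)
    return re.sub("always", "", type_, flags=re.IGNORECASE).strip()


def computed_sqltext(column_name: str, table_body: str) -> str:
    """Return the expression inside AS (...), or "" when nothing matches."""
    match = re.search(
        re.escape(column_name) + GENERATED_PATTERN, table_body, re.IGNORECASE
    )
    return match.group(1) if match else ""


body = (
    "price INTEGER, qty INTEGER, "
    "total INTEGER GENERATED ALWAYS AS (price * qty) STORED"
)
cleaned = clean_generated_type("INTEGER GENERATED ALWAYS")
expression = computed_sqltext("total", body)
not_generated = computed_sqltext("price", body)
```

Because `(.*)` is greedy, a table body with several parenthesized clauses after the generated column may capture more than the expression; the sample here has only one.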

2484 

2485 def _resolve_type_affinity(self, type_): 

2486 """Return a data type from a reflected column, using affinity rules. 

2487 

2488 SQLite's goal for universal compatibility introduces some complexity 

2489 during reflection, as a column's defined type might not actually be a 

2490 type that SQLite understands - or indeed, may not be defined *at all*.

2491 Internally, SQLite handles this with a 'data type affinity' for each 

2492 column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER', 

2493 'REAL', or 'NONE' (raw bits). The algorithm that determines this is 

2494 listed in https://www.sqlite.org/datatype3.html section 2.1. 

2495 

2496 This method allows SQLAlchemy to support that algorithm, while still 

2497 providing access to smarter reflection utilities by recognizing 

2498 column definitions that SQLite only supports through affinity (like 

2499 DATE and DOUBLE). 

2500 

2501 """ 

2502 match = re.match(r"([\w ]+)(\(.*?\))?", type_) 

2503 if match: 

2504 coltype = match.group(1) 

2505 args = match.group(2) 

2506 else: 

2507 coltype = "" 

2508 args = "" 

2509 

2510 if coltype in self.ischema_names: 

2511 coltype = self.ischema_names[coltype] 

2512 elif "INT" in coltype: 

2513 coltype = sqltypes.INTEGER 

2514 elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype: 

2515 coltype = sqltypes.TEXT 

2516 elif "BLOB" in coltype or not coltype: 

2517 coltype = sqltypes.NullType 

2518 elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype: 

2519 coltype = sqltypes.REAL 

2520 else: 

2521 coltype = sqltypes.NUMERIC 

2522 

2523 if args is not None: 

2524 args = re.findall(r"(\d+)", args) 

2525 try: 

2526 coltype = coltype(*[int(a) for a in args]) 

2527 except TypeError: 

2528 util.warn( 

2529 "Could not instantiate type %s with " 

2530 "reflected arguments %s; using no arguments." 

2531 % (coltype, args) 

2532 ) 

2533 coltype = coltype() 

2534 else: 

2535 coltype = coltype() 

2536 

2537 return coltype 
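The substring checks in `_resolve_type_affinity` follow the order of SQLite's affinity rules. As a simplified sketch, the function below returns affinity names as strings instead of SQLAlchemy type objects and skips the `ischema_names` lookup that the real method performs first. `resolve_affinity` is a hypothetical name, and the input is assumed to be upper-cased already, as `get_columns` does.

```python
import re
from typing import List, Tuple


def resolve_affinity(type_: str) -> Tuple[str, List[int]]:
    """Return (affinity name, numeric arguments) for a declared column type."""
    match = re.match(r"([\w ]+)(\(.*?\))?", type_)
    coltype = match.group(1) if match else ""
    args = match.group(2) if match else ""

    if "INT" in coltype:
        affinity = "INTEGER"
    elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype:
        affinity = "TEXT"
    elif "BLOB" in coltype or not coltype:
        affinity = "NONE"  # the dialect maps this case to NullType
    elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype:
        affinity = "REAL"
    else:
        affinity = "NUMERIC"

    # Length/precision arguments such as (10,2) are collected as integers.
    numbers = [int(a) for a in re.findall(r"(\d+)", args or "")]
    return affinity, numbers


results = {
    name: resolve_affinity(name)
    for name in [
        "BIGINT",
        "VARCHAR(30)",
        "",
        "DOUBLE PRECISION",
        "DECIMAL(10,2)",
        "FLOATING POINT",
    ]
}
```

The `FLOATING POINT` case resolves to INTEGER because the name contains the substring `INT`, which is checked before `FLOA`.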

2538 

2539 @reflection.cache 

2540 def get_pk_constraint(self, connection, table_name, schema=None, **kw): 

2541 constraint_name = None 

2542 table_data = self._get_table_sql(connection, table_name, schema=schema) 

2543 if table_data: 

2544 PK_PATTERN = r"CONSTRAINT (\w+) PRIMARY KEY" 

2545 result = re.search(PK_PATTERN, table_data, re.I) 

2546 constraint_name = result.group(1) if result else None 

2547 

2548 cols = self.get_columns(connection, table_name, schema, **kw) 

2549 # consider only pk columns. This also avoids sorting the cached 

2550 # value returned by get_columns 

2551 cols = [col for col in cols if col.get("primary_key", 0) > 0] 

2552 cols.sort(key=lambda col: col.get("primary_key")) 

2553 pkeys = [col["name"] for col in cols] 

2554 

2555 if pkeys: 

2556 return {"constrained_columns": pkeys, "name": constraint_name} 

2557 else: 

2558 return ReflectionDefaults.pk_constraint() 
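`get_pk_constraint` combines two sources: the constraint name comes from a regex over the stored DDL, and the column order comes from the `pk` position reported by the table pragma. A minimal sketch with the stdlib `sqlite3` module follows. It reads `PRAGMA table_info` directly rather than going through `get_columns`, and the table `t` and constraint `pk_t` are made-up examples.

```python
import re
import sqlite3

PK_PATTERN = r"CONSTRAINT (\w+) PRIMARY KEY"

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE t (a INTEGER, b INTEGER, c TEXT, "
    "CONSTRAINT pk_t PRIMARY KEY (b, a))"
)

# SQLite keeps the CREATE TABLE text in sqlite_master.
table_sql = conn.execute(
    "SELECT sql FROM sqlite_master WHERE name = ? AND type = 'table'", ("t",)
).fetchone()[0]
found = re.search(PK_PATTERN, table_sql, re.I)
constraint_name = found.group(1) if found else None

# table_info rows: (cid, name, type, notnull, dflt_value, pk); pk is the
# 1-based position of the column within the primary key, or 0.
rows = conn.execute("PRAGMA table_info(t)").fetchall()
pk_rows = sorted((row for row in rows if row[5] > 0), key=lambda row: row[5])
pk_columns = [row[1] for row in pk_rows]
conn.close()
```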

2559 

2560 @reflection.cache 

2561 def get_foreign_keys(self, connection, table_name, schema=None, **kw): 

2562 # sqlite makes this *extremely difficult*. 

2563 # First, use the pragma to get the actual FKs. 

2564 pragma_fks = self._get_table_pragma( 

2565 connection, "foreign_key_list", table_name, schema=schema 

2566 ) 

2567 

2568 fks = {} 

2569 

2570 for row in pragma_fks: 

2571 (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4]) 

2572 

2573 if not rcol: 

2574 # no referred column, which means it was not named in the 

2575 # original DDL. The referred columns of the foreign key 

2576 # constraint are therefore the primary key of the referred 

2577 # table. 

2578 try: 

2579 referred_pk = self.get_pk_constraint( 

2580 connection, rtbl, schema=schema, **kw 

2581 ) 

2582 referred_columns = referred_pk["constrained_columns"] 

2583 except exc.NoSuchTableError: 

2584 # ignore parent tables that do not exist

2585 referred_columns = [] 

2586 else: 

2587 # note we use this list only if this is the first column 

2588 # in the constraint. for subsequent columns we ignore the 

2589 # list and append "rcol" if present. 

2590 referred_columns = [] 

2591 

2592 if self._broken_fk_pragma_quotes: 

2593 rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl) 

2594 

2595 if numerical_id in fks: 

2596 fk = fks[numerical_id] 

2597 else: 

2598 fk = fks[numerical_id] = { 

2599 "name": None, 

2600 "constrained_columns": [], 

2601 "referred_schema": schema, 

2602 "referred_table": rtbl, 

2603 "referred_columns": referred_columns, 

2604 "options": {}, 

2605 } 

2606 fks[numerical_id] = fk 

2607 

2608 fk["constrained_columns"].append(lcol) 

2609 

2610 if rcol: 

2611 fk["referred_columns"].append(rcol) 

2612 

2613 def fk_sig(constrained_columns, referred_table, referred_columns): 

2614 return ( 

2615 tuple(constrained_columns) 

2616 + (referred_table,) 

2617 + tuple(referred_columns) 

2618 ) 

2619 

2620 # then, parse the actual SQL and attempt to find DDL that matches 

2621 # the names as well. SQLite saves the DDL in whatever format 

2622 # it was typed in as, so we need to be liberal here.

2623 

2624 keys_by_signature = { 

2625 fk_sig( 

2626 fk["constrained_columns"], 

2627 fk["referred_table"], 

2628 fk["referred_columns"], 

2629 ): fk 

2630 for fk in fks.values() 

2631 } 

2632 

2633 table_data = self._get_table_sql(connection, table_name, schema=schema) 

2634 

2635 def parse_fks(): 

2636 if table_data is None: 

2637 # system tables, etc. 

2638 return 

2639 

2640 # note that we already have the FKs from PRAGMA above. This whole 

2641 # regexp thing is trying to locate additional detail about the 

2642 # FKs, namely the name of the constraint and other options. 

2643 # so parsing the columns is really about matching it up to what 

2644 # we already have. 

2645 FK_PATTERN = ( 

2646 r"(?:CONSTRAINT (\w+) +)?" 

2647 r"FOREIGN KEY *\( *(.+?) *\) +" 

2648 r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *' # noqa: E501 

2649 r"((?:ON (?:DELETE|UPDATE) " 

2650 r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)" 

2651 r"((?:NOT +)?DEFERRABLE)?" 

2652 r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?" 

2653 ) 

2654 for match in re.finditer(FK_PATTERN, table_data, re.I): 

2655 ( 

2656 constraint_name, 

2657 constrained_columns, 

2658 referred_quoted_name, 

2659 referred_name, 

2660 referred_columns, 

2661 onupdatedelete, 

2662 deferrable, 

2663 initially, 

2664 ) = match.group(1, 2, 3, 4, 5, 6, 7, 8) 

2665 constrained_columns = list( 

2666 self._find_cols_in_sig(constrained_columns) 

2667 ) 

2668 if not referred_columns: 

2669 referred_columns = constrained_columns 

2670 else: 

2671 referred_columns = list( 

2672 self._find_cols_in_sig(referred_columns) 

2673 ) 

2674 referred_name = referred_quoted_name or referred_name 

2675 options = {} 

2676 

2677 for token in re.split(r" *\bON\b *", onupdatedelete.upper()): 

2678 if token.startswith("DELETE"): 

2679 ondelete = token[6:].strip() 

2680 if ondelete and ondelete != "NO ACTION": 

2681 options["ondelete"] = ondelete 

2682 elif token.startswith("UPDATE"): 

2683 onupdate = token[6:].strip() 

2684 if onupdate and onupdate != "NO ACTION": 

2685 options["onupdate"] = onupdate 

2686 

2687 if deferrable: 

2688 options["deferrable"] = "NOT" not in deferrable.upper() 

2689 if initially: 

2690 options["initially"] = initially.upper() 

2691 

2692 yield ( 

2693 constraint_name, 

2694 constrained_columns, 

2695 referred_name, 

2696 referred_columns, 

2697 options, 

2698 ) 

2699 

2700 fkeys = [] 

2701 

2702 for ( 

2703 constraint_name, 

2704 constrained_columns, 

2705 referred_name, 

2706 referred_columns, 

2707 options, 

2708 ) in parse_fks(): 

2709 sig = fk_sig(constrained_columns, referred_name, referred_columns) 

2710 if sig not in keys_by_signature: 

2711 util.warn( 

2712 "WARNING: SQL-parsed foreign key constraint " 

2713 "'%s' could not be located in PRAGMA " 

2714 "foreign_keys for table %s" % (sig, table_name) 

2715 ) 

2716 continue 

2717 key = keys_by_signature.pop(sig) 

2718 key["name"] = constraint_name 

2719 key["options"] = options 

2720 fkeys.append(key) 

2721 # assume the remainders are the unnamed, inline constraints, just 

2722 # use them as is as it's extremely difficult to parse inline 

2723 # constraints 

2724 fkeys.extend(keys_by_signature.values()) 

2725 if fkeys: 

2726 return fkeys 

2727 else: 

2728 return ReflectionDefaults.foreign_keys() 
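Within `parse_fks`, the `ON DELETE` / `ON UPDATE` portion of a matched constraint is split into tokens and turned into an options dictionary. The sketch below isolates that step. `parse_fk_actions` is a hypothetical helper name, and the input strings are invented examples.

```python
import re
from typing import Dict


def parse_fk_actions(clause: str) -> Dict[str, str]:
    """Extract ondelete/onupdate actions, ignoring the NO ACTION default."""
    options: Dict[str, str] = {}
    # Splitting on the standalone word ON leaves "DELETE ..." / "UPDATE ...".
    for token in re.split(r" *\bON\b *", clause.upper()):
        if token.startswith("DELETE"):
            ondelete = token[6:].strip()
            if ondelete and ondelete != "NO ACTION":
                options["ondelete"] = ondelete
        elif token.startswith("UPDATE"):
            onupdate = token[6:].strip()
            if onupdate and onupdate != "NO ACTION":
                options["onupdate"] = onupdate
    return options


both = parse_fk_actions("on delete cascade on update set null")
default_only = parse_fk_actions("ON DELETE NO ACTION")
empty = parse_fk_actions("")
```

The word boundary in `\bON\b` keeps the split from firing inside `NO ACTION`, where `ON` is the tail of a longer word.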

2729 

2730 def _find_cols_in_sig(self, sig): 

2731 for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I): 

2732 yield match.group(1) or match.group(2) 
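The column-list regex in `_find_cols_in_sig` accepts either a double-quoted name or a bare identifier. A short standalone sketch of the same generator, applied to an invented column list:

```python
import re
from typing import Iterator


def find_cols_in_sig(sig: str) -> Iterator[str]:
    """Yield column names from a comma-separated list, unquoting as needed."""
    for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
        # Group 1 is a quoted name (quotes removed); group 2 is a bare name.
        yield match.group(1) or match.group(2)


cols = list(find_cols_in_sig('"user id", name, email_2'))
```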

2733 

2734 @reflection.cache 

2735 def get_unique_constraints( 

2736 self, connection, table_name, schema=None, **kw 

2737 ): 

2738 auto_index_by_sig = {} 

2739 for idx in self.get_indexes( 

2740 connection, 

2741 table_name, 

2742 schema=schema, 

2743 include_auto_indexes=True, 

2744 **kw, 

2745 ): 

2746 if not idx["name"].startswith("sqlite_autoindex"): 

2747 continue 

2748 sig = tuple(idx["column_names"]) 

2749 auto_index_by_sig[sig] = idx 

2750 

2751 table_data = self._get_table_sql( 

2752 connection, table_name, schema=schema, **kw 

2753 ) 

2754 unique_constraints = [] 

2755 

2756 def parse_uqs(): 

2757 if table_data is None: 

2758 return 

2759 UNIQUE_PATTERN = r'(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)' 

2760 INLINE_UNIQUE_PATTERN = ( 

2761 r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]' 

2762 r"+[a-z0-9_ ]+?[\t ]+UNIQUE" 

2763 ) 

2764 

2765 for match in re.finditer(UNIQUE_PATTERN, table_data, re.I): 

2766 name, cols = match.group(1, 2) 

2767 yield name, list(self._find_cols_in_sig(cols)) 

2768 

2769 # we need to match inlines as well, as we seek to differentiate 

2770 # a UNIQUE constraint from a UNIQUE INDEX, even though these 

2771 # are kind of the same thing :) 

2772 for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I): 

2773 cols = list( 

2774 self._find_cols_in_sig(match.group(1) or match.group(2)) 

2775 ) 

2776 yield None, cols 

2777 

2778 for name, cols in parse_uqs(): 

2779 sig = tuple(cols) 

2780 if sig in auto_index_by_sig: 

2781 auto_index_by_sig.pop(sig) 

2782 parsed_constraint = {"name": name, "column_names": cols} 

2783 unique_constraints.append(parsed_constraint) 

2784 # NOTE: auto_index_by_sig might not be empty here, 

2785 # the PRIMARY KEY may have an entry. 

2786 if unique_constraints: 

2787 return unique_constraints 

2788 else: 

2789 return ReflectionDefaults.unique_constraints() 

2790 
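To see how the two UNIQUE patterns and the auto-index signature check interact, here is a standalone sketch under stated assumptions: the `account` DDL is invented, `find_cols` and the module-level `parse_uqs` are hypothetical stand-ins for the methods above, and `auto_index_sigs` is hand-written rather than read from `PRAGMA index_list`.

```python
import re

UNIQUE_PATTERN = r'(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)'
INLINE_UNIQUE_PATTERN = (
    r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]'
    r"+[a-z0-9_ ]+?[\t ]+UNIQUE"
)


def find_cols(sig):
    # Hypothetical stand-in for _find_cols_in_sig
    pattern = r'(?:"(.+?)")|([a-z0-9_]+)'
    return [m.group(1) or m.group(2) for m in re.finditer(pattern, sig, re.I)]


def parse_uqs(table_sql):
    # Table-level constraints first, then inline column constraints
    for m in re.finditer(UNIQUE_PATTERN, table_sql, re.I):
        yield m.group(1), find_cols(m.group(2))
    for m in re.finditer(INLINE_UNIQUE_PATTERN, table_sql, re.I):
        yield None, find_cols(m.group(1) or m.group(2))


ddl = (
    "CREATE TABLE account (\n"
    "    id INTEGER PRIMARY KEY,\n"
    "    email TEXT UNIQUE,\n"
    "    first TEXT,\n"
    "    last TEXT,\n"
    "    CONSTRAINT uq_name UNIQUE (first, last)\n"
    ")"
)

# Assumed signatures of the sqlite_autoindex_* entries for this table
auto_index_sigs = {("email",), ("first", "last")}

raw = list(parse_uqs(ddl))
found = [(name, cols) for name, cols in raw if tuple(cols) in auto_index_sigs]
print(found)
```

In this sample the inline pattern also matches the `CONSTRAINT uq_name UNIQUE` line and reports `CONSTRAINT` as a column; the comparison against the auto-index signatures is what discards that spurious hit.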

2791 @reflection.cache 

2792 def get_check_constraints(self, connection, table_name, schema=None, **kw): 

2793 table_data = self._get_table_sql( 

2794 connection, table_name, schema=schema, **kw 

2795 ) 

2796 

2797 # NOTE NOTE NOTE 

2798 # DO NOT CHANGE THIS REGULAR EXPRESSION. There is no known way 

2799 # to parse CHECK constraints that contain newlines themselves using 

2800 # regular expressions, and the approach here relies upon each 

2801 # individual 

2802 # CHECK constraint being on a single line by itself. This 

2803 # necessarily makes assumptions as to how the CREATE TABLE 

2804 # was emitted. A more comprehensive DDL parsing solution would be 

2805 # needed to improve upon the current situation. See #11840 for 

2806 # background 

2807 CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?CHECK *\( *(.+) *\),? *" 

2808 cks = [] 

2809 

2810 for match in re.finditer(CHECK_PATTERN, table_data or "", re.I): 

2811 

2812 name = match.group(1) 

2813 

2814 if name: 

2815 name = re.sub(r'^"|"$', "", name) 

2816 

2817 cks.append({"sqltext": match.group(2), "name": name}) 

2818 cks.sort(key=lambda d: d["name"] or "~") # sort None as last 

2819 if cks: 

2820 return cks 

2821 else: 

2822 return ReflectionDefaults.check_constraints() 

2823 
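The single-line assumption described in the comment above can be observed directly. This is an illustrative sketch only: `parse_checks` is a hypothetical helper that mirrors the loop in `get_check_constraints`, and both DDL strings are invented.

```python
import re

CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?CHECK *\( *(.+) *\),? *"


def parse_checks(table_sql):
    cks = []
    for match in re.finditer(CHECK_PATTERN, table_sql or "", re.I):
        name = match.group(1)
        if name:
            # strip surrounding double quotes from the constraint name
            name = re.sub(r'^"|"$', "", name)
        cks.append({"sqltext": match.group(2), "name": name})
    cks.sort(key=lambda d: d["name"] or "~")  # unnamed constraints last
    return cks


single_line = (
    "CREATE TABLE item (\n"
    "    price INTEGER,\n"
    "    qty INTEGER,\n"
    '    CONSTRAINT "ck_price" CHECK (price > 0),\n'
    "    CHECK (qty >= 0)\n"
    ")"
)
# The same kind of constraint, but spread over several lines
multi_line = (
    "CREATE TABLE item (\n"
    "    qty INTEGER,\n"
    "    CHECK (\n"
    "        qty >= 0\n"
    "    )\n"
    ")"
)

checks = parse_checks(single_line)
missed = parse_checks(multi_line)
print(checks, missed)
```

Because `.` does not match a newline by default, the multi-line constraint produces no match at all, which is the limitation the comment warns about.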

2824 @reflection.cache 

2825 def get_indexes(self, connection, table_name, schema=None, **kw): 

2826 pragma_indexes = self._get_table_pragma( 

2827 connection, "index_list", table_name, schema=schema 

2828 ) 

2829 indexes = [] 

2830 

2831 # regular expression to extract the filter predicate of a partial 

2832 # index. this could fail to extract the predicate correctly on 

2833 # indexes created like 

2834 # CREATE INDEX i ON t (col || ') where') WHERE col <> '' 

2835 # but as this function does not support expression-based indexes 

2836 # this case does not occur. 

2837 partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE) 

2838 

2839 if schema: 

2840 schema_expr = "%s." % self.identifier_preparer.quote_identifier( 

2841 schema 

2842 ) 

2843 else: 

2844 schema_expr = "" 

2845 

2846 include_auto_indexes = kw.pop("include_auto_indexes", False) 

2847 for row in pragma_indexes: 

2848 # ignore implicit primary key index. 

2849 # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html 

2850 if not include_auto_indexes and row[1].startswith( 

2851 "sqlite_autoindex" 

2852 ): 

2853 continue 

2854 indexes.append( 

2855 dict( 

2856 name=row[1], 

2857 column_names=[], 

2858 unique=row[2], 

2859 dialect_options={}, 

2860 ) 

2861 ) 

2862 

2863 # check partial indexes 

2864 if len(row) >= 5 and row[4]: 

2865 s = ( 

2866 "SELECT sql FROM %(schema)ssqlite_master " 

2867 "WHERE name = ? " 

2868 "AND type = 'index'" % {"schema": schema_expr} 

2869 ) 

2870 rs = connection.exec_driver_sql(s, (row[1],)) 

2871 index_sql = rs.scalar() 

2872 predicate_match = partial_pred_re.search(index_sql) 

2873 if predicate_match is None: 

2874 # unless the regex is broken this case shouldn't happen 

2875 # because we know this is a partial index, so the 

2876 # definition sql should match the regex 

2877 util.warn( 

2878 "Failed to look up filter predicate of " 

2879 "partial index %s" % row[1] 

2880 ) 

2881 else: 

2882 predicate = predicate_match.group(1) 

2883 indexes[-1]["dialect_options"]["sqlite_where"] = text( 

2884 predicate 

2885 ) 

2886 

2887 # loop through the collected indexes to get the column names. 

2888 for idx in list(indexes): 

2889 pragma_index = self._get_table_pragma( 

2890 connection, "index_info", idx["name"], schema=schema 

2891 ) 

2892 

2893 for row in pragma_index: 

2894 if row[2] is None: 

2895 util.warn( 

2896 "Skipped unsupported reflection of " 

2897 "expression-based index %s" % idx["name"] 

2898 ) 

2899 indexes.remove(idx) 

2900 break 

2901 else: 

2902 idx["column_names"].append(row[2]) 

2903 

2904 indexes.sort(key=lambda d: d["name"] or "~") # sort None as last 

2905 if indexes: 

2906 return indexes 

2907 elif not self.has_table(connection, table_name, schema): 

2908 raise exc.NoSuchTableError( 

2909 f"{schema}.{table_name}" if schema else table_name 

2910 ) 

2911 else: 

2912 return ReflectionDefaults.indexes() 

2913 
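The partial-index handling above can be reproduced with the standard-library `sqlite3` module. This is a rough sketch, assuming a SQLite build with partial index support; the `users` table and `ix_active` index are made-up names, and the query is a simplified form of the one in `get_indexes` without schema qualification.

```python
import re
import sqlite3

partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT, active INTEGER)")
conn.execute("CREATE INDEX ix_active ON users (email) WHERE active = 1")

# Fetch the stored CREATE INDEX text, as get_indexes does for partial indexes
index_sql = conn.execute(
    "SELECT sql FROM sqlite_master WHERE name = ? AND type = 'index'",
    ("ix_active",),
).fetchone()[0]

predicate_match = partial_pred_re.search(index_sql)
predicate = predicate_match.group(1) if predicate_match else None
print(predicate)
```

The regex keys on the closing parenthesis of the column list followed by `WHERE`, so everything after that keyword is taken as the predicate text.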

2914 def _is_sys_table(self, table_name): 

2915 return table_name in { 

2916 "sqlite_schema", 

2917 "sqlite_master", 

2918 "sqlite_temp_schema", 

2919 "sqlite_temp_master", 

2920 } 

2921 

2922 @reflection.cache 

2923 def _get_table_sql(self, connection, table_name, schema=None, **kw): 

2924 if schema: 

2925 schema_expr = "%s." % ( 

2926 self.identifier_preparer.quote_identifier(schema) 

2927 ) 

2928 else: 

2929 schema_expr = "" 

2930 try: 

2931 s = ( 

2932 "SELECT sql FROM " 

2933 " (SELECT * FROM %(schema)ssqlite_master UNION ALL " 

2934 " SELECT * FROM %(schema)ssqlite_temp_master) " 

2935 "WHERE name = ? " 

2936 "AND type in ('table', 'view')" % {"schema": schema_expr} 

2937 ) 

2938 rs = connection.exec_driver_sql(s, (table_name,)) 

2939 except exc.DBAPIError: 

2940 s = ( 

2941 "SELECT sql FROM %(schema)ssqlite_master " 

2942 "WHERE name = ? " 

2943 "AND type in ('table', 'view')" % {"schema": schema_expr} 

2944 ) 

2945 rs = connection.exec_driver_sql(s, (table_name,)) 

2946 value = rs.scalar() 

2947 if value is None and not self._is_sys_table(table_name): 

2948 raise exc.NoSuchTableError(f"{schema_expr}{table_name}") 

2949 return value 

2950 
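The lookup performed by `_get_table_sql` amounts to reading the stored DDL text from the schema tables. A minimal sketch with the standard-library `sqlite3` module follows; `get_table_sql` here is a hypothetical simplification that omits the schema prefix, the `DBAPIError` fallback and the `NoSuchTableError` handling.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, email TEXT UNIQUE)")


def get_table_sql(conn, table_name):
    # Search both the main and the temp schema tables, as the method does
    query = (
        "SELECT sql FROM "
        " (SELECT * FROM sqlite_master UNION ALL "
        "  SELECT * FROM sqlite_temp_master) "
        "WHERE name = ? AND type in ('table', 'view')"
    )
    row = conn.execute(query, (table_name,)).fetchone()
    return row[0] if row else None


table_sql = get_table_sql(conn, "t")
missing_sql = get_table_sql(conn, "no_such_table")
print(table_sql)
```

The returned string is what the regex-based constraint parsers operate on, which is why their results depend on how the original CREATE TABLE statement was written.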

2951 def _get_table_pragma(self, connection, pragma, table_name, schema=None): 

2952 quote = self.identifier_preparer.quote_identifier 

2953 if schema is not None: 

2954 statements = [f"PRAGMA {quote(schema)}."] 

2955 else: 

2956 # because PRAGMA looks in all attached databases if no schema 

2957 # given, need to specify "main" schema, however since we want 

2958 # 'temp' tables in the same namespace as 'main', need to run 

2959 # the PRAGMA twice 

2960 statements = ["PRAGMA main.", "PRAGMA temp."] 

2961 

2962 qtable = quote(table_name) 

2963 for statement in statements: 

2964 statement = f"{statement}{pragma}({qtable})" 

2965 cursor = connection.exec_driver_sql(statement) 

2966 if not cursor._soft_closed: 

2967 # work around SQLite issue whereby cursor.description 

2968 # is blank when PRAGMA returns no rows: 

2969 # https://www.sqlite.org/cvstrac/tktview?tn=1884 

2970 result = cursor.fetchall() 

2971 else: 

2972 result = [] 

2973 if result: 

2974 return result 

2975 else: 

2976 return []